赞
踩
下面是关于kafka connect的官方定义
Kafka Connect是一种用于在Apache Kafka和其他系统之间以可伸缩的方式可靠地流式传输数据的工具。它使快速定义连接器变得简单,这些连接器可以将大量数据移入和移出kafka。Kafka Connect可以将整个数据库或从所有应用程序服务器收集到Kafka 的topic中,使数据可用于低延迟的流处理。导出作业可以将kafka topic中的数据传递到辅助存储和查询系统或批处理系统中,以便进行脱机分析。
首先下载kafka并安装到本地环境,如果不知道怎么安装请跳转到kafka系列-入门篇之安装。
分别启动zookeeper、kafka
kafka connect有两种方式,一种是单机模式,另一种是分布式模式。
启动命令如下,稍微有点长。其中connect-standalone.properties是关于kafka connect的配置,另外两个配置是文件输入输出的配置。
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties
name=local-file-source_demo
connector.class=FileStreamSource
tasks.max=1
file=/usr/local/apache/kafka_2.12-2.8.0/source.txt
topic=connect-demo
name=local-file-sink_demo
connector.class=FileStreamSink
tasks.max=1
file=/usr/local/apache/kafka_2.12-2.8.0/sink.txt
topics=connect-demo
测试结果如下
kafka connect在单机模式下,有一个connect.offset文件会记录file source connector的offset,来保证不会重复消费
启动分布式模式的kafka connect
connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
使用postman提交一个file source demo
{
"name": "local-file-source_demo",
"config": {
"name":"local-file-source_demo",
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": 1,
"file": "/usr/local/apache/kafka_2.12-2.8.0/source.txt",
"topic": "connect-demo"
}
}
{
"name": "local-file-sink_demo",
"config": {
"name":"local-file-sink_demo",
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/usr/local/apache/kafka_2.12-2.8.0/sink.txt",
"topics": "connect-demo"
}
}
测试方式同单机一样,就忽略了
分布式模式有三个默认topic
官方提供的API
单机和分布式都可以使用API,包括connector的增删改查、任务状态的控制和plugins列表等等。这里我就不翻译了,直接贴出来重要的几个api
kafka connect定义connector和task来处理数据同步。了解源代码可以发现是connector来启动task,connector负责kafka的连接信息、topic,之后再启动一个或者多个task,去读取connector对应的信息,来同步数据。生产环境在同步mysql数据库时,如果同步失败,往往task就出现异常停止了,而connector还是正常工作。以后有机会在仔细的介绍吧。
访问这个地址第三方提供的connector插件,可以查看到许多三方提供的kafka connect插件,上游同步jdbc的debizium是个不错的选择,下游可以使用jdbc、elasticsearch、mongo搭配着,就可以做端到端的实时同步了。依赖kafka的可用性、扩展性和性能,是一个非常不错的解决方案。
通过一个简单的方式介绍了kafka connect的使用,实际生产使用的插件选择还比较多,做实时同步jdbc时,会遇到不少问题,有兴趣的可以留言一起讨论。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。