赞
踩
Kafka Connect是一个工具,Kafka Connect为kafka和外部数据系统之间移动数据提供了一种可靠且可伸缩的实现方式。Kafka Connect可以简单快捷的将数据从Kafka中导入或导出,数据范围涵盖关系型数据库、日志和度量数据、Hadoop和数据仓库、NoSQL数据存储、搜索索引等。
Kafka有两个核心概念:Source和Sink,Source和Sink都被称为Connector连接器。
在Kafka Connect中还有两个重要的概念:Task和Worker。
Kafka中的connect-standalone.sh脚本用来实现以独立的模式运行Kafka Connector。在独立模式下所有的操作都是在一个进程中完成的。
在执行这个脚本时需要指定两个配置文件:
先了解下Source连接器的用法:将文件source.txt中的内容通过Source连接器写入Kafka的主题Topic-connect。
首先修改用于Worker进程运行的配置文件($KAFKA_HOME/config/connect-standalone.properties),内容参考如下:
bootstrap.servers = localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
接下来修改Source连接器的配置文件($KAFKA_HOME/config/connect-file-source.properties),内容参考如下:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/kafka_2.11-2.0.0/source.txt
topic=topic-connect
比如对本例中的主题topic-connect而言,可以事先创建,详细信息如下:
bin/kafka-topic.sh --zookeeper localhost:2181/kafka --create --topic topic-connect --replication-factor 1 --partitions 1
启动Source连接器,示例如下:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
连接器启动后,向source.txt文件中输入两条句子:
echo "hello kafka connect" >> source.txt
echo "hello kafka connect" >> source.txt
之后可以观察主题topic-connect中是否包含这两条消息。既可以使用kafka-console-consumer.sh脚本,也可以使用kafka-dump-log.sh脚本来查看内容。
bin/kafka-dump-log.sh --files /tmp/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log
再来看一下Sink连接器的用法:将主题topic-connect中的内容通过Sink连接器写入文件sink.txt。对config/connect-standalone.properties文件做修改,参考如下:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
再配置Sink连接器的配置文件($KAFKA_HOME/config/connect-file-sink.properties),内容参考如下:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/kafka_2.11-2.0.0/sink.txt
topics=topic-connect
接下来启动Sink连接器
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
往主题topic-connect中发送一条消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-connect
hello kafka
就可以在sink.txt文件中看到这条消息
cat sink.txt
hello kafka
可以通过Kafka Connect提供的基于REST风格的API接口来管理连接器,默认端口号8083,可以通过Worker进程的配置文件中的rest.port参数来修改端口号。
Kafka Connect REST API接口如下所示:
REST API | 释义 |
---|---|
GET / | 查看kafka集群版本信息 |
GET /connectors | 查看当前活跃的连接器列表,显示连接器的名字 |
POST /connectors | 根据指定配置,创建一个新的连接器 |
GET /connectors/{name} | 查看置顶连接器的信息 |
GET /connectors/{name}/config | 查看连接器的配置信息 |
GET /connectors/{name}/status | 查看连接器的状态 |
POST /connectors/{name}/restart | 重启指定的连接器 |
PUT /connectors/{name}/pause | 暂停指定的连接器 |
GET /connectors/{name}/tasks | 查看指定连接器正在运行的Task |
POST /connectors/{name}/tasks | 修改Task的配置 |
GET /connector/{name}/tasks/{taskId}/status | 查看指定连接器中指定Task的状态 |
POST /connectors/{name}/tasks/{taskId}/restart | 重启指定连接器中指定的Task |
Delete /connectors/{name} | 删除指定的连接器 |
与独立模式不同,分布式模式结合了Kafka提供的负载均衡和故障转移功能,能够自动在多个节点机器上平衡负载。分布式模式只能通过访问REST API来创建连接器。
在运行分布式模式的连接器前,同样要修改Worker进程的相关配置文件($KAFKA_HOME/config/connect-distributed.properties),内容参考如下:
bootstrap.servers=localhost1:9092,localhost2:9092,localhost3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
启动分布式模式,运行脚本变成了对应的connect-distributed.sh,示例如下所示:
bin/connect-distributed.sh config/connect-distributed.properties
接下来调用POST /connectors接口来创建指定的连接器,示例如下:
curl -u debezium:************* 'http://debezium-001:8083/connectors' -X POST -i -H "Content-Type:application/json" -d
'
{
"name":"mysql-optics-connector",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"task.max":1,
"database.hostname":"10.10.128.146",
"database.port":"3306",
"database.dbname":"unified_view_test",
"database.user":"insight_admin",
"database.password":"Kp1Kd8XkzMI8MgnzZfq",
"database.server.id":2022032008,
"database.server.name":"debezium-optics-test",
"database.include.list":"unified_view_test",
"table.include.list":"unified_view_test.retail_order_detail",
"database.history.kafka.bootstrap.servers":"kafka-002:9092,kafka-003:9092,kafka-001:9092",
"database.history.kafka.topic":"history-debezium-mysql-optics-test"
}
}
'
接下来就可以向distribute-source.txt文件中写入内容,然后订阅消费主题topic-distribute-source中的消息来验证是否成功。
使用完毕后,可以调用DELETE /connectors/{name}接口来删除对应的连接器。
curl -i -X DELETE http://localhost:8083/connectors/local-file-distribute-source
基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客,更多关于Debezium的技术可以阅读博主的debezium专栏:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。