当前位置:   article > 正文

debezium系列之:Kafka Connect_debezium kafka connect

debezium kafka connect

Kafka Connect是一个工具,Kafka Connect为kafka和外部数据系统之间移动数据提供了一种可靠且可伸缩的实现方式。Kafka Connect可以简单快捷的将数据从Kafka中导入或导出,数据范围涵盖关系型数据库、日志和度量数据、Hadoop和数据仓库、NoSQL数据存储、搜索索引等。

一、Source和Sink

Kafka有两个核心概念:Source和Sink,Source和Sink都被称为Connector连接器。

  • Source负责导入数据到Kafka
  • Sink负责从Kafka导出数据

二、Task和Worker

在Kafka Connect中还有两个重要的概念:Task和Worker。

  • Task是Kafka Connect数据模型的主角,每一个Connector都会协调一系列的Task去执行任务,Connector可以把一项工作分割成许多Task,然后把Task分发到各个Worker进程中去执行(分布式模式下),Task不保存自己的状态信息,而是交给特定的Kafka主题去保存。
  • Connector和Task都是逻辑工作单位,比须安排在进程中执行,而在Kafka Connector中,这些进程就是Worker。

三、Kafka Connect特性

  • 通用性:规范化其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理
  • 支持独立模式(standalone)和分布式模式(distributed)
  • REST接口:使用REST API提交和管理Connector
  • 自动位移管理:自动管理位移提交,不需要开发人员干预,降低了开发成本
  • 分布式和可扩展性:Kafka Connect基于现有的组管理协议来实现扩展Kafka Connect集群
  • 流式计算/批处理的集成。

四、独立模式

Kafka中的connect-standalone.sh脚本用来实现以独立的模式运行Kafka Connector。在独立模式下所有的操作都是在一个进程中完成的。

  • 独立模式适合测试或功能验证的场景
  • 由于是单进程,所以独立模式无法充分利用Kafka自身所提供的负载均衡和高容错等特性。

在执行这个脚本时需要指定两个配置文件:

  • 一个是用于Worker进程运行的相关配置文件。
  • 另一个是指定Source连接器或Sink连接器的配置文件,可以同时指定多个连接器配置,每个连接器配置文件对应一个连接器
  • 因此,要保证连接器名称全局唯一,连接器名称通过name参数指定。

1.Source连接器的用法

先了解下Source连接器的用法:将文件source.txt中的内容通过Source连接器写入Kafka的主题Topic-connect。

首先修改用于Worker进程运行的配置文件($KAFKA_HOME/config/connect-standalone.properties),内容参考如下:

bootstrap.servers = localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • bootstrap.servers参数用来配置与Kafka集群连接的地址。
  • key.converter和value.converter参数指定Kafka消息中key和value的格式转化类。本例中使用JsonConverter来将每一条消息的key和value都转化成JSON格式。
  • key.converter.schemas.enable和value.converter.schemas.enable参数用来指定JSON消息中是否可以包含schema。
  • offset.storage.file.filename参数用于指定保存偏移量的文件路径。
  • offset.flush.interval.ms参数用于设定提交偏移量的频率。

接下来修改Source连接器的配置文件($KAFKA_HOME/config/connect-file-source.properties),内容参考如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/kafka_2.11-2.0.0/source.txt
topic=topic-connect
  • 1
  • 2
  • 3
  • 4
  • 5
  • name参数用来配置连接器的名称。
  • connector.class用来设置连接器类的全限定名称。Kafka Connect会在classpath中自动搜索这个类并加载。
  • 相关连接器主要有:kafka-connect-elasticsearch,kafka-connect-jdbc,kafka-connect-hdfs,kafka-connect-storage-cloud
  • task.max参数指定了Task的数量。
  • file参数指定该连接器数据源文件路径,指定了Kafka根目录下的source.txt文件,在启动连接器前需要先创建好。
  • topic参数设置连接器把数据导入哪个主题,如果该主题不存在,则连接器会自动创建,不过建议最好还是提前手工创建该主题。

比如对本例中的主题topic-connect而言,可以事先创建,详细信息如下:

bin/kafka-topic.sh --zookeeper localhost:2181/kafka --create --topic topic-connect --replication-factor 1 --partitions 1
  • 1

启动Source连接器,示例如下:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
  • 1

连接器启动后,向source.txt文件中输入两条句子:

echo "hello kafka connect" >> source.txt
echo "hello kafka connect" >> source.txt
  • 1
  • 2

之后可以观察主题topic-connect中是否包含这两条消息。既可以使用kafka-console-consumer.sh脚本,也可以使用kafka-dump-log.sh脚本来查看内容。

bin/kafka-dump-log.sh --files /tmp/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log
  • 1

2.Sink连接器的用法

再来看一下Sink连接器的用法:将主题topic-connect中的内容通过Sink连接器写入文件sink.txt。对config/connect-standalone.properties文件做修改,参考如下:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 将Kafka消息中的key和value的格式转化类指定为StringConverter

再配置Sink连接器的配置文件($KAFKA_HOME/config/connect-file-sink.properties),内容参考如下:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/kafka_2.11-2.0.0/sink.txt
topics=topic-connect
  • 1
  • 2
  • 3
  • 4
  • 5

接下来启动Sink连接器

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
  • 1

往主题topic-connect中发送一条消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-connect

hello kafka
  • 1
  • 2
  • 3

就可以在sink.txt文件中看到这条消息

cat sink.txt
hello kafka
  • 1
  • 2

五、REST API

可以通过Kafka Connect提供的基于REST风格的API接口来管理连接器,默认端口号8083,可以通过Worker进程的配置文件中的rest.port参数来修改端口号。

Kafka Connect REST API接口如下所示:

REST API释义
GET /查看kafka集群版本信息
GET /connectors查看当前活跃的连接器列表,显示连接器的名字
POST /connectors根据指定配置,创建一个新的连接器
GET /connectors/{name}查看置顶连接器的信息
GET /connectors/{name}/config查看连接器的配置信息
GET /connectors/{name}/status查看连接器的状态
POST /connectors/{name}/restart重启指定的连接器
PUT /connectors/{name}/pause暂停指定的连接器
GET /connectors/{name}/tasks查看指定连接器正在运行的Task
POST /connectors/{name}/tasks修改Task的配置
GET /connector/{name}/tasks/{taskId}/status查看指定连接器中指定Task的状态
POST /connectors/{name}/tasks/{taskId}/restart重启指定连接器中指定的Task
Delete /connectors/{name}删除指定的连接器

六、分布式模式

与独立模式不同,分布式模式结合了Kafka提供的负载均衡和故障转移功能,能够自动在多个节点机器上平衡负载。分布式模式只能通过访问REST API来创建连接器。

在运行分布式模式的连接器前,同样要修改Worker进程的相关配置文件($KAFKA_HOME/config/connect-distributed.properties),内容参考如下:

bootstrap.servers=localhost1:9092,localhost2:9092,localhost3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
  • 1
  • 2
  • 3
  • 4

启动分布式模式,运行脚本变成了对应的connect-distributed.sh,示例如下所示:

bin/connect-distributed.sh config/connect-distributed.properties
  • 1

接下来调用POST /connectors接口来创建指定的连接器,示例如下:

curl -u debezium:************* 'http://debezium-001:8083/connectors' -X POST -i -H "Content-Type:application/json" -d
'
{
    "name":"mysql-optics-connector",
    "config":{
        "connector.class":"io.debezium.connector.mysql.MySqlConnector",
        "task.max":1,
        "database.hostname":"10.10.128.146",
        "database.port":"3306",
        "database.dbname":"unified_view_test",
        "database.user":"insight_admin",
        "database.password":"Kp1Kd8XkzMI8MgnzZfq",
        "database.server.id":2022032008,
        "database.server.name":"debezium-optics-test",
        "database.include.list":"unified_view_test",
        "table.include.list":"unified_view_test.retail_order_detail",
        "database.history.kafka.bootstrap.servers":"kafka-002:9092,kafka-003:9092,kafka-001:9092",
        "database.history.kafka.topic":"history-debezium-mysql-optics-test"
    }
}
'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

接下来就可以向distribute-source.txt文件中写入内容,然后订阅消费主题topic-distribute-source中的消息来验证是否成功。

使用完毕后,可以调用DELETE /connectors/{name}接口来删除对应的连接器。

curl -i -X DELETE http://localhost:8083/connectors/local-file-distribute-source
  • 1

七、总结

基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客,更多关于Debezium的技术可以阅读博主的debezium专栏:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/782180
推荐阅读
相关标签
  

闽ICP备14008679号