当前位置:   article > 正文

kafka connect 分布式部署

kafka connect 分布式部署

1、环境介绍
操作系统:centos 7.9
jdk版本:8u291
kafka版本:2.8.0
kafka下载地址:
https://kafka.apache.org/downloads

节点清单:
10.99.27.111 kafkac01.wtown.com 4核心 8G内存 500G硬盘
10.99.27.112 kafkac02.wtown.com 4核心 8G内存 500G硬盘
10.99.27.113 kafkac03.wtown.com 4核心 8G内存 500G硬盘

在这里插入图片描述

2、设置主机名及host文件(三台机器)

10.99.27.111	kafkac01.wtown.com
10.99.27.112	kafkac02.wtown.com
10.99.27.113	kafkac03.wtown.com
10.99.27.11		zk01.wtown.com
10.99.27.12		zk02.wtown.com
10.99.27.13		zk03.wtown.co
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3、关闭防火墙和selinux(三台机器)
在这里插入图片描述
4、创建数据目录/data,并挂载数据盘(三台机器)

mkdir /data
  • 1

https://blog.csdn.net/zyj81092211/article/details/118054000

5、配置jdk
https://blog.csdn.net/zyj81092211/article/details/118055068

6、创建zookeeper集群
https://blog.csdn.net/zyj81092211/article/details/118066724

7、上传软件到服务器解压并重命名为kafka-connect(三台机器)
在这里插入图片描述
8、创建软连接到 /usr/local下(三台机器)

ln -s /data/kafka-connect /usr/local/kafka-connect
  • 1

9、更改配置文件(三台机器)
编辑配置文件server.properties,替换文件内容为下

vi /data/kafka-connect/config/server.properties
  • 1

kafakac01.wtown.com:

broker.id=111
listeners=PLAINTEXT://kafkac01.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

kafakac02.wtown.com:

broker.id=112
listeners=PLAINTEXT://kafkac02.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

kafakac03.wtown.com:

broker.id=113
listeners=PLAINTEXT://kafkac03.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

10、添加环境变量(三台机器)

# kafka environment
export KAFKA_HOME=/data/kafka-connect
export PATH=$PATH:$KAFKA_HOME/bin
  • 1
  • 2
  • 3

11、启动kafka集群(三台机器)

kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
  • 1

12、kafka集群状态
在这里插入图片描述
zookeeper状态:
在这里插入图片描述
13、创建插件目录

mkdir /data/kafka-connect/plugins
  • 1

14、修改connector配置文件connect-distributed.properties(三台机器配置一样)
在这里插入图片描述

bootstrap.servers=kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092
group.id=connect-cluster-01
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/data/kafka-connect/plugins

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

15、启动kafka connector

connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties
  • 1

16、查看kafka connector状态
在这里插入图片描述
查看kafka topic信息

kafka-topics.sh --list --zookeeper zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
  • 1

在这里插入图片描述
可以看到已经自动创建了配置文件中的topic connect-configs

17、测试样例
(1)创建测试目录和文件

mkdir /data/test
touch /data/test/in.txt
touch /data/test/out.txt
  • 1
  • 2
  • 3

(2)获取插件信息

curl http://kafkac01.wtown.com:8083/connector-plugins
  • 1

在这里插入图片描述
可以在https://www.sojson.com/格式化json数据(或者直接使用postman请求)
在这里插入图片描述
(3)建立source connector

curl -i -k  -H "Content-type: application/json" -X POST -d '{"name":"in","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/data/test/in.txt","topic":"localfiles"}}' http://kafkac01.wtown.com:8083/connectors
  • 1

(4)查看source connector

curl http://kafkac01.wtown.com:8083/connectors/in/status
  • 1

在这里插入图片描述
注意这里,测试程序是本地file获取,所以应该上connector运行的节点上进行文件输入操作,即10.99.27.112上
(5)查看topic

kafka-topics.sh --list --zookeeper zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
  • 1

在这里插入图片描述
(6)模拟消费者

kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic localfiles
  • 1

到10.99.27.112(connector运行的节点上)输入数据到int.txt
在这里插入图片描述

消费者这边已经接到数据
在这里插入图片描述

(7)创建sink connector

curl -i -k  -H "Content-type: application/json" -X POST -d '{"name":"out","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","topics":"localfiles","file":"/data/test/out.txt"}}' http://kafkac01.wtown.com:8083/connectors
  • 1

(8)查看sink connector

curl http://kafkac01.wtown.com:8083/connectors/out/status
  • 1

在这里插入图片描述
(9)查看out输出文件(还是要到connector运行节点上去看)
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/782241
推荐阅读
相关标签
  

闽ICP备14008679号