A test-environment Kafka cluster with 360+ topics and 2000+ partitions, deployed on virtual machines, had to be fully migrated into Kubernetes containers for a number of reasons, which made it a good opportunity to practice. This article records the hands-on experience with both MM1 and MM2, along with the problems encountered and how they were solved.
Source cluster: kafka-2.6.0, 2 brokers, on virtual machines
Target cluster: kafka-2.6.0, 3 brokers, on k8s
Tools: MM1 (kafka-mirror-maker.sh), MM2 (connect-mirror-maker.sh)
Requirements: topic names must not change, and the data must be complete
Precondition: the target cluster must have automatic topic creation enabled: auto.create.topics.enable=true
Essentially, MM1 is a Kafka consumer and producer bolted together: it can move data from a source cluster to a target cluster effectively, but offers little beyond that.
Years of MM1 use in the field have also exposed a number of limitations.
MM2 is built on the Kafka Connect framework. Like other Kafka connectors, MM2 consists of source connectors and sink connectors, and can keep topic data, topic configurations, ACLs, and consumer group offsets in sync between clusters.
Following the principle of rehearsing before the real operation, I set up a cluster with the same configuration as the target cluster and used it to verify what each tool actually does. Only once I was confident did I run against the real target cluster.
Run --help to see the available options:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
--abort.on.send.failure <String: Stop the entire mirror maker when a send failure occurs>
    Configure the mirror maker to exit on a failed send. (default: true)
--consumer.config <String: config file>
    Embedded consumer config for consuming from the source cluster.
--consumer.rebalance.listener <String: A custom rebalance listener of type ConsumerRebalanceListener>
    The consumer rebalance listener to use for mirror maker consumer.
--help
    Print usage information.
--message.handler <String: A custom message handler of type MirrorMakerMessageHandler>
    Message handler which will process every record in-between consumer and producer.
--message.handler.args <String: Arguments passed to message handler constructor.>
    Arguments used by custom message handler for mirror maker.
--new.consumer
    DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in a future version).
--num.streams <Integer: Number of threads>
    Number of consumption streams. (default: 1)
--offset.commit.interval.ms <Integer: offset commit interval in millisecond>
    Offset commit interval in ms. (default: 60000)
--producer.config <String: config file>
    Embedded producer config.
--rebalance.listener.args <String: Arguments passed to custom rebalance listener constructor as a string.>
    Arguments used by custom rebalance listener for mirror maker consumer.
--version
    Display Kafka version.
--whitelist <String: Java regex (String)>
    Whitelist of topics to mirror.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
There are only two core parameters, the consumer and producer configuration files:
consumer.properties (consumes from the source cluster):
bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
producer.properties (produces to the target cluster):
bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3
Run the script:
./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"
MM1 is straightforward: as long as the two configuration files and the SASL settings are correct, it basically just works. It is a good fit for simple replication jobs, such as mirroring a specified set of topics.
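To keep MM1 alive after the terminal session ends, the same backgrounding trick used later for MM2 can be wrapped in a script. A minimal sketch; the log directory, file names, and PID file are my own assumptions:

```shell
#!/bin/sh
# Generate a small wrapper script so MM1 survives the SSH session closing.
mkdir -p log
cat > run-mm1.sh <<'EOF'
#!/bin/bash
nohup ./kafka-mirror-maker.sh \
  --consumer.config ./consumer.properties \
  --producer.config ./producer.properties \
  --offset.commit.interval.ms 5000 \
  --num.streams 2 \
  --whitelist "projects.*" > log/mm1.log 2>&1 &
echo $! > mm1.pid   # remember the PID so the mirror can be stopped later
EOF
chmod +x run-mm1.sh
```

Stopping the mirror later is then just `kill $(cat mm1.pid)`.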
There are four ways to run MM2.
This article covers the first and the third: as a dedicated MirrorMaker cluster, and as a standalone Connect worker. The second requires setting up a Connect cluster, which is considerably more work.
The dedicated-cluster mode is the simplest: it needs nothing but a single configuration file, and that file is highly customizable, so just tailor it to your needs.
As usual, run --help for the usage:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
As you can see, there are far fewer parameters; the only essential one is the configuration file.
mm2.properties:
name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2
# cluster aliases
clusters = event-center, event-center-new
# bootstrap servers of the event-center cluster
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# bootstrap servers of the event-center-new cluster
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# enable replication from event-center to event-center-new
event-center->event-center-new.enabled = true
# regex of topics to replicate
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*
# replication factor of the internal topics MM2 uses for its own bookkeeping
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
# custom options
# whether to sync source topic configurations
sync.topic.configs.enabled=true
# whether to sync source ACLs
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# whether the connector emits heartbeats
emit.heartbeats.enabled=true
# heartbeat interval
emit.heartbeats.interval.seconds=5
# whether to emit checkpoints
emit.checkpoints.enabled=true
# whether to refresh the topic list
refresh.topics.enabled=true
# refresh interval
refresh.topics.interval.seconds=60
# whether to refresh consumer group ids
refresh.groups.enabled=true
# refresh interval
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# replication factor of topics newly created on the remote cluster
replication.factor=3
Note that replication.policy.class defaults to DefaultReplicationPolicy. That policy prefixes every topic replicated to the target cluster with the source cluster's alias: if the source alias is A and the topic is bi-log, the topic shows up on the target as A.bi-log. The point of this is to avoid infinite loops in bidirectional replication setups.
The official documentation explains:
This is the default behavior in MirrorMaker 2.0 to avoid rewriting data in complex mirroring topologies. Customizing this requires care in replication flow design and topic management to avoid data loss. It can be done by supplying a custom replication policy class via "replication.policy.class".
For how to implement and use a custom replication policy, see my other article.
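For reference, newer Kafka releases ship a ready-made policy that preserves topic names, so no custom class is needed there. This is an assumption to verify against your version: IdentityReplicationPolicy is only available from Kafka 3.0 onwards, so it does not apply to the 2.6.0 clusters used here. A sketch of how it would be configured:

```properties
# Keeps replicated topic names identical to the source (no alias prefix).
# Only available in Kafka 3.0+; on 2.6.0 a custom policy class is required.
# Caution: without the prefix, bidirectional replication can loop.
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
```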
To keep the process running in the background, wrap the call in a small script:
run-mm2.sh:
#!/bin/bash
nohup ./connect-mirror-maker.sh mm2.properties > log/mm2.log 2>&1 &
Then just run the script.
This mode is a bit more troublesome: it needs a Kafka instance to act as the worker node that moves the data, driven by the connect-standalone.sh script.
Run --help to see the usage:
./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
Two configuration files are needed: one describing the Kafka cluster that acts as the worker (worker.properties), and one describing the replication job (connector.properties).
worker.properties:
bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.properties:
name = MirrorSourceConnector
topics = projects.*
groups = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
# source
# this setting prefixes every replicated topic with the alias; think twice
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
# whether to sync source topic configurations
sync.topic.configs.enabled=true
# whether to sync source ACLs
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# whether the connector emits heartbeats
emit.heartbeats.enabled=true
# heartbeat interval
emit.heartbeats.interval.seconds=5
# whether to emit checkpoints
emit.checkpoints.enabled=true
# whether to refresh the topic list
refresh.topics.enabled=true
# refresh interval
refresh.topics.interval.seconds=30
# whether to refresh consumer group ids
refresh.groups.enabled=true
# refresh interval
refresh.groups.interval.seconds=30
# connector consumer read-ahead queue size
# readahead.queue.capacity=500
# use a custom replication policy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3
Run:
./connect-standalone.sh worker.properties connector.properties
This mode is only briefly introduced here; in the end I went with the previous one, which is simpler and more direct.
Verification:
Message count: OK
Connected to both clusters with the kafka-tool client and compared them.
Topic count: OK
./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt
./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt
command.properties:
security.protocol = SASL_PLAINTEXT
sasl.mechanism = PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
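With the two topic lists exported, the comparison itself needs no GUI; `comm` does it. A sketch using made-up topic names in place of the real exports:

```shell
#!/bin/sh
# Stand-ins for the real exports from kafka-topics.sh --list (one topic per line).
printf 'projects.a\nprojects.b\nprojects.c\n' | sort > topics-source.txt
printf 'projects.a\nprojects.b\n'             | sort > topics-sink.txt

# comm requires sorted input; -23 keeps only lines unique to the first file,
# i.e. topics that exist on the source but are missing on the sink.
comm -23 topics-source.txt topics-sink.txt
```

With this sample data it prints `projects.c`, the one topic not yet replicated; empty output means the sink has every source topic.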
New messages replicated: OK
New topics replicated: OK
Consumer groups replicated: NO
./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt
To replicate consumers, you need the utility class Kafka provides for this: RemoteClusterUtils (org.apache.kafka.connect.mirror.RemoteClusterUtils).
Consumer offsets replicated: NO
ACLs replicated: OK
This can be checked with kafka-acls.sh or the kafka-tool client.
| property | default value | description |
| --- | --- | --- |
| name | required | name of the connector, e.g. "us-west->us-east" |
| topics | empty string | regex of topics to replicate, e.g. "topic1\|topic2\|topic3". Comma-separated lists are also supported. |
| topics.blacklist | ".*.internal, .*.replica, __consumer_offsets" or similar | topics to exclude from replication |
| groups | empty string | regex of groups to replicate, e.g. ".*" |
| groups.blacklist | empty string | groups to exclude from replication |
| source.cluster.alias | required | name of the cluster being replicated |
| target.cluster.alias | required | name of the downstream Kafka cluster |
| source.cluster.bootstrap.servers | required | upstream cluster to replicate |
| target.cluster.bootstrap.servers | required | downstream cluster |
| sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes |
| sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes |
| emit.heartbeats.enabled | true | connector should periodically emit heartbeats |
| emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
| emit.checkpoints.enabled | true | connector should periodically emit consumer offset information |
| emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
| refresh.topics.enabled | true | connector should periodically check for new topics |
| refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
| refresh.groups.enabled | true | connector should periodically check for new consumer groups |
| refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
| readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
| replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
| heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time |
| checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time |
| offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time |
| replication.factor | 2 | used when creating remote topics |
References:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new
https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf
https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide
A later round of verification uncovered a problem:
messages produced on the old cluster get replicated to the new cluster in three copies.