① Pull the images
docker pull canal/canal-server:v1.1.4
docker pull zookeeper
docker pull wurstmeister/kafka
① Start zookeeper (port 2181)
docker run -d --name zookeeper -p 2181:2181 --restart always zookeeper
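To confirm ZooKeeper came up healthy before moving on, you can check its logs or ask for its status (a sketch; zkServer.sh should be on the PATH in the official zookeeper image):
docker logs zookeeper
docker exec -it zookeeper zkServer.sh status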
① Start kafka (port 9092)
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[zkIP:port] -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[advertised IP:port] -e KAFKA_LISTENERS=PLAINTEXT://[listen IP:port] -t wurstmeister/kafka
- [zkIP:port]: my host IP:2181
- [advertised IP:port]: my host IP:9092
- [listen IP:port]: 0.0.0.0:9092
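As a concrete sketch, assuming the host's LAN IP is 192.168.1.100 (an example value, substitute your own), the filled-in command would look like this:
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -t wurstmeister/kafka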
② Initialize kafka (enter /opt/kafka/bin inside the container)
docker exec -it kafka /bin/bash
cd /opt/kafka/bin
1. Create a topic
kafka-topics.sh --create --zookeeper [zkIP]:[zk port] --replication-factor 1 --partitions 1 --topic [topic name]
2. Create a producer (publishes to the topic)
kafka-console-producer.sh --broker-list [kafkaIP]:9092 --topic [topic name]
3. Create a consumer (once all configuration is done and zk, kafka, and canal have been started in order, you can consume messages; the command is noted here in advance, and the consumer is actually created after canal is configured below.)
kafka-console-consumer.sh --bootstrap-server [kafkaIP]:9092 --topic [topic name]
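Before canal is wired in, the broker itself can be smoke-tested from /opt/kafka/bin, independently of the canal consumer set up later. A sketch using the same placeholders as above: list the topic, type a line into the producer terminal, then replay it in a second terminal:
kafka-topics.sh --list --zookeeper [zkIP]:[zk port]
kafka-console-producer.sh --broker-list [kafkaIP]:9092 --topic [topic name]
# in another terminal, replay everything from the start of the topic:
kafka-console-consumer.sh --bootstrap-server [kafkaIP]:9092 --topic [topic name] --from-beginning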
③ ZooInspector, a ZooKeeper GUI tool
In its build directory, run the start command from cmd: java -jar zookeeper-dev-ZooInspector.jar
After kafka has been initialized you can see the topics and other metadata in it.
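If you would rather skip the GUI, zkCli.sh inside the zookeeper container shows the same information (a sketch, assuming the official image's layout):
docker exec -it zookeeper zkCli.sh
# then, at the zkCli prompt:
ls /brokers/topics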
Modify canal.properties so canal pushes to Kafka:
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = [kafkaIP]:[kafka port]
Then modify the instance configuration, instance.properties:
# database address
canal.instance.master.address=[mysqlIP]:3306
# binlog file name
canal.instance.master.journal.name=[binlog name]
# starting binlog offset when connecting to the MySQL master
canal.instance.master.position=[start offset]
# account authorized on the MySQL server
canal.instance.dbUsername=[username]
canal.instance.dbPassword=[password]
# mq config
canal.mq.topic=[topic name]
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
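The binlog file name and offset can be read from the MySQL master itself, and canal needs an account with replication privileges. A minimal sketch, assuming root access to MySQL; the 'canal'/'canal' account is just an example:
# binlog must be enabled; File/Position from SHOW MASTER STATUS go into
# canal.instance.master.journal.name / canal.instance.master.position
mysql -h [mysqlIP] -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW MASTER STATUS;"
# create the account referenced by canal.instance.dbUsername/dbPassword
mysql -h [mysqlIP] -uroot -p -e "CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;"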
① Start canal (port 11111)
docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server:v1.1.4
- -v maps the two configuration files into the container
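To check whether canal connected to MySQL and started pushing to Kafka, tail its logs (a sketch; the log path assumes the default layout of the canal-server image):
docker logs canal
docker exec canal tail -n 50 /home/admin/canal-server/logs/example/example.log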
① Create the consumer (with all configuration done and zk, kafka, and canal started in order, messages can now be consumed)
kafka-console-consumer.sh --bootstrap-server [kafkaIP]:9092 --topic [topic name]
Messages consumed successfully!
Watch the dependency versions; below are mine (I hit a pitfall here and spent half a day on what turned out to be a version incompatibility).
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.5.RELEASE</version>
</parent>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>3.0.8.RELEASE</version>
</dependency>
application.yml:
spring:
  cloud:
    nacos:
      discovery:
        server-addr: [nacosIP]:8848
    stream:
      kafka:
        binder:
          brokers:
            - [kafkaIP]:9092
          auto-add-partitions: true
          auto-create-topics: true
          zk-nodes:
            - [zkIP]:2181
      # map the custom channels (the Input/Output values in ShopChannel)
      # to their Kafka topics
      bindings:
        shop_input:
          destination: [topicName] # target topic
        shop_output:
          destination: [topicName]
      default-binder: kafka # kafka is the default binder
// Spring Cloud Stream's built-in channel interfaces, shown for reference;
// the custom ShopChannel below follows the same pattern.
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}
public interface ShopChannel {

    /** name of the channel for sending messages */
    String SHOP_OUTPUT = "shop_output"; // matches application.yml

    /** name of the channel for subscribing to messages */
    String SHOP_INPUT = "shop_input"; // matches application.yml

    /** channel for sending messages */
    @Output(SHOP_OUTPUT)
    MessageChannel sendShopMessage();

    /** channel for receiving messages */
    @Input(SHOP_INPUT)
    SubscribableChannel receiveShopMessage();
}
@Component
@Slf4j
public class KafkaReceiver {

    // listens on the shop_input channel and logs each message canal pushes to Kafka
    @StreamListener(value = ShopChannel.SHOP_INPUT)
    public void receive(Message<String> message) {
        log.info(message.getPayload());
    }
}
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(ShopChannel.class)
@Slf4j
public class ZqyCanalApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext application = SpringApplication.run(ZqyCanalApplication.class, args);
        Environment env = application.getEnvironment();
        String port = env.getProperty("server.port");
        log.info("\n----------------------------------------------------------\n\t" +
                "Service of Canal is running on port " + port + "!" +
                "\n----------------------------------------------------------");
    }
}
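With the service running, any change to the watched database should show up in the console log. For example (test.user is a hypothetical table, substitute one of your own):
mysql -h [mysqlIP] -u [username] -p -e "UPDATE test.user SET name = 'canal-test' WHERE id = 1;"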
Messages consumed successfully!