当前位置:   article > 正文

架构师系列- 消息中间件(15)-kafka业务实战

架构师系列- 消息中间件(15)-kafka业务实战

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

 

 

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

 3)通过swagger请求

 

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

 

7.2.3 部署实现

  1. 1)mysql部署
  2. 注意,需要打开binlog,8.0 默认处于开启状态
  3. #启动mysql8
  4. docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0
  5. 连上mysql,执行以下sql,添加canal用户
  6. CREATE USER canal IDENTIFIED BY 'canal';
  7. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  8. FLUSH PRIVILEGES;
  9. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
  10. 创建订单表
  11. CREATE TABLE `orders` (
  12. `id` int unsigned NOT NULL AUTO_INCREMENT,
  13. `name` varchar(255) DEFAULT NULL,
  14. PRIMARY KEY (`id`)
  15. );
  16. 2)canal部署
  17. #canal.properties
  18. #附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
  19. #修改servers为你的kafka的机器地址
  20. canal.serverMode = kafka
  21. kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
  22. #docker-compose.yml
  23. #附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
  24. #修改mysql的链接信息的链接信息
  25. #然后在当前目录下执行 docker-compose up -d
  26. version: '2'
  27. services:
  28. canal:
  29. image: canal/canal-server
  30. container_name: canal
  31. restart: always
  32. ports:
  33. - "10908:11111"
  34. environment:
  35. #mysql的链接信息
  36. canal.instance.master.address: 192.168.10.30:3306
  37. canal.instance.dbUsername: canal
  38. canal.instance.dbPassword: canal
  39. #投放到kafka的哪个主题?要提前准备好!
  40. canal.mq.topic: canal
  41. volumes:
  42. - "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
  43. 3)数据通道验证
  44. 进入kafka容器,用上面3.2.4里的命令行方式监听canal队列
  45. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal
  46. 在mysql上创建orders表,增删数据试一下
  47. mysql> insert into orders (name) values ('张三');
  48. Query OK, 1 row affected (0.03 sec)
  49. 在kafka控制台,可以看到同步的消息
  50. {"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}
  51. 数据通道已打通,还缺少的是druid作为消费端来接收消息
  52. 4)druid部署
  53. #druid.yml
  54. #在附带资料里有
  55. #随便找个目录,执行
  56. docker-compose -f druid.yml up -d
  57. 5)验证
  58. 配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

 

  1. #注意ip地址:192.168.10.30,全部换成你自己服务器的
  2. version: '3'
  3. services:
  4. zookeeper:
  5. image: zookeeper:3.4.13
  6. kafka-1:
  7. container_name: kafka-1
  8. image: wurstmeister/kafka:2.12-2.2.2
  9. ports:
  10. - 10903:9092
  11. - 10913:10913
  12. environment:
  13. KAFKA_BROKER_ID: 1
  14. HOST_IP: 192.168.10.30
  15. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  16. #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
  17. KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
  18. KAFKA_ADVERTISED_PORT: 10903
  19. KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"
  20. JMX_PORT: 10913
  21. volumes:
  22. - /etc/localtime:/etc/localtime
  23. depends_on:
  24. - zookeeper
  25. kafka-2:
  26. container_name: kafka-2
  27. image: wurstmeister/kafka:2.12-2.2.2
  28. ports:
  29. - 10904:9092
  30. - 10914:10914
  31. environment:
  32. KAFKA_BROKER_ID: 2
  33. HOST_IP: 192.168.10.30
  34. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  35. KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
  36. KAFKA_ADVERTISED_PORT: 10904
  37. KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"
  38. JMX_PORT: 10914
  39. volumes:
  40. - /etc/localtime:/etc/localtime
  41. depends_on:
  42. - zookeeper
  43. eagle:
  44. image: gui66497/kafka_eagle
  45. container_name: ke
  46. restart: always
  47. depends_on:
  48. - kafka-1
  49. - kafka-2
  50. ports:
  51. - "10907:8048"
  52. environment:
  53. ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

 

与km到底选哪个呢?

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/495782
推荐阅读
相关标签
  

闽ICP备14008679号