赞
踩
创建网络:docker network create mynet-a --driver bridge
获取zk镜像:docker pull zookeeper
获取zk管理程序镜像:docker pull maauso/zkui
获取kafka镜像:docker pull bitnami/kafka
获取kafka-manager镜像(可以界面创建和管理topic):docker pull sheepkiller/kafka-manager
获取kafka-map镜像(可以可视化看到kafka消息,每个分区消息):docker pull dushixiang/kafka-map
启动zk:docker run -d --name zk --network mynet -p 2181:2181 zookeeper
启动zkui:docker run -d --name zkui -p 9090:9090 --network mynet -e ZKLIST=zk:2181 maauso/zkui
启动3个kafka broker:这里要注意,网上搜的关于bitnami/kafka 大多过时,目前latest版本默认已经抛弃zk(据说是zk性能不佳,而且kafka不希望依赖第三方组件才能组成集群),转而使用自家的kraft算法进行集群的管理(有关kraft不多介绍,有兴趣可以自行百度),kraft需要配置controller端口以及voters的地址,所以使用网上方法启动有可能启动失败,或者启动后zk看不到任何信息。因为kraft仍处于实验阶段,而且zk可更直观看到kafka集群信息,我还是采用传统的kafka+zk。因此启动container需要加上-e KAFKA_ENABLE_KRAFT=no。
如下:
docker run -d --name k1 --network mynet -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ENABLE_KRAFT=no -e KAFKA_CFG_ZOOKEEPER_CONNECT=zk:2181 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://k1:9092 -p 9092:9092 bitnami/kafka
docker run -d --name k2 --network mynet -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ENABLE_KRAFT=no -e KAFKA_CFG_ZOOKEEPER_CONNECT=zk:2181 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://k2:9092 -p 9093:9092 bitnami/kafka
docker run -d --name k3 --network mynet -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ENABLE_KRAFT=no -e KAFKA_CFG_ZOOKEEPER_CONNECT=zk:2181 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://k3:9092 -p 9094:9092 bitnami/kafka
启动kafka-manager:docker run -d --name km --network mynet -p 9000:9000 -e ZK_HOSTS=zk:2181 sheepkiller/kafka-manager
启动kafka-map:docker run -d --name kmm --network mynet -p 9001:9000-p 9000:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin dushixiang/kafka-map
listeners:kafka监听的地址,默认PLAINTEXT://:9092 等同于PLAINTEXT://0.0.0.0:9092,代表监听所有ip的9092端口。
不熟悉socket的可能会有疑问,socket监听不就是监听自己机器的某个端口吗,这个ip是怎么回事?其实你的服务器可以拥有多个网卡,虚拟网卡,也代表除了127.0.0.1代表你的服务器外,你也可以连接着多个内网,拥有多个ip,因此listener可以指定ip进行监听,0.0.0.0代表监听所有ip,而127.0.0.1代表只监听本地,其他机器就连不上了,当然你也可以指定某个你自己的ip,也就只有通过那个ip才能连上当前kafka broker。
advertised listeners:通过启动后的zkui,不难发现zk里面登记的broker地址信息就是你填的这个advertised listeners,要组成集群,多个kafka之间需要知道大家的地址,就是通过连接zk,获取到其他兄弟的地址信息。因此这个advertised listeners要保证填的地址其他broker能够正常访问。
关于kafka的概念不作介绍,建议参考kafka中Topic、Partition、Groups、Brokers概念辨析_kafka topic group_LoongTech的博客-CSDN博客
创建topic:使用kafka-topics.sh或者直接在kafka-manager中创建即可
分区数量影响可以参考此文进行理解:Kafka partition的数量问题 - 简书
此处因为部署了3个broker,因此分区也设置为3,备份数量设置为3
创建后可在manager看到相关信息
此代码支持简单模拟多个生产者同时生产消息
@RestController @Slf4j public class TestController { @Autowired KafkaTemplate<String,String> kafkaTemplate; @Autowired RestTemplate restTemplate; @Value("${server.port}") private String port; @Value("${producers}") private String producers; @RequestMapping("send") public String send(Integer count){ log.info(port); log.info(producers); List<String> producerList = Arrays.asList(producers.split(",")); String uuid = UUID.randomUUID().toString(); for (String producer : producerList) { restTemplate.getForObject(String.format("http://127.0.0.1:%s/produce?uuid=%s&count=%s",producer, uuid, count),String.class); } return "ok"; } @RequestMapping("produce") public String produce(String uuid, Integer count){ asyncProduce(uuid, count); return "ok"; } @Async void asyncProduce(String uuid, Integer count){ for(int i = 0;i<count;i++){ String message = String.format("producer-%s-%s-%s",port,uuid,i); log.info(message); kafkaTemplate.send("test001",message); } } } "test001",message); } } }
然后本地启动该代码:
java -jar kp.jar --server.port=8080 --producers=8080 --spring.kafka.bootstrap-servers=x.x.x.x:9092
调用send接口后发现报错,连不上k1 k2 k3
这是因为kafka client会通过bootstrap-servers配置的地址连上 kafka,然后通过kafka对应的zk拿到集群里的所有broker配置,再根据配置里登记的endpoints连接上所有集群里的broker,这样做的好处集群可以动态扩容,调整,bootstrap-servers的配置也无需频繁更改,只需保证配置的一个或者几个kafka在集群里是常驻的,保证程序启动能连上某个节点即可。
此处因为我运行web的机器与docker所在服务器不在一个,所以kafka client拿到 k1:9092是无法访问到kafka的。
因此advertised listeners最好还是填外网地址,除非你保证所有应用都在同一个网桥下运行。
这里我重新修改3个kafka容器的KAFKA_CFG_ADVERTISED_LISTENERS为外网地址,然后client就可以正常推送消息了。
@Configuration
@Slf4j
public class Consumer {
@Value("${server.port}")
private String port;
@KafkaListener(topics="test001")
public void consume(String message){;
log.info(String.format("consumer-%s-%s",port,message));
}
}
启动一个消费者:
java -jar kc.jar --server.port=8090 --spring.kafka.bootstrap-servers=x.x.x.x:9092 --spring.kafka.consumer.group-id=default
调用127.0.0.1:8080/send?count=1000 测试推送1000条消息。
从上面例子的消费者日志可以看出,kafka消费消息是无序的,因为多分区的原因,生产者生产的消息会被分配到不同的分区,因此想要有序消费,只有单分区的情况下可行。并且如果有多生产者的情况下,单分区也未必能保证顺序消费,因为生产者是采用分批提交的方式,当一个批次的消息达到某个大小,或者到达某个周期,才会推送到kafka,这样可以避免生产者高频与kafka交互。
下面来模拟这么一种情况,一个分区,ab两个生产者:
a产生a1消息->b产生b1消息->a产生a2消息->a推送该批次->b产生b2消息->b推送改批次
正确顺序应该是a1->b1->a2->b2
但因为批次原因消费顺序会变为a1->a2->b1->b2
首先我们需要启动两个生产者,并且把批次推送的周期调大,方便模拟该情况:
java -jar kp.jar --server.port=8080 --producers=8080,8081 --spring.kafka.bootstrap-servers=8.130.136.133:9092 --spring.kafka.producer.properties.linger.ms=10000
java -jar kp.jar --server.port=8081 --producers=8080,8081 --spring.kafka.bootstrap-servers=8.130.136.133:9092 --spring.kafka.producer.properties.linger.ms=10000
然后调用两次接口http://127.0.0.1:8080/send?count=1
得到如下结果:
由此验证了上面的情况。
所以,某些应用场景下,想要保证相对的顺序消费,需要开发者通过其他方式实现,例如指定分区推送,通过key来保证顺序等等,但无论如何,都只是保证相对的顺序,想要做到严格的顺序消费,那是不可能的,建议参考如何保证kafka消费的顺序性_kafka顺序消费 如何控制_IT小谷的博客-CSDN博客。
接下来我们启动另外一个消费者组,我们会发现新的消费者组启动的时候并不会消费原本已经存在的旧消息,意味着新的消费者组创建的时候,offset会自动标记的最新的offset,某些场景下,例如新部署了一套系统,也想把旧消息消费过来,就需要更改spring.kafka.consumer.auto-offset-reset=earliest,这时候新的消费者组就会从头开始消费。
接下来我们来试试动态扩容会有什么表现
先把k2 k3容器停掉,然后创建topic,启动生产者消费者,然后启动k2,增加分区并且指向k2。
然后推送消息,可以看到消息会被推送到新的分区,也能正常消费,而程序不需要重启(kafkaclient可能需要过一小段时间才能连上扩容的分区)
下面我们来模拟这么一种场景,某个broker挂掉了,会出现什么情况。
使用一个3分区2备份的topic进行测试,如下
可以看到,0分区由 1001broker 管理,备份于自己和1002,1和2分区类推。
随便推送一些消息,通过kafkamap 确保每个分区都有消息
现在我们关掉1003
可以看到0分区已经交由1001备份管理,而且isr的1003已经断开。
再测试推3条消息,spring程序会自动断开1003,但因为由备份节点,仍然正常工作。
接着我们以earliest启动一个新的消费组,也可以看到工作正常。
下面我们再关掉1002,可以看到1分区已经报错,仍然正常工作。
现在生产者消费者仍然正常工作,但因为1分区已经挂了,所以消息不会存去1分区
现在我们以earliest启动一个新的消费组,可以看到新的消费者组是消费不到1分区的消息了,没有备份节点的数据将会暂时丢失。
但当我们再次启动所有节点,可以看到新的消费者组会重新消费1分区的消息。
建议参考RabbitMQ和Kafka比较_kafka和rabbitmq的区别_普通网友的博客-CSDN博客
十分重要的一个是kafka不支持ttl消息,延时队列等。
网上很多与此相关都是在介绍时间轮算法,实际上时间轮是用于kafka内部为了自己的延时操作,如消费者心跳检测,生产者ack超时等待等,并没有开放给用户使用,因此想要实现延时队列等不必纠结于时间轮,网上都是介绍这种算法而已。
实际上ttl可以通过消息header实现,而延时队列需要依赖于中间增加一个主题,通过中间消费实现。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。