赞
踩
如图:
方法1 属于 groupA 这个组,方法2 属于 groupB 这个组,因为是属于不同的组,所以功能是类似 pub/sub 这个发布订阅模型的,就是他们都订阅了相同的主题,就都能消费到test2主题的消息。
如图:我发送的消息,因为两个监听方法不是同一个组的,所以都能消费到该主题的消息。
之前是小黑窗输入命令监听,现在直接启动项目,在控制台看监听的消息。
发送不带key的消息,也是一样的。
如图:方法1和方法2 属于同一个组,那么这两个监听方法就是以轮询的方式监听着test2主题里面的消息。
如图:test2主题有四个分区,然后我们写了两个监听方法,意味着一个监听方法会被分配去监听2个分区的消息。
然后我们发送不带key的消息,那么这些消息会被随机分配到某个分区里面,然后被监听该分区的监听方法监听到。
如图可以看到,监听器A 在监听 分区3 的消息,消息i 和 消息k 因为都被分配到 分区3,所以都被 监听器A 监听到。
消息 j 被发送到分区0 ,被 监听器B 监听到,也就是表明 监听器B 被 Kafka 分配去监听分区0的消息
注意点:这里说的同一个组的2个监听器以轮询的方式监听同一个主题的消息,是kafka先给两个监听器分配它们各自负责监听哪个分区的消息,然后当我们把消息发送到某个分区时,那么负责监听该分区消息的监听器就会监听到该消息并做一些业务处理。
如图:如上面所说,这里可以看出,分区2也是监听器A在监听,而且同一个key的消息,都是发送到同一个分区里面的,所以这些消息都是被监听器A监听着。
发送不同的消息,但是key一样,也都是发送到同一个分区,而这个分区被监听器A监听着。所以无论发多少条消息,只要key是ljh,那么都是监听器A在监听。
test2 主题有4个分区,然后我们写了两个监听器,那么kafka 就会为这两个监听器分配它们去监听哪个分区的消息,以为分配是以轮询的方法分配的。所以刚好一个监听器被分配去监听2个分区。
(如果有5个分区,那么一般就是 监听器A被分配监听3个分区,监听器B被分配监听2个分区,这个分配分区的规则前面的文章有提到:分区分配规则)
因为同一个key的消息,无论发送多少条,一般都是发往同一个分区的,和上面说的同一个组的监听器,以轮询的方式监听同一个主题的消息时,两者并不冲突,需要结合具体的消息类型(这里说的是消息的key是否相同)来理解,
注意点:
通过springboot来整合kafka,演示的时候就需要来通过小黑窗输入命令来演示发送消息和消费消息了。
直接在web端发送消息,然后在IDEA控制台看监听到的消息就可以了
server.port=8081
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
spring.kafka.client-id=ljh-boot
spring.kafka.producer.acks=all
spring.kafka.producer.retries=0
spring.kafka.producer.properties[linger.ms]=3
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.properties[auto.commit.interval.ms]=1000
spring.kafka.consumer.properties[session.timeout.ms]=15000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=batch
spring.kafka.listener.type=single
spring.kafka.listener.poll-timeout=5s
该类的作用是用来自动创建主题
package cn.ljh.kafkaboot.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//kafka 配置类
// proxyBeanMethods = false:表示这个配置类不用帮我们做一些其他的代理操作,只用我自己写的bean就可以了
@Configuration(proxyBeanMethods = false)
public class KafkaConfig
{
//用来自动创建主题
@Bean
public NewTopic test3()
{
//参数1:主题名字 参数2:分区数量 参数3:复制因子数量
return new NewTopic(“test3”, 3, (short) 2);
}
}
发送消息
package cn.ljh.kafkaboot.controller;
import cn.ljh.kafkaboot.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* author JH
*/
@RestController
public class MessageController
{
//业务层
public final MessageService messageService;
//构造器依赖注入
public MessageController(MessageService messageService)
{
this.messageService = messageService;
}
//发送带key的消息
@GetMapping(“/send/{key}/{value}”)
public String sendMessage(@PathVariable String key, @PathVariable String value){
//发送带key的消息成功
messageService.sendMessage(key,value);
return “发送带key的消息成功”;
}
//发送不带key的消息
@GetMapping(“/sendNoKey/{value}”)
public String sendNoKey(@PathVariable String value)
{
//发送不带key的消息成功
messageService.sendMessage(null,value);
return “发送不带key的消息成功”;
}
}
发送消息的业务逻辑
package cn.ljh.kafkaboot.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* author JH
*/
@Service
public class MessageService
{
//定义一个主题的常量
public static final String TARGET_TOPIC = “test2”;
//因为没有自己写KafkaTemplate这个类,所以是没有初始化的,需要进行依赖注入才行
private final KafkaTemplate<String, String> kafkaTemplate;
//通过构造函数进行依赖注入
public MessageService(KafkaTemplate<String, String> kafkaTemplate)
{
this.kafkaTemplate = kafkaTemplate;
}
//发送消息
public void sendMessage(String key, String value)
{
//如果这个key不为空,则为true
if(Objects.nonNull(key))
{
//发送带 key、value的消息
kafkaTemplate.send(TARGET_TOPIC,key,value);
}
else
{
//发送不带 key 的消息
kafkaTemplate.send(TARGET_TOPIC,value);
}
}
}
监听和消费消息
package cn.ljh.kafkaboot.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @KafkaListener 注解修饰的方法会被注册为消息监听器方法。该注解支持如下常用的属性:
* topics- 指定要监听的主题
* topicPattern:指定要监听的主题的正则表达式
* topicPartitions:指定要监听的主题的分区
* groupId:指定组id
*/
// @Component是Spring框架中的一个通用注解,用于标记一个类为组件,让Spring能够在应用程序启动时自动扫描并加载这些组件
@Component
public class KafkaMessageListener
{
//因为这两个监听器不属于同一个组,相当于pub/sub模型,都能监听到同一个主题的消息
//监听器方法A:监听 test2 主题,这个监听器是属于groupA这个组
@KafkaListener(topics = “test2”,groupId = “groupA”)
public void processMsg01(ConsumerRecord<String,String> msg)
{
System.err.printf(“监听器 【 A 】 收到消息, offset = %d , partition=%s , key = %s, value = %s%n”,
msg.offset(),msg.partition(), msg.key(), msg.value());
}
//监听器方法B:监听 test2 主题,这个监听器是属于groupB这个组
@KafkaListener(topics = “test2”,groupId = “groupB”)
public void processMsg02(ConsumerRecord<String,String> msg)
{
System.err.printf(“监听器 【 B 】 收到消息, offset = %d , partition=%s , key = %s, value = %s%n”,
msg.offset(),msg.partition(), msg.key(), msg.value());
}
}
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.4.5
cn.ljh
kafkaboot
1.0.0
kafkaboot
1、启动 zookeeper 服务器端
小黑窗输入命令: zkServer
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
!**
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
[外链图片转存中…(img-G9ZSQzIF-1712508836945)]
[外链图片转存中…(img-vKzb6Fbk-1712508836946)]
[外链图片转存中…(img-7bL7Y1nS-1712508836946)]
[外链图片转存中…(img-673Rlcpd-1712508836946)]
[外链图片转存中…(img-cKvrizYA-1712508836947)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-pBLuslel-1712508836947)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。