赞
踩
- <!-- kafka依赖 -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- kafka:
- bootstrap-servers: 自己的ip:端口号
- producer: #生产者序列化器
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- ack-mode: manual #设置生产者的消息确认模式。manual表示手动确认
- consumer: #消费者序列化器
- group-id: ${spring.application.name}-test
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- listener: #监听器相关配置
- ack-mode: manual #消息确认模式。manual表示手动确认
创建消息队列
- @Bean
- public NewTopic logTopic() {
- // 1表示分区数为1,(short)1 表示副本数为1
- return new NewTopic("消息队列", 1, (short) 1);
- }
在业务层发送消息先注入KafkaTemplate
- @Autowried
- KafkaTemplate kafkaTemplate;
消息发送(字符串消息直接发送,对象消息转化成json字符串后在发送)
发送失败后重新发送消息,保证消息不丢失
- String s = JSON.toJSONString(messageVo);
- //s就是要发送的消息
- kafkaTemplate.send("logTopic",s) //发型消息
- .addCallback(new SuccessCallback() { //.addcallback(),添加一个回调函数,用来处理发送消息的结果
- @Override
- public void onSuccess(Object o) {
- //发送成功
- // 把消息的唯一ID设置到Redis中,为了解决重复消费的问题
- stringRedisTemplate.opsForValue().set("key","消息已经发送");
- }
- },
- new FailureCallback() {
- @Override
- public void onFailure(Throwable throwable) {
- //发送失败,重新发送消息
- kafkaTemplate.send("logTopic",s)
- }
- }
- );
- @KafkaListener(topics = {"logTopic"})
- public void recvLogMessage(String message,Acknowledgment acknowledgment){
- try {
- //将消息转换成对象
- JSON.parseObject(message, “对象”.class);
- //获取唯一标识,查看消息,是否已经被接收
- Boolean flag = stringRedisTemplate.hasKey(key);
- if(flag){
- //消息还没有接收,
- 业务处理,进行CRUD操作
- //处理完成之后从列表中将消息删除
- acknowledgment.acknowledge();
- //删除redis记录,解决重复消费问题
- stringRedisTemplate.delete(key);
- return;
- }
- //redis中没有记录,消息已经消费完成,将消息从队列中删除
- acknowledgment.acknowledge();
- }catch(Exception e){
- e.printStackTrace();
- }
-
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。