当前位置:   article > 正文

Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)_springboot kafka 消息发送与订阅

springboot kafka 消息发送与订阅

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

好了,配置工作准备完毕。

我们先来搞Kafka的生产者,也就是负责推送消息的模块:

创建一个类, 叫KafkaSender(注解不能少,留意代码),

package com.kafkademo.producer;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Component;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

/**

  • Hello!

  • Created By JCccc on 2018/11/24

  • 11:25

*/

@Component

public class KafkaSender {

@Autowired

private KafkaTemplate<String, Object> kafkaTemplate;

private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);

public void send(String topic, String taskid, String jsonStr) {

//发送消息

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);

future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

@Override

//推送成功

public void onSuccess(SendResult<String, Object> result) {

logger.info(topic + " 生产者 发送消息成功:" + result.toString());

}

@Override

//推送失败

public void onFailure(Throwable ex) {

logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());

}

});

}

}

以上就是kafka生产者了,到此刻,你已经可以开始往kafka服务器推送消息了

事不宜迟,我们立马试试:

创建个controller,搞个接口试试推送下消息,

@GetMapping(“/sendMessageToKafka”)

public String sendMessageToKafka() {

Map<String,String> messageMap=new HashMap();

messageMap.put(“message”,“我是一条消息”);

String taskid=“123456”;

String jsonStr=JSONObject.toJSONString(messageMap);

//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)

kafkaSender.send(“testTopic”,taskid,jsonStr);

return “hi guy!”;

}

用postman测一下(对了,这些推送的前提是你的kafka服务器是没问题的,能正常连接)

看看控制台反应:

可以看到,我们的kafka生产者再推送消息成功后,成功进入了我们的回调函数onSuccess,也打印了日志。

没错,你已经掌握kafak生产者了,你已经掌握推送消息了。 那么接下来,我们继续搞下kafka的消费者。

我们创一个类,叫KafkaConsumer (同样,注意看代码,注解不能少) :

package com.kafkademo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import org.springframework.stereotype.Controller;

import javax.servlet.http.HttpSession;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

/**

  • Hello!

  • Created By JCccc on 2018/11/24

  • 13:13

小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
img
img
img

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频

如果你觉得这些内容对你有帮助,可以添加下面V无偿领取!(备注Java)
img

[外链图片转存中…(img-xc3wtuFV-1710892189855)]
[外链图片转存中…(img-DR9VOLHB-1710892189855)]
[外链图片转存中…(img-2xhHefiY-1710892189856)]

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频

如果你觉得这些内容对你有帮助,可以添加下面V无偿领取!(备注Java)
[外链图片转存中…(img-NSYdJKhq-1710892189856)]

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/502227
推荐阅读
相关标签
  

闽ICP备14008679号