赞
踩
互联网项目中经常用到MQ,由于本地项目开发连接测试环境kafka很不方便,所有在本机搭建一个kafka,方便开发测试。
提示:Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper
下载Zookeeper地址:https://zookeeper.apache.org/releases.html
下载kafka地址:http://kafka.apache.org/downloads.html
复制zoo_sample.cfg文件,并将新复制的文件命名为zoo.cfg,修改文件zoo.cfg内容如下:
- dataDir=F:\mq\apache-zookeeper-3.6.3\dataDir
- dataLogDir=F:\mq\apache-zookeeper-3.6.3\dataLogDir
进入Zookeeper安装目录,cmd 输入命令zkserver,如图
启动成功!!
进入F:\mq\kafka_2.13-2.8.0\config文件内,修改文件server.properties
log.dirs=F:\mq\kafka_2.13-2.8.0\logs
在安装目录cmd输入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
无报错则正常启动,本地启动窗口不要关闭。
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syn_user
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
启动生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic syn_user
启动消防者监听消息
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic syn_user
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- kafka:
- bootstrap-servers: 127.0.0.1:9092
- producer:
- acks: -1
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- consumer:
- enable-auto-commit: false
- key-serializer: org.apache.kafka.common.serialization.StringDeserializer
- value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
- group-id: test-consumer-group
- listener:
- ack-mode: MANUAL
- @RestController
- @Api(value = "mq消息", tags = "Fh-mq消息")
- @RequestMapping("/wkafka")
- public class ProducerController {
-
- private static final String KAFKA_TOPIC_NAME = "wlhydemo";
- @Autowired
- KafkaTemplate<String, String> kafka;
-
- @PostMapping("/send")
- public String register(@RequestBody User user) {
- try {
- String message = JSONUtil.toJsonStr(user);
- System.out.println("注册用户信息:" + message);
- kafka.send(KAFKA_TOPIC_NAME, message);
- return "OK";
- } catch (Exception e) {
- e.printStackTrace();
- }
- return "消息同步失败";
- }
-
- }
- @Slf4j
- @Component
- public class KaUserConsumer {
- @KafkaListener(topics = "wlhydemo")
- public void listenFlowStart(@Payload String businessStr,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
- @Header(KafkaHeaders.OFFSET) int offset )
- {
- try{
- // 模拟业务处理...
- log.info("当前消费分区:{}", partition);
- log.info("当前消费位置:{}", offset);
- log.info("接收到的消息:{}", businessStr);
- User user= JSONUtil.toBean(businessStr, User.class);
- user.getNickName();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。