参考:https://blog.csdn.net/yao8513277/article/details/81116718
1.单机版
1.1 安装jdk
略
1.2 安装zookeeper
https://my.oschina.net/springMVCAndspring/blog/3045210
1.3 安装window上安装kafka
1.3.1 下载
http://kafka.apache.org/downloads
1.3.2 解压
1.3.3 建立一个空文件夹 logs
1.3.4 进入config目录,编辑 server.properties文件
(1)编辑log.dirs
(2)编辑zookeeper.connect
1.4 启动kafka
(1)启动zookeeper
(2)启动kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
2.kafka整合springboot
效果:
2.1 生产者项目
2.1.1 创建 生产者项目
设置maven
2..1.2 配置pom.xml
<!--1.web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 2.kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.1.RELEASE</version> </dependency> <!--3.lombook--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> <scope>provided</scope> </dependency> <!-- 4jackson --> <!--4.1 定义了底层的streaming API和实现了Json特性 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.7.3</version> </dependency> <!-- 4.2 实现了数据绑定和对象序列化,它依赖于streaming和annotations的包 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.7.3</version> </dependency> <!-- 4.3 包含了标准的Jackson注解。本文暂不介绍 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.7.3</version> </dependency>
2.1.3 配置文件
2.1.4 编写代码
这里只简单的编写controller层代码
2.1.4.1 domain
2.1.4.2 controller
@Resource private KafkaTemplate<String, String> kafkaTemplate; //发送消息方法 @ResponseBody @RequestMapping("/send") public void send(String msg) { Message message = new Message(); message.setId(UUIDUtils.returnUuid()); message.setMsg(msg); message.setSendTime(TimeUtils.getStringDate(new Date())); try { kafkaTemplate.send("kafka0620", JacksonUtils.objectToJson(message)); } catch (JsonProcessingException e) { e.printStackTrace(); } }
2.2 消费者
(1)搭建项目 与生产者一模一样
(2)pom.xml相同
(3)配置文件
#项目端口 server.port=9000 #============== kafka =================== # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=localhost:9092 #=========== consumer ======================= # 指定默认消费者group id spring.kafka.consumer.group-id=test201906191305 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
(4) controller
@KafkaListener(topics = {"kafka0620"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.err.println(record); System.err.println(message); } }
2.3 源码
链接:https://pan.baidu.com/s/1zhbepbSKHuuDtnY9Z5DiKw
提取码:8c59