赞
踩
1、安装 zookeeper
docker pull wurstmeister/zookeeper
2、运行zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
3、拉取kafka
docker pull wurstmeister/kafka
4、启动kafka
-
- # Replace 192.168.1.117 with the IP address of your Docker host.
-
- # Start the Kafka broker container (long-form docker options).
- docker run --detach --name kafka \
- --publish 9092:9092 \
- --env KAFKA_BROKER_ID=0 \
- --env KAFKA_ZOOKEEPER_CONNECT=192.168.1.117:2181 \
- --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.117:9092 \
- --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
- wurstmeister/kafka
5、简单命令（先执行 docker exec -it kafka /bin/bash 进入容器，再 cd /opt/kafka/bin 后运行以下脚本）
- # Create the topic. Uses --bootstrap-server (the broker address) instead of the
- # deprecated --zookeeper option, matching the producer/consumer commands below.
- ./kafka-topics.sh --create --bootstrap-server 192.168.1.117:9092 --replication-factor 1 --partitions 1 --topic test1
-
- # List the topics
- ./kafka-topics.sh --bootstrap-server 192.168.1.117:9092 --list
-
- # Produce messages (type one message per line)
- ./kafka-console-producer.sh --bootstrap-server 192.168.1.117:9092 --topic test1
-
- # Consume messages from the beginning of the topic
- ./kafka-console-consumer.sh --bootstrap-server 192.168.1.117:9092 --topic test1 --from-beginning
1、添加依赖
- <!-- Spring for Apache Kafka. NOTE(review): no version element is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm). -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
2、编写controller层
- /**
-  * HTTP contract for the Kafka demo endpoint; all routes are mapped under
-  * {@code /KafkaControllerWeb}.
-  */
- @RequestMapping("/KafkaControllerWeb")
- public interface KafkaControllerWeb {
-
-     /**
-      * Mapped to {@code POST /sendKafkaMessage} beneath the type-level path.
-      *
-      * @param orderDto payload bound from the request body
-      * @return the {@code R} result produced by the implementation
-      */
-     @PostMapping("/sendKafkaMessage")
-     R sendKafkaMessage(@RequestBody OrderOutsideDto orderDto);
- }
- //生产者
- @RestController
- public class KafkaController implements KafkaControllerWeb {
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
- @Override
- public R sendKafkaMessage(OrderOutsideDto orderDto) {
- try {
- ListenableFuture<SendResult<String, String>> test2 = kafkaTemplate.send("test1", orderDto.getOrderStatus());
- return R.ok("发送成功");
- }catch (Exception e){
- return R.error("kafka发送失败"+e);
- }
- }
-
- }
3、编写 消费者
- /**
-  * Kafka consumer for the {@code test1} topic.
-  *
-  * <p>NOTE(review): no {@code groupId} is set on the listener; presumably a consumer
-  * group id is supplied through application configuration (confirm).
-  */
- @Configuration
- public class KafkaConsumer {
-
-     /**
-      * Listener for topic {@code test1}; prints the received text to standard output.
-      *
-      * @param body the message payload as a string
-      */
-     @KafkaListener(topics = "test1")
-     public void onMessage1(String body) {
-         final String line = "从kafka收到的消息" + body;
-         System.out.println(line);
-     }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。