
Integrating Kafka with Spring Boot



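Requirement: receive an order ID, put the ID on a message queue (producer), then take the message off the queue and carry out the SMS-sending step (consumer).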
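Before bringing Kafka in, the flow described above can be sketched with nothing but the JDK. This is only an in-memory stand-in, not Kafka: a `BlockingQueue` plays the role of the topic, and the class and method names (`OrderQueueSketch`, `consumeAll`, `smsSent`) are hypothetical, chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the producer/consumer flow; the queue plays the
// role of the Kafka topic. All names here are hypothetical.
class OrderQueueSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> smsSent = new ArrayList<>();

    // Producer side: put the order ID on the queue and return immediately.
    void sendMessage(String id) {
        queue.offer(id);
    }

    // Consumer side: drain whatever is waiting and "send" one SMS per ID.
    int consumeAll() {
        int handled = 0;
        String id;
        while ((id = queue.poll()) != null) {
            smsSent.add(id);
            handled++;
        }
        return handled;
    }

    // IDs for which the SMS step has run, in consumption order.
    List<String> smsSent() {
        return smsSent;
    }

    public static void main(String[] args) {
        OrderQueueSketch sketch = new OrderQueueSketch();
        sketch.sendMessage("id1");
        sketch.sendMessage("id2");
        System.out.println("handled: " + sketch.consumeAll());
        System.out.println("sms sent for: " + sketch.smsSent());
    }
}
```

The point of the queue is that `sendMessage` returns without waiting for the SMS step; Kafka gives the same decoupling across processes and machines, with the messages persisted on the broker.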

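Add the Web module with spring-boot-starter-web, along with the spring-kafka dependency, which provides KafkaTemplate and @KafkaListener. The Kafka connection settings go in the application configuration file: typically spring.kafka.bootstrap-servers (hadoop101:9092 for the broker used here) and spring.kafka.consumer.group-id, because a @KafkaListener needs a consumer group.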

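First, create the topic order (the name must match the topic used by the producer and the listener below):

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 2 --topic order

Note that --replication-factor 2 requires at least two brokers in the cluster.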

Project structure

[Image: project structure]

Service layer

MessageService.java

package com.jd.springboot_mq.service;

public interface MessageService {

    // Send a message
    void sendMessage(String id);

}


OrderService.java

package com.jd.springboot_mq.service;

public interface OrderService {

    // Create an order
    void order(String id);
}


Producing messages

MessageServiceKafkaImpl.java

package com.jd.springboot_mq.service.impl.kafka;

import com.jd.springboot_mq.service.MessageService;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class MessageServiceKafkaImpl implements MessageService {

    // Producer: Spring Boot auto-configures this template from the spring.kafka.* settings
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        // Publish the order ID as the record value to the "order" topic (no key)
        kafkaTemplate.send("order", id);
        System.out.println("Order queued for SMS delivery (Kafka), id: " + id);
    }

}


Consuming messages

MessageListener.java

package com.jd.springboot_mq.service.impl.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    // Consume messages from the "order" topic
    @KafkaListener(topics = "order")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("SMS sending completed (Kafka), id: " + record.value());
    }
}


Order service

OrderServiceImpl.java

package com.jd.springboot_mq.service.impl;

import com.jd.springboot_mq.service.MessageService;
import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {

    @Qualifier("messageServiceKafkaImpl")
    @Autowired
    private MessageService messageService;

    @Override
    public void order(String id) {
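        // Before order processing
        System.out.println("Order processing started......");
        // SMS handling: enqueue the order ID
        messageService.sendMessage(id);
        // Follow-up order processing
        System.out.println("Order processing finished......");
        System.out.println();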
    }
}
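The @Qualifier("messageServiceKafkaImpl") annotation matters once more than one MessageService implementation is registered: it selects the bean by name, and the default name of a @Service class is its class name with the first letter in lower case. The idea can be sketched without Spring as a lookup by name. Everything below is a hypothetical illustration: `NamedSenderRegistry`, `Sender`, and the second entry `messageServiceInMemoryImpl` do not exist in the project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bean container: each implementation is
// registered under a name, and callers ask for one by that name.
class NamedSenderRegistry {

    interface Sender {
        String send(String id);
    }

    private final Map<String, Sender> beans = new HashMap<>();

    void register(String beanName, Sender sender) {
        beans.put(beanName, sender);
    }

    // Rough analogue of @Qualifier("..."): resolve by name, fail loudly if absent.
    Sender byName(String beanName) {
        Sender sender = beans.get(beanName);
        if (sender == null) {
            throw new IllegalArgumentException("No sender named " + beanName);
        }
        return sender;
    }

    public static void main(String[] args) {
        NamedSenderRegistry registry = new NamedSenderRegistry();
        registry.register("messageServiceKafkaImpl", id -> "kafka:" + id);
        registry.register("messageServiceInMemoryImpl", id -> "memory:" + id);
        System.out.println(registry.byName("messageServiceKafkaImpl").send("id3"));
    }
}
```

With a single MessageService implementation on the classpath, @Autowired alone would resolve it by type and the qualifier would be optional.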


Controller layer

OrderController.java

package com.jd.springboot_mq.contoller;

import com.jd.springboot_mq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    // POST /orders/{id}: hand the order ID to the order service
    @PostMapping("/{id}")
    public void order(@PathVariable String id) {
        orderService.order(id);
    }
}


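Testing

Send a POST request with Postman:

http://localhost/orders/id3

The URL has no port, so it assumes the application listens on port 80 (server.port=80); with Spring Boot's default port the address would be http://localhost:8080/orders/id3.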
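The same request can also be issued from code. The sketch below uses the JDK's `java.net.http` client (Java 11 or later); `OrderRequestSketch`, `buildOrderRequest`, and `send` are hypothetical names, and the base URL is passed in so that a different port can be substituted.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that mirrors the Postman call above.
class OrderRequestSketch {

    // Build a body-less POST to {baseUrl}/orders/{id}, matching the controller mapping.
    static HttpRequest buildOrderRequest(String baseUrl, String id) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/orders/" + id))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Send the request and return the HTTP status; the application must be running.
    static int send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = buildOrderRequest("http://localhost", "id3");
        // Only print the request here; call send(request) once the application is up.
        System.out.println(request.method() + " " + request.uri());
    }
}
```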

Result:

[Image: console output showing the order ID being produced and consumed]