当前位置:   article > 正文

Springboot集成RocketMq_springboot3.0整合rocketmq 指定序列化器

springboot3.0整合rocketmq 指定序列化器

生产者和消费者都需要的jar包

项目结构:

生产者是console,消费者是my-msg

 <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

生产者代码

yml—rocketmq配置

rocketmq:
  producer:
    group: producer_group
  name-server: 192.168.216.131:9876;192.168.216.132:9876
  topic-key: wtc_topic
  • 1
  • 2
  • 3
  • 4
  • 5

生产者配置(注意,当你使用@Bean DefaultMQProducer 的时候,不要调用producer.start(),会自动开启)

@Component
public class DefaultProductConfig {
	#生产者组
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    #nameServer地址
    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public DefaultMQProducer getProduct() throws MQClientException {
        //示例生产者
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr(nameServer);
        return producer;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

发送消息工具类

@Component
public class SendMsgUtil {

    final static Logger logger = LoggerFactory.getLogger(SendMsgUtil.class);

    @Value("${rocketmq.topic-key}")
    private String topic;


    @Autowired
    DefaultMQProducer defaultMQProducer;

    public SendResult sendMsg(String msg) {
        Message message = new Message(topic, object.getTagName(), msg.getBytes());
        try {
            SendResult sendResult = defaultMQProducer.send(message);
            logger.info("输出生产者信息={}", sendResult);
            return sendResult;
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

OK,接下来直接上消费者代码,yml配置文件和jar都不说了。

消费者工具类

@Configuration
public class MsgConsumerUtil {

    @Value("${rocketmq.name-server}")
    private String nameSer;
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    @Value("${rocketmq.topic-key}")
    private String topic;

    @Autowired
    MsgListener msgListener;

    @Bean
    public DefaultMQPushConsumer startConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameSer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(msgListener);
        try {
            consumer.subscribe(topic,"*");
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return consumer;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

监听器:

@Component
public class MsgListener implements MessageListenerConcurrently {

    @Autowired
    private MsgCustomerService msgCustomerService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        msgs.forEach(messageExt -> {
            msgCustomerService.handlerMsg(messageExt);
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

接口:

public interface MsgCustomerService {

    Boolean handlerMsg(MessageExt messageExt);
}
  • 1
  • 2
  • 3
  • 4

实现:

@Service
public class MsgCustomerServiceImpl implements MsgCustomerService {

    final static Logger logger = LoggerFactory.getLogger(MsgCustomerServiceImpl.class);

    @Override
    public Boolean handlerMsg(MessageExt messageExt) {

        logger.info(MsgProductBean.ACCOUNT_INFO.getTagName() + "--> 消息:" + new String(messageExt.getBody()));
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

测试:

    @RequestMapping("/test/sendRocketMq")
    @ResponseBody
    public String sendRocketMq() {
        String accountNo = UUID.randomUUID().toString();
        SendResult sendResult = sendMsgUtil.sendMsg(accountNo, MsgProductBean.ACCOUNT_INFO);
        return "success";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

枚举类,我是用来区分一个topic,不同tag来分别获取消息。

public enum MsgProductBean {

    ACCOUNT_INFO("accountInfo", "tag_account"),
    ACCOUNT_INFO_OTHER("accountInfo", "tag_account_other"),
    ;

    private String beanName;
    private String tagName;

    MsgProductBean(String beanName, String tagName) {
        this.tagName = tagName;
        this.beanName = beanName;
    }

    public String getTagName() {
        return tagName;
    }

    public String getBeanName() {
        return beanName;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  1. RocketMq开启的时候,不建议开启自动创建主题,会导致负载不均衡
  2. 如果项目采用rocketmq集群,建议线上采用 同步复制 + 异步刷盘;
  3. 手动添加主题的命令:sh mqadmin updatetopic -n 192.168.3.118:9876 -c AdpMqCluster -t topic_topic (-c 后面是集群名,-t后面是主题)
  4. 这是简单的生产,消费。后面我们继续说,MQ的重置,以及分布式,顺序消费等
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/434648
推荐阅读
相关标签
  

闽ICP备14008679号