赞
踩
项目结构:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
生产者代码
yml—rocketmq配置
# RocketMQ settings read by the Java classes through @Value placeholders.
rocketmq:
  producer:
    # Resolved as ${rocketmq.producer.group}
    group: producer_group
  # Resolved as ${rocketmq.name-server}; multiple name servers are separated by ';'
  name-server: 192.168.216.131:9876;192.168.216.132:9876
  # Resolved as ${rocketmq.topic-key}
  topic-key: wtc_topic
生产者配置(注意,当你使用@Bean DefaultMQProducer 的时候,不要调用producer.start(),会自动开启)
@Component public class DefaultProductConfig { #生产者组 @Value("${rocketmq.producer.group}") private String producerGroup; #nameServer地址 @Value("${rocketmq.name-server}") private String nameServer; @Bean public DefaultMQProducer getProduct() throws MQClientException { //示例生产者 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //不开启vip通道 开通口端口会减2 producer.setVipChannelEnabled(false); //绑定name server producer.setNamesrvAddr(nameServer); return producer; } }
发送消息工具类
@Component public class SendMsgUtil { final static Logger logger = LoggerFactory.getLogger(SendMsgUtil.class); @Value("${rocketmq.topic-key}") private String topic; @Autowired DefaultMQProducer defaultMQProducer; public SendResult sendMsg(String msg) { Message message = new Message(topic, object.getTagName(), msg.getBytes()); try { SendResult sendResult = defaultMQProducer.send(message); logger.info("输出生产者信息={}", sendResult); return sendResult; } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; }
OK,接下来直接上消费者代码。yml 配置文件和 jar 依赖与生产者相同,不再赘述;注意消费者还需要在 yml 中额外配置 rocketmq.consumer.group(消费者组)。
消费者工具类
/**
 * Spring configuration that creates, subscribes and starts a RocketMQ push consumer.
 */
@Configuration
public class MsgConsumerUtil {

    /** Name server address(es), injected from {@code rocketmq.name-server}. */
    @Value("${rocketmq.name-server}")
    private String nameSer;

    /** Consumer group, injected from {@code rocketmq.consumer.group}. */
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    /** Topic to subscribe to, injected from {@code rocketmq.topic-key}. */
    @Value("${rocketmq.topic-key}")
    private String topic;

    @Autowired
    MsgListener msgListener;

    /**
     * Builds the push consumer, registers {@link MsgListener}, subscribes to every
     * tag ({@code "*"}) of the configured topic and starts consuming from the first
     * offset.
     *
     * @return the started consumer
     * @throws IllegalStateException if subscribing or starting fails; the original
     *         {@link MQClientException} is attached as the cause. Previously the
     *         failure was only printed and a consumer that never started was still
     *         returned as a bean.
     */
    @Bean
    public DefaultMQPushConsumer startConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameSer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(msgListener);
        try {
            consumer.subscribe(topic, "*");
            consumer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException(
                    "Failed to start RocketMQ consumer, group=" + consumerGroup + ", topic=" + topic, e);
        }
        return consumer;
    }
}
监听器:
/**
 * Concurrent RocketMQ listener that forwards every delivered message to
 * {@link MsgCustomerService}.
 */
@Component
public class MsgListener implements MessageListenerConcurrently {

    @Autowired
    private MsgCustomerService msgCustomerService;

    /**
     * Passes each message of the batch to the service, in list order, and then
     * acknowledges the batch.
     *
     * @param msgs    messages delivered in this batch
     * @param context consume context supplied by the client (not used here)
     * @return always {@code CONSUME_SUCCESS}; the value returned by the service is
     *         not inspected
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt delivered : msgs) {
            msgCustomerService.handlerMsg(delivered);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
接口:
/**
 * Service contract for handling a single RocketMQ message.
 */
public interface MsgCustomerService {
/**
 * Handles one message.
 *
 * @param messageExt the message to handle
 * @return a Boolean result; its meaning is not defined by this interface
 *         (presumably a success flag -- TODO confirm with implementations and callers)
 */
Boolean handlerMsg(MessageExt messageExt);
}
实现:
/**
 * {@link MsgCustomerService} implementation that only logs the message body.
 */
@Service
public class MsgCustomerServiceImpl implements MsgCustomerService {
    final static Logger logger = LoggerFactory.getLogger(MsgCustomerServiceImpl.class);

    /**
     * Logs the message body, prefixed with the tag name of
     * {@code MsgProductBean.ACCOUNT_INFO}.
     *
     * <p>NOTE(review): the logged prefix is the fixed ACCOUNT_INFO tag name, not the
     * tag carried by {@code messageExt} -- confirm this is intended.
     *
     * @param messageExt the message whose body is logged
     * @return {@code Boolean.TRUE} once the message has been logged; never
     *         {@code null}, so callers can unbox the result safely
     */
    @Override
    public Boolean handlerMsg(MessageExt messageExt) {
        // Decode with an explicit charset rather than the platform default.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("{}--> 消息:{}", MsgProductBean.ACCOUNT_INFO.getTagName(), body);
        return Boolean.TRUE;
    }
}
测试:
/**
 * Test endpoint: sends a random UUID string as a message tagged with
 * {@code MsgProductBean.ACCOUNT_INFO}.
 *
 * @return {@code "success"} when a send result was obtained, {@code "fail"} when
 *         {@code sendMsg} returned {@code null} (which it does when sending failed)
 */
@RequestMapping("/test/sendRocketMq")
@ResponseBody
public String sendRocketMq() {
    String accountNo = UUID.randomUUID().toString();
    SendResult sendResult = sendMsgUtil.sendMsg(accountNo, MsgProductBean.ACCOUNT_INFO);
    // Report the real outcome instead of unconditionally claiming success.
    return sendResult != null ? "success" : "fail";
}
枚举类:用于在同一个 topic 下,通过不同的 tag 来区分并分别获取消息。
/**
 * Message categories that share one topic and are told apart by their tag.
 *
 * <p>Each constant carries a bean name and a tag name. Both constants currently
 * use the same bean name; how the bean name is consumed is not visible here.
 */
public enum MsgProductBean {
    ACCOUNT_INFO("accountInfo", "tag_account"),
    ACCOUNT_INFO_OTHER("accountInfo", "tag_account_other"),
    ;

    // Enum state is fixed at construction time, so the fields are final.
    private final String beanName;
    private final String tagName;

    MsgProductBean(String beanName, String tagName) {
        this.beanName = beanName;
        this.tagName = tagName;
    }

    /** @return the tag name associated with this constant */
    public String getTagName() {
        return tagName;
    }

    /** @return the bean name associated with this constant */
    public String getBeanName() {
        return beanName;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。