赞
踩
官网地址:http://rocketmq.apache.org/dowloading/releases/
- unzip rocketmq-all-4.6.1-bin-release .zip
-
- mv rocketmq-all-4.6.1-bin-release rocketmq
nohup sh mqnamesrv &
nohup sh mqbroker -n 192.168.12.250:9876 autoCreateTopicEnable=true &
【注】autoCreateTopicEnable=true 表示可以动态创建topic
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.2.0</version>
- </dependency>
- #RocketMQ:
- rocketmq:
- producer:
- isOnOff: on
- groupName: group1
- namesrvAddr: 192.168.12.250:9876
- topic: TRAFFIC_EVENT
- #tag: test
- maxMessageSize: 100
- sendMsgTimeout: 1000
- consumer:
- namesrvAddr: 192.168.12.250:9876
- groupName: group1
- topic: TRAFFIC_EVENT
- #tag: test
- consumeThreadMin: 20
- consumerThreadMax: 64
方式一、同步发送
Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
方式二、异步发送
Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
方式三、Oneway 发送
Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。
- // 1、同步发送
- SendResult sendResult = producer.send(msg);
- //2、异步发送
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- }
- @Override
- public void onException(Throwable e) {
- }
- });
- //3、 Oneway发送
- producer.sendOneway(msg);
方式一:PushConsumer
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
缺点:实时性高,但增加服务端负载,消费端能力不同,如果push的速度过快,消费端会出现很多问题
方式二:PullConsumer
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
缺点:消费者从server端拉消息,主动权在消费端,可控性好,但是时间间隔不好设置,间隔太短,则空请求会多,浪费资源,间隔太长,则消息不能及时处理
推荐使用PushConsumer,直接上代码!
- @Component
- @Slf4j
- public class MQPushConsumer implements MessageListenerConcurrently {
- private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
-
- @Value("${rocketmq.consumer.namesrvAddr}")
- private String rocketmqNameServer;
- @Value("${rocketmq.consumer.topic}")
- private String topic;
-
- /**
- * 初始化
- */
- @PostConstruct //java5中引入,指在项目启动的时候执行这个方法
- public void start() {
- try {
- consumer.setNamesrvAddr(rocketmqNameServer);
- consumer.setVipChannelEnabled(false);
- //从消费队列头开始消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- //consumer.setMessageModel(MessageModel.CLUSTERING);
- //订阅主题
- consumer.subscribe("TRAFFIC_EVENT","*");
- consumer.registerMessageListener(this);
- consumer.start();
- log.info("[启动日志]:MQ消费者已启动");
- } catch (MQClientException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- int index=0;
- try {
- for (;index<list.size();index++) {
- MessageExt msg = list.get(index);
- String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
-
- log.info("MQ:消费者接受新消息:{}{}{}{}{}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);
- //拿到json通过websocket发给前端
- if (!StringUtils.isEmpty(messageBody) && MyWebSocket.getOnlineCount() >0) {
- MyWebSocket.sendInfo(messageBody);
- log.info("[webSocket发送]" + messageBody);
- }
- }
- } catch (Exception e) {
- log.error(e.getMessage(),e);
- }finally {
-
- if (index < list.size()) {
- consumeConcurrentlyContext.setAckIndex(index+1);
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- @PreDestroy //在关闭spring容器后释放一些资源*
- public void stop() {
- if (null != consumer) {
- consumer.shutdown();
- log.error("MQ: 关闭消费者");
- }
- }
- }
-
-
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。