赞
踩
参考文档:
- github文档:https://github.com/apache/rocketmq-docker
- https://rocketmq.apache.org/zh/docs/
git clone https://github.com/apache/rocketmq-docker.git
拉去git仓库后,可通过以下链接查看最新版本号:
https://archive.apache.org/dist/rocketmq/
我这里版本是5.1.0,按照如下命令执行脚本
cd image-build
sh build-image.sh 5.1.0 alpine
检测一下是否创建成功:
docker images
docker pull apacherocketmq/rocketmq-dashboard:latest
在配置之前确定服务器的IP,使用ifconfig命令查看ip,我的如下:
选择一个路径,创建broker.conf
/Users/yangmiao/config/rocketmq
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功 状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址、物理ip,不能用127.0.0.1、localhost、docker内网ip brokerIP1 = 10.220.181.76
创建自定义的rocketmq.yml
注意:
我打开docker客户端,查看镜像这一栏,镜像名称是apacherocketmq/rocketmq:latest,于是修改镜像名称
最终配置如下:
version: '3' services: namesrv: image: apacherocketmq/rocketmq:latest container_name: rmqnamesrv ports: - 9876:9876 command: sh mqnamesrv broker: image: apacherocketmq/rocketmq:latest container_name: rmqbroker ports: - 10909:10909 - 10911:10911 - 10912:10912 volumes: - /Users/yangmiao/config/rocketmq/broker.conf:/home/rocketmq/rocketmq-5.1.0/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.1.0/conf/broker.conf depends_on: - namesrv mqconsole: image: apacherocketmq/rocketmq-dashboard container_name: rmqdashboard ports: - 8181:8080 environment: JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false depends_on: - namesrv
执行docker命令,启动容器:
docker-compose -f rocketmq.yml up -d
查看docker客户端Containers
在浏览器上访问http://localhost:8181/#/
在pom.xml中引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
application.yml
中配置
定义producer的group
rocketmq:
name-server: 10.220.181.76:9876
consumer:
group: pushConsumerGroup
producer:
group: pushProducerGroup
send-message-timeout: 3000
retry-times-when-send-async-failed: 2
retry-times-when-send-failed: 2
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
package com.ym.pushserver.rocketmq; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; /** * @Author: Yangmiao * @Date: 2023/3/16 10:07 * @Desc: 生产者发送 * copy: https://www.jianshu.com/p/f7b59073ea01 */ @Component @Slf4j public class RocketMQProducer { /** * rocketmq模板注入 */ @Autowired private RocketMQTemplate rocketMQTemplate; /** * 普通发送 * @param topic 消息主题 * @param msg 消息体 * @param <T> 消息泛型 */ public <T> void send(String topic, T msg) { rocketMQTemplate.convertAndSend(topic + ":tag1", msg); //rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等价于上面一行 } /** * 发送带tag的消息,直接在topic后面加上":tag" * * @param topic 消息主题 * @param tag 消息tag * @param msg 消息体 * @param <T> 消息泛型 * @return */ public <T> SendResult sendTagMsg(String topic, String tag, T msg) { topic = topic + ":" + tag; return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build()); } /** * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息) * sendResult为返回的发送结果 */ public <T> SendResult sendMsg(String topic, T msg) { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, message); log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; } /** * 发送异步消息 * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) * @param topic 消息Topic * @param msg 消息实体 * */ public <T> void asyncSend(String topic, T msg) { Message<T> message = MessageBuilder.withPayload(msg).build(); asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("topic:{}消息---发送MQ成功---", topic); } @Override public void onException(Throwable throwable) { log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage()); } }); } /** * 发送异步消息 * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) * @param topic 消息Topic * @param message 消息实体 * @param sendCallback 回调函数 */ public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) { rocketMQTemplate.asyncSend(topic, message, sendCallback); } /** * 发送异步消息 * * @param topic 消息Topic * @param message 消息实体 * @param sendCallback 回调函数 * @param timeout 超时时间 */ public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) { rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout); } /** * 同步延迟消息 * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。 * RocketMQ 目前只支持固定精度的定时消息。 * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * 延迟的底层方法是用定时任务实现的。 * 发送延时消息(delayLevel的值就为0,因为不延时) * * @param topic 消息主题 * @param msg 消息体 * @param timeout 发送超时时间 * @param delayLevel 延迟级别 1到18 * @param <T> 消息泛型 */ public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) { Message<T> message = MessageBuilder.withPayload(msg).build(); rocketMQTemplate.syncSend(topic, message, timeout, delayLevel); } /** * 发送异步延迟消息 * * @param topic 消息Topic * @param message 消息实体 * @param sendCallback 回调函数 * @param timeout 超时时间 * @param delayLevel 延迟消息的级别 */ public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) { rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel); } /** * 发送异步延迟消息 * * @param topic 消息Topic * @param message 消息实体 * @param timeout 超时时间 * @param delayLevel 延迟消息的级别 */ public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) { rocketMQTemplate.asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("topic:{}消息---发送MQ成功---", topic); } @Override public void onException(Throwable throwable) { log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage()); } }, timeout, delayLevel); } /** * 单向消息 * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答 * 此方式发送消息的过程耗时非常短,一般在微秒级别 * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集 * @param topic 消息主题 * @param msg 消息体 * @param <T> 消息泛型 */ public <T> void sendOneWayMsg(String topic, T msg) { Message<T> message = MessageBuilder.withPayload(msg).build(); rocketMQTemplate.sendOneWay(topic, message); } /** * 发送顺序消息 * * @param topic 消息主题 * @param msg 消息体 * @param hashKey 确定消息发送到哪个队列中 * @param <T> 消息泛型 */ public <T> void syncSendOrderly(String topic, T msg, String hashKey) { Message<T> message = MessageBuilder.withPayload(msg).build(); log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey); rocketMQTemplate.syncSendOrderly(topic, message, hashKey); } /** * 发送顺序消息 * * @param topic 消息主题 * @param msg 消息体 * @param hashKey 确定消息发送到哪个队列中 * @param timeout 超时时间 */ public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) { Message<T> message = MessageBuilder.withPayload(msg).build(); log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout); rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout); } /** * 发送批量消息 * * @param topic 消息主题 * @param msgList 消息体集合 * @param <T> 消息泛型 * @return */ public <T> SendResult asyncSendBatch(String topic, List<T> msgList) { List<Message<T>> messageList = msgList.stream() .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList()); return rocketMQTemplate.syncSend(topic, messageList); } /** * 发送事务消息 * * @param txProducerGroup 事务消息的生产者组名称 * @param topic 事务消息主题 * @param tag 事务消息tag * @param msg 事务消息体 * @param arg 事务消息监听器回查参数 * @param <T> 事务消息泛型 */ public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){ if(!StringUtils.isEmpty(tag)){ topic = topic + ":" + tag; } String transactionId = UUID.randomUUID().toString(); Message<T> message = MessageBuilder.withPayload(msg) //header也有大用处 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) // .setHeader("share_id", msg.getId()) .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, message, arg); if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && result.getSendStatus().equals(SendStatus.SEND_OK)){ log.info("事物消息发送成功"); } log.info("事物消息发送结果:{}", result); } }
package com.ym.pushserver.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; /** * @Author: Yangmiao * @Date: 2023/3/16 10:33 * @Desc: 消费端 */ @Component @RocketMQMessageListener(consumerGroup = "test_group",topic = RocketMQConstant.DEFAULT_TOPIC) @Slf4j public class RocketMQConsmer implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("Push msg:"+s); } }
在网址中访问localhost:8081/push/mq
package com.ym.pushserver.controller; import cn.hutool.core.util.StrUtil; import com.ym.pushserver.entity.R; import com.ym.pushserver.redis.RedisUtil; import com.ym.pushserver.rocketmq.RocketMQConstant; import com.ym.pushserver.rocketmq.RocketMQProducer; import com.ym.pushserver.service.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Author: Yangmiao * @Date: 2023/3/9 20:00 * @Desc: */ @RestController @RequestMapping("/push") @Slf4j public class PushController { @Autowired private WebSocketServer webSocketServer; @Autowired private RedisUtil redisUtil; @Autowired private RocketMQProducer rocketMQProducer; @GetMapping("/mq") public R testMQ(){ Message<String> msg = MessageBuilder.withPayload("测试MQ").build(); rocketMQProducer.asyncSend(RocketMQConstant.DEFAULT_TOPIC, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.debug("发送成功,"+sendResult.toString()); } @Override public void onException(Throwable throwable) { log.error("发送失败,"+throwable.getMessage()); } }); return R.ok(); } }
查看Dashboard,统计中有一条信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。