赞
踩
虚拟机安装docker环境可参考:centos7.4安装docker
#创建RocketMQ使用的共有网络,便于相互访问
docker network create rocketmq_network
#foxiswho/rocketmq 4.7.0以后不再分别创建broker及nameserver的镜像,统一使用rocketmq镜像,只是在启动命令上区分
docker pull foxiswho/rocketmq:4.8.0
#rocketmq控制台2.0.0版本,源码来自于官方仓库https://github.com/apache/rocketmq-externals#rocketmq-console
docker pull 56553655/rocketmq-console-ng:2.0.0
#启动rocketmq nameserver
docker run -d --network rocketmq_network --network-alias rmqnamesrv --name rmqnamesrv -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 9876:9876 foxiswho/rocketmq:4.8.0 sh mqnamesrv
#启动rocketmq broker
docker run -d --network rocketmq_network --network-alias rmqbroker --name rmqbroker -e "NAMESRV_ADDR=rmqnamesrv:9876" -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 10911:10911 -p 10912:10912 -p 10909:10909 foxiswho/rocketmq:4.8.0 sh mqbroker -c ../conf/broker.conf
#启动rocketmq-console-ng
docker run -d --network rocketmq_network --network-alias rocketmq-console-ng -p 8080:8080 --name rocketmq-console-ng 56553655/rocketmq-console-ng:2.0.0
注意以下三个端口都要开放(简单起见关闭防火墙)
broker启动指定了配置文件: …/conf/broker.conf
http://192.168.100.70:8080
成功效果
broker的默认ip地址,使用docker内部网络地址,需要修改成docker宿主机的IP地址(192.168.100.70),否则后面springboot集成对接时候,会无法连接上.
broker.conf 增加配置brokerIP1 = 192.168.100.70
[root@localhost ~]# docker exec -it rmqbroker bash
[rocketmq@ad5cc337a328 bin]$ cat ../conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.100.70
docker restart rmqbroker
由于测试需要,docker命令安装还是太繁琐了。不方便在任意docker环境快速启动mq的测试环境,因此补充了docker-compose一键安装版本。
cd ~ cat > rocketmq_docker.sh <<EOF #创建rocketmq compose目录 mkdir -p /root/test/docker-compose-rocketmq cd /root/test/docker-compose-rocketmq #生成rocketmq docker compose配置文件 cat > docker-compose.yaml << !EOF version: '3' services: rmqnamesrv: image: foxiswho/rocketmq:4.8.0 container_name: rmqnamesrv networks: - rocketmq_network environment: JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m" ports: - "9876:9876" command: sh mqnamesrv rmqbroker: image: foxiswho/rocketmq:4.8.0 container_name: rmqbroker networks: - rocketmq_network environment: NAMESRV_ADDR: rmqnamesrv:9876 JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m" ports: - "10911:10911" - "10912:10912" - "10909:10909" # command: sh mqbroker -c ../conf/broker.conf command: sh -c "echo 'brokerIP1 = 192.168.4.100' >> /home/rocketmq/rocketmq-4.8.0/conf/broker.conf && sh mqbroker -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf" rocketmq-console-ng: image: 56553655/rocketmq-console-ng:2.0.0 container_name: rocketmq-console-ng networks: - rocketmq_network ports: - "8080:8080" networks: rocketmq_network: driver: bridge !EOF EOF chmod +x rocketmq_docker.sh ./rocketmq_docker.sh
# 进入docker-compose目录
cd /root/test/docker-compose-rocketmq
# 关闭mq
docker-compose down
# 启动mq
docker-compose up -d
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.18.RELEASE</version>
<relativePath/>
</parent>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
rocketmq.name-server=192.168.100.70:9876
rocketmq.producer.group=my-group
# 默认3000 太小会报RemotingTooMuchRequestException: sendDefaultImpl call timeout错误
rocketmq.producer.send-message-timeout=10000
基本springboot框架搭建后增加上述,rocketmq配置。测试时候,生产者项目和消费者项目application.properties的rockermq都配都一样, 不同项目端口设置不同即可。
完成springboot + rocketmq集成后。进行基本消息测试。
普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。
@Api(tags = "mq接口测试") @RestController @RequestMapping("/rocketMq") public class RocketTestController { private static final Logger LOGGER = LoggerFactory.getLogger(RocketTestController.class); @Resource private RocketMQTemplate rocketMQTemplate; @PostMapping("/send") public ApiResult<String> sendMessage(@RequestParam(defaultValue = "TOPIC_TEST") String topic, String message) { rocketMQTemplate.convertAndSend(topic, message); return new ApiResult<>(); } }
@Component
@RocketMQMessageListener(topic = "TOPIC_TEST",consumerGroup = "group_consumer")
public class Consumer implements RocketMQListener<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
@Override
public void onMessage(String message) {
LOGGER.info("Receive message:"+message);
}
}
通过在线接口工具,连续往“TOPIC_TEST”topic发送多条消息,发现消息分别被两个消费者随机消费
结果
发送普通消息后,此时mq服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。
没有收到消费者响应分两种情况
@Component @RocketMQMessageListener(topic = "TOPIC_TEST",consumerGroup = "C_GROUP_TEST") public class ConsumerListener implements RocketMQListener<String> { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class); @Override public void onMessage(String message) { LOGGER.debug("接收到MQ消息--处理开始"); if ("5".equals(message)) { throw new IllegalStateException("模拟业务出现异常"); } LOGGER.info("Receive message:"+message); LOGGER.debug("接收到MQ消息--处理结束"); } }
TODO
Docker下安装RocketMQ服务及控制台的Shell脚本
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。