当前位置:   article > 正文

SpringBoot_rocketmq使用实战_docker环境_rockermqspringboot使用

rockermqspringboot使用

前言

  1. 记录rocketmq 4.8.0 docker环境安装及springboot集成, 消息功能测试持续集成,
  2. 验证MQ在不同场景的使用,以及由于引入mq带来的问题,rockermq都是怎么解决的

mq介绍

  • 本质上是先进先出的队列
  • MQ作用及优点
    • 系统解耦: 同步调用改成异步调用,下游服务故障不影响上游服务
    • 流量削峰: 解决请求激增超过应用最大的qps,导致服务故障
    • 数据分发: 不用关心谁使用数据,关注自己服务,谁要使用谁订阅
  • MQ缺点
    • 系统可用性降低(mq服务挂了怎么办–>MQ的高可用怎么保证)
    • 系统复杂性提高(重复消费,消息丢失,消息顺序性怎么解决)
    • 一致性问题(A发送消息, BC成功,D失败了,怎么保证消息处理一致性–>怎么处理事务呢)

一 rocketmq安装

1.docker方式

虚拟机安装docker环境可参考:centos7.4安装docker

1. 1获取镜像

#创建RocketMQ使用的共有网络,便于相互访问
docker network create rocketmq_network

#foxiswho/rocketmq 4.7.0以后不再分别创建broker及nameserver的镜像,统一使用rocketmq镜像,只是在启动命令上区分
docker pull foxiswho/rocketmq:4.8.0

#rocketmq控制台2.0.0版本,源码来自于官方仓库https://github.com/apache/rocketmq-externals#rocketmq-console
docker pull 56553655/rocketmq-console-ng:2.0.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.2. 启动容器

#启动rocketmq nameserver
docker run -d --network rocketmq_network --network-alias rmqnamesrv --name rmqnamesrv -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 9876:9876 foxiswho/rocketmq:4.8.0 sh mqnamesrv

#启动rocketmq broker
docker run -d --network rocketmq_network --network-alias rmqbroker --name rmqbroker -e "NAMESRV_ADDR=rmqnamesrv:9876" -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 10911:10911 -p 10912:10912 -p 10909:10909 foxiswho/rocketmq:4.8.0 sh mqbroker -c ../conf/broker.conf

#启动rocketmq-console-ng
docker run -d --network rocketmq_network --network-alias rocketmq-console-ng -p 8080:8080 --name rocketmq-console-ng 56553655/rocketmq-console-ng:2.0.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 注意以下三个端口都要开放(简单起见关闭防火墙)

    • nameServer对应的9876端口
    • broker对应的10911端口
    • rocketmq-console-ng对应的8080端口
  • broker启动指定了配置文件: …/conf/broker.conf

1.3. 访问控制台

http://192.168.100.70:8080
成功效果
在这里插入图片描述

1.4. 踩坑处理

broker的默认ip地址,使用docker内部网络地址,需要修改成docker宿主机的IP地址(192.168.100.70),否则后面springboot集成对接时候,会无法连接上.

  1. 进入rmqbroker 容器内部修改配置

broker.conf 增加配置brokerIP1 = 192.168.100.70

[root@localhost ~]# docker exec -it rmqbroker bash
[rocketmq@ad5cc337a328 bin]$  cat ../conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.100.70
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 退出容器,重启rockermq的broker
docker restart rmqbroker
  • 1

2.docker-compose方式

由于测试需要,docker命令安装还是太繁琐了。不方便在任意docker环境快速启动mq的测试环境,因此补充了docker-compose一键安装版本。

  1. 完整复制代码块,生成sh脚本,输出docker-compose.yaml文件
  2. docker-compose.yaml注意brokerIP1 = 192.168.100.70 要修改成你的docker宿主机的IP地址(服务器IP地址)

2.1.生成docker-compose文件

cd ~
cat > rocketmq_docker.sh <<EOF
#创建rocketmq compose目录
mkdir -p /root/test/docker-compose-rocketmq
cd /root/test/docker-compose-rocketmq

#生成rocketmq docker compose配置文件
cat > docker-compose.yaml << !EOF
version: '3'

services:
  rmqnamesrv:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqnamesrv
    networks:
      - rocketmq_network
    environment:
      JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
    ports:
      - "9876:9876"
    command: sh mqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:4.8.0
    container_name: rmqbroker
    networks:
      - rocketmq_network
    environment:
      NAMESRV_ADDR: rmqnamesrv:9876
      JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m"
    ports:
      - "10911:10911"
      - "10912:10912"
      - "10909:10909"
    # command: sh mqbroker -c ../conf/broker.conf
    command: sh -c "echo 'brokerIP1 = 192.168.4.100' >> /home/rocketmq/rocketmq-4.8.0/conf/broker.conf && sh mqbroker -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf"

  rocketmq-console-ng:
    image: 56553655/rocketmq-console-ng:2.0.0
    container_name: rocketmq-console-ng
    networks:
      - rocketmq_network
    ports:
      - "8080:8080"

networks:
  rocketmq_network:
    driver: bridge

!EOF

EOF

chmod +x rocketmq_docker.sh
./rocketmq_docker.sh


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

2.2.docker-compose启停rocketmq

# 进入docker-compose目录
cd /root/test/docker-compose-rocketmq

# 关闭mq
docker-compose down

# 启动mq
docker-compose up -d
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

三 springboot集成rocketmq

1. maven依赖


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.18.RELEASE</version>
        <relativePath/>
    </parent>
    
    <!--rocketmq-->
     <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-boot-starter</artifactId>
         <version>2.2.0</version>
     </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2. application.properties配置

rocketmq.name-server=192.168.100.70:9876
rocketmq.producer.group=my-group
# 默认3000 太小会报RemotingTooMuchRequestException: sendDefaultImpl call timeout错误
rocketmq.producer.send-message-timeout=10000
  • 1
  • 2
  • 3
  • 4

基本springboot框架搭建后增加上述,rocketmq配置。测试时候,生产者项目和消费者项目application.properties的rockermq都配都一样, 不同项目端口设置不同即可。

3. 普通消息测试

完成springboot + rocketmq集成后。进行基本消息测试。

普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。

3.1 消息生产者

@Api(tags = "mq接口测试")
@RestController
@RequestMapping("/rocketMq")
public class RocketTestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketTestController.class);

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/send")
    public ApiResult<String> sendMessage(@RequestParam(defaultValue = "TOPIC_TEST") String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
        return new ApiResult<>();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3.2 消息消费者

@Component
@RocketMQMessageListener(topic = "TOPIC_TEST",consumerGroup = "group_consumer")
public class Consumer implements RocketMQListener<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    @Override 
    public void onMessage(String message) {
        LOGGER.info("Receive message:"+message);
    } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3.3 测试

通过在线接口工具,连续往“TOPIC_TEST”topic发送多条消息,发现消息分别被两个消费者随机消费
在这里插入图片描述
结果
在这里插入图片描述
在这里插入图片描述

三 进阶–功能特性

1. 消费重试

发送普通消息后,此时mq服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

没有收到消费者响应分两种情况

  • 执行报错了
  • 业务执行太长

1.1 业务报错

@Component
@RocketMQMessageListener(topic = "TOPIC_TEST",consumerGroup = "C_GROUP_TEST")
public class ConsumerListener implements RocketMQListener<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);
    @Override 
    public void onMessage(String message) {
        LOGGER.debug("接收到MQ消息--处理开始");
        if ("5".equals(message)) {
            throw new IllegalStateException("模拟业务出现异常");
        }
        LOGGER.info("Receive message:"+message);
        LOGGER.debug("接收到MQ消息--处理结束");


    } 
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述

四 进阶–rocketmq高可用集群搭建

TODO

参考文档

  1. Docker下安装RocketMQ服务及控制台的Shell脚本

    • docker安装主要参考该文档(目前看到最快安装,作者很赞),但是安装成功和springboot集成会有下面的问题
  2. DOCKER安装ROCKETMQ遇到的坑(外部无法连接)

    • 参考docker rocketmq closeChannel: close the connection to remote address[] result: true 问题处理
  3. foxiswho/rocketmq(镜像地址及安装注意事项)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/434687
推荐阅读
相关标签
  

闽ICP备14008679号