当前位置:   article > 正文

Mac下Docker安装RocketMQ,RocketMQ与Springboot整合教程_macos整合rocketmq

macos整合rocketmq

参考文档:

  • github文档:https://github.com/apache/rocketmq-docker
  • https://rocketmq.apache.org/zh/docs/

一、Docker安装RocketMQ

1.1 获取RocketMQ镜像

git clone https://github.com/apache/rocketmq-docker.git
  • 1

拉去git仓库后,可通过以下链接查看最新版本号:

https://archive.apache.org/dist/rocketmq/
  • 1

在这里插入图片描述
我这里版本是5.1.0,按照如下命令执行脚本

cd image-build
sh build-image.sh 5.1.0 alpine
  • 1
  • 2

检测一下是否创建成功:

docker images
  • 1

在这里插入图片描述

1.2 安装可视化软件

docker pull apacherocketmq/rocketmq-dashboard:latest
  • 1

1.3 配置

1.3.1 查看服务器IP

在配置之前确定服务器的IP,使用ifconfig命令查看ip,我的如下:
在这里插入图片描述

1.3.2 配置broker.conf和rocketmq.yml

选择一个路径,创建broker.conf

/Users/yangmiao/config/rocketmq

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示MasterSlave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功
状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址、物理ip,不能用127.0.0.1、localhost、docker内网ip
brokerIP1 = 10.220.181.76

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

创建自定义的rocketmq.yml

注意

  • 一开始我选择的tag是5.1.0-alpine,而非apacherocketmq/rocketmq:latest,执行docker命令时报错,提示找不到apacherocketmq/rocketmq:5.1.0-alpine
    在这里插入图片描述

我打开docker客户端,查看镜像这一栏,镜像名称是apacherocketmq/rocketmq:latest,于是修改镜像名称
在这里插入图片描述

最终配置如下:

version: '3'
services:
  namesrv:
    image: apacherocketmq/rocketmq:latest
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    command: sh mqnamesrv
  broker:
    image: apacherocketmq/rocketmq:latest
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - /Users/yangmiao/config/rocketmq/broker.conf:/home/rocketmq/rocketmq-5.1.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.1.0/conf/broker.conf
    depends_on:
      - namesrv
  mqconsole:
    image: apacherocketmq/rocketmq-dashboard
    container_name: rmqdashboard
    ports:
      - 8181:8080
    environment:
      JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false
    depends_on:
      - namesrv
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

1.3.3 启动容器

执行docker命令,启动容器:

docker-compose -f rocketmq.yml up -d
  • 1

查看docker客户端Containers
在这里插入图片描述
在浏览器上访问http://localhost:8181/#/
在这里插入图片描述

二、RocketMQ与Springboot整合

2.1 搭建环境

在pom.xml中引入依赖

<dependency>
   <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

application.yml中配置
定义producer的group

rocketmq:
  name-server: 10.220.181.76:9876
  consumer:
    group: pushConsumerGroup
  producer:
    group: pushProducerGroup
    send-message-timeout: 3000
    retry-times-when-send-async-failed: 2
    retry-times-when-send-failed: 2
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.2 使用

2.2.1 生产者

package com.ym.pushserver.rocketmq;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * @Author: Yangmiao
 * @Date: 2023/3/16 10:07
 * @Desc: 生产者发送
 * copy: https://www.jianshu.com/p/f7b59073ea01
 */
@Component
@Slf4j
public class RocketMQProducer {
    /**
     * rocketmq模板注入
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 普通发送
     * @param topic     消息主题
     * @param msg       消息体
     * @param <T>       消息泛型
     */
    public <T> void send(String topic, T msg) {
        rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
        //rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等价于上面一行
    }


    /**
     * 发送带tag的消息,直接在topic后面加上":tag"
     *
     * @param topic     消息主题
     * @param tag       消息tag
     * @param msg       消息体
     * @param <T>       消息泛型
     * @return
     */
    public <T> SendResult sendTagMsg(String topic, String tag, T msg) {
        topic = topic + ":" + tag;
        return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
    }


    /**
     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
     * sendResult为返回的发送结果
     */
    public <T> SendResult sendMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
        return sendResult;
    }


    /**
     * 发送异步消息
     * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
     * (适合对响应时间敏感的业务场景)
     * @param topic     消息Topic
     * @param msg       消息实体
     *
     */
    public <T> void asyncSend(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---发送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
            }
        });
    }


    /**
     * 发送异步消息
     * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
     * (适合对响应时间敏感的业务场景)
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback);
    }


    /**
     * 发送异步消息
     *
     * @param topic         消息Topic
     * @param message       消息实体
     * @param sendCallback  回调函数
     * @param timeout       超时时间
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }


    /**
     * 同步延迟消息
     * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
     * RocketMQ 目前只支持固定精度的定时消息。
     * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 延迟的底层方法是用定时任务实现的。
     * 发送延时消息(delayLevel的值就为0,因为不延时)
     *
     * @param topic         消息主题
     * @param msg           消息体
     * @param timeout       发送超时时间
     * @param delayLevel    延迟级别  1到18
     * @param <T>           消息泛型
     */
    public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
    }


    /**
     * 发送异步延迟消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }


    /**
     * 发送异步延迟消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---发送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
            }
        }, timeout, delayLevel);
    }


    /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     * @param topic     消息主题
     * @param msg       消息体
     * @param <T>       消息泛型
     */
    public <T> void sendOneWayMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendOneWay(topic, message);
    }


    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param <T>       消息泛型
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }


    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param timeout   超时时间
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }


    /**
     * 发送批量消息
     *
     * @param topic     消息主题
     * @param msgList   消息体集合
     * @param <T>       消息泛型
     * @return
     */
    public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
        List<Message<T>> messageList = msgList.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
        return rocketMQTemplate.syncSend(topic, messageList);
    }

    /**
     * 发送事务消息
     *
     * @param txProducerGroup   事务消息的生产者组名称
     * @param topic             事务消息主题
     * @param tag               事务消息tag
     * @param msg               事务消息体
     * @param arg               事务消息监听器回查参数
     * @param <T>               事务消息泛型
     */
    public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
        if(!StringUtils.isEmpty(tag)){
            topic = topic + ":" + tag;
        }
        String transactionId = UUID.randomUUID().toString();
        Message<T> message = MessageBuilder.withPayload(msg)
                //header也有大用处
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//                .setHeader("share_id", msg.getId())
                .build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, message, arg);
        if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
                && result.getSendStatus().equals(SendStatus.SEND_OK)){
            log.info("事物消息发送成功");
        }
        log.info("事物消息发送结果:{}", result);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268

2.2.2 消费者

package com.ym.pushserver.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @Author: Yangmiao
 * @Date: 2023/3/16 10:33
 * @Desc: 消费端
 */
@Component
@RocketMQMessageListener(consumerGroup = "test_group",topic = RocketMQConstant.DEFAULT_TOPIC)
@Slf4j
public class RocketMQConsmer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("Push msg:"+s);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2.2.3 测试

在网址中访问localhost:8081/push/mq

package com.ym.pushserver.controller;

import cn.hutool.core.util.StrUtil;
import com.ym.pushserver.entity.R;
import com.ym.pushserver.redis.RedisUtil;
import com.ym.pushserver.rocketmq.RocketMQConstant;
import com.ym.pushserver.rocketmq.RocketMQProducer;
import com.ym.pushserver.service.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: Yangmiao
 * @Date: 2023/3/9 20:00
 * @Desc:
 */
@RestController
@RequestMapping("/push")
@Slf4j
public class PushController {

    @Autowired
    private WebSocketServer webSocketServer;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/mq")
    public R testMQ(){
        Message<String> msg = MessageBuilder.withPayload("测试MQ").build();
        rocketMQProducer.asyncSend(RocketMQConstant.DEFAULT_TOPIC, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.debug("发送成功,"+sendResult.toString());
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("发送失败,"+throwable.getMessage());
            }
        });
        return R.ok();
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

查看Dashboard,统计中有一条信息
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/256818
推荐阅读
相关标签
  

闽ICP备14008679号