当前位置:   article > 正文

RocketMQ—RocketMQ集成SpringBoot

rocketmq集成springboot

RocketMQ—RocketMQ集成SpringBoot

新建生产者的boot项目和消费者的boot项目,pom文件重点如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.25</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

02-boot-producer和03-boot-consumer分别对应生产者和消费者。

项目结构

生产者

生产者yml文件如下:

rocketmq:
    name-server: 地址:端口
    producer:
        group: boot-producer-group
  • 1
  • 2
  • 3
  • 4

同步发送消息

生产者同步发送消息的代码如下:

@SpringBootTest
class Rocketmq02BootProducerApplicationTests {
    //注入rocketMQTemplate

   	@Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void producer(){
        rocketMQTemplate.syncSend("bootTestTopic","这是boot的一个消息");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

运行完毕看面板如下:

面板

发送异步消息

// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功");
    }

    @Override
    public void onException(Throwable throwable) {
        System.out.println("失败" + throwable.getMessage());
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

发送单向消息

rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
  • 1

延迟消息

// 延迟消息
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); //第三个参数表示连接消息队列的超时时间,第四个参数表示延时等级
  • 1
  • 2
  • 3

顺序消息

MSGModel类如下

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {

    private String orderSn;
    private Integer userId;
    private String desc; // 下单 短信 物流

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

发送顺序消息的生产者如下:

//发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
    new MsgModel("qwer", 1, "下单"),
    new MsgModel("qwer", 1, "短信"),
    new MsgModel("qwer", 1, "物流"),
    new MsgModel("zxcv", 2, "下单"),
    new MsgModel("zxcv", 2, "短信"),
    new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
    // 发送  一般都是以json的方式进行处理
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
    //第二个参数表示消息内容	第三个参数表示hashKey
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

带标签的消息

@Test
void tagKeyTest() throws Exception {
    rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}
  • 1
  • 2
  • 3
  • 4

带key的消息

@Test
void tagKeyTest() throws Exception {
   // key是写带在消息头的
    Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
        .setHeader(RocketMQHeaders.KEYS, "key-id-1")
        .build();
    rocketMQTemplate.syncSend("bootKeyTopic", message);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消费者

yml配置文件如下:

server:
  port: 8890
rocketmq:
  name-server: 地址:端口
  • 1
  • 2
  • 3
  • 4

简单消费者

消费者代码如下

@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group")
public class ASimpleMsgListener implements RocketMQListener<MessageExt> {
    //如果泛型指定固定类型,消息体就是我们的参数
    //MessageExt 是消息所有内容,可以拿到所有内容

    /**
     * 这个方法就是消费消息的方法
     * 只要没有报错,就签收了
     * 如果报错了,就是拒收,就会重试
     * @param message 是消息内容
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

运行结果

顺序消息的消费者

@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

带tag的消费者

@Component
@RocketMQMessageListener(topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式  这种一般不用,这种默认没有开启,需要在sql92 
                         //需要在broker.conf配置文件中开启enbalePropertyFilter=true
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/434558
推荐阅读
相关标签
  

闽ICP备14008679号