当前位置:   article > 正文

Spring Boot对接RocketMQ示例

Spring Boot对接RocketMQ示例

部署服务

参考RocketMq入门介绍

示例

引入maven依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

完整依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.9</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rocketMqDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketMqDemo</name>
    <description>rocketMqDemo</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

修改application.properties文件

配置文件如下:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.consumer.topic=test-topic
  • 1
  • 2
  • 3
  • 4

所有的配置参考RocketMQProperties源码中配置。
rocketmq.name-server:服务地址
rocketmq.producer.group:生产者的组名称
rocketmq.consumer.topic:消费者的主题名称

定义生产者

生产者是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。

生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。

生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
在这里插入图片描述
代码示例如下:

@Component
public class RocketMqProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送同步消息
     * @param msg
     */
    public void sendSyncMsg(String msg){
        rocketMQTemplate.convertAndSend("test-topic-1", msg);
    }

    /**
     * 发送Spring消息
     * @param msg
     */
    public void sendSpringMsg(String msg){
        rocketMQTemplate.send("test-topic-1"
                , MessageBuilder.withPayload(msg).build());
    }

    /**
     * 发送异步消息
     * @param msg
     */
    public void sendAsyncMsg(String msg){
        rocketMQTemplate.asyncSend("test-topic-1", new MsgBean(msg), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }

        });
    }

    /**
     * 发送有序消息
     * @param msg
     */
    public void sendOrderlyMsg(String msg){
        rocketMQTemplate.syncSendOrderly("test-topic-1",MessageBuilder.withPayload(msg).build(),"hashkey");

    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

定义消费者

消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

在消息消费端,可以定义如下传输行为:

  • 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。

  • 消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。具体信息,请参见消费者分类。

  • 消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
    在这里插入图片描述

代码示例如下:

@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-group")
@Component
public class RocketMqConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
        System.out.println("received message: "+ JSON.toJSONString(message));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

定义Controller调用消费者

代码示例如下:

@Controller
public class RestController {
    @Autowired
    RocketMqProducer producer;
    
    @RequestMapping(value = "/sendSyncMsg")
    @ResponseBody
    public String sendSyncMsg(){
        producer.sendSyncMsg("hello word");
        return "ok";    
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/152168
推荐阅读
相关标签
  

闽ICP备14008679号