赞
踩
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
完整依赖如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.9</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rocketMqDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rocketMqDemo</name> <description>rocketMqDemo</description> <properties> <java.version>8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置文件如下:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.consumer.topic=test-topic
所有的配置参考RocketMQProperties源码中配置。
rocketmq.name-server:服务地址
rocketmq.producer.group:生产者的组名称
rocketmq.consumer.topic:消费者的主题名称
生产者是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。
生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。
生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
代码示例如下:
@Component public class RocketMqProducer { @Resource private RocketMQTemplate rocketMQTemplate; /** * 发送同步消息 * @param msg */ public void sendSyncMsg(String msg){ rocketMQTemplate.convertAndSend("test-topic-1", msg); } /** * 发送Spring消息 * @param msg */ public void sendSpringMsg(String msg){ rocketMQTemplate.send("test-topic-1" , MessageBuilder.withPayload(msg).build()); } /** * 发送异步消息 * @param msg */ public void sendAsyncMsg(String msg){ rocketMQTemplate.asyncSend("test-topic-1", new MsgBean(msg), new SendCallback() { @Override public void onSuccess(SendResult var1) { System.out.printf("async onSucess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { System.out.printf("async onException Throwable=%s %n", var1); } }); } /** * 发送有序消息 * @param msg */ public void sendOrderlyMsg(String msg){ rocketMQTemplate.syncSendOrderly("test-topic-1",MessageBuilder.withPayload(msg).build(),"hashkey"); } }
消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
在消息消费端,可以定义如下传输行为:
消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。具体信息,请参见消费者分类。
消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
代码示例如下:
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-group")
@Component
public class RocketMqConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("received message: "+ JSON.toJSONString(message));
}
}
代码示例如下:
@Controller
public class RestController {
@Autowired
RocketMqProducer producer;
@RequestMapping(value = "/sendSyncMsg")
@ResponseBody
public String sendSyncMsg(){
producer.sendSyncMsg("hello word");
return "ok";
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。