赞
踩
消息渠道简单点说就是利用rabbitmq和kafka发送消息的。
Stream解决了开发人员无感知使用消息中间件的问题。
因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(例如rabbitmq和kafka)。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。(仅修改pom和properties文件)
我们需要2个项目,一个sender和一个receiver。
receiver项目结构
pom文件的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件的代码:
spring.application.name=stream-receiver
server.port=9041
eureka.client.serviceUrl.defaultZone=http://user:test@eureka2:8762/eureka/,http://user:test@eureka1:8761/eureka/
eureka.instance.perferIpAddress=true
#spring.cloud.config.server.git.uri=https://gitee.com/agan_jiagou/config
spring.cloud.config.server.git.uri=https://gitee.com/Xinyangyunyang/config
spring.cloud.config.server.git.username=997355706@qq.com
spring.cloud.config.server.git.password=wen1314520++
spring.rabbitmq.host=192.168.23.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=agan
spring.rabbitmq.password=123456
spring.rabbitmq.virtualHost=/
StreamReceiverApplication代码:
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({IReceiveService.class})
public class StreamReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverApplication.class, args);
}
}
IReceiveService代码:
public interface IReceiveService {
@Input("me-exchange")
SubscribableChannel receive();
}
ReceiveService代码:
@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {
@StreamListener("me-exchange")
public void onReceive(byte[] msg) {
System.out.println("receive:" + new String(msg));
}
}
sender项目目录结构
pom文件依赖和配置文件都是一样的,就是加上rabbitmq和依赖和配置就OK了,可以参考上面。然后sender中多了一个测试类。
StreamSenderApplication代码:
@SpringBootApplication
@EnableEurekaClient
@EnableBinding({ISendService.class})
public class StreamSenderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamSenderApplication.class, args);
}
}
ISendService代码:
public interface ISendService {
@Output("me-exchange")
SubscribableChannel send();
}
测试StreamTests代码:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderApplication.class)
public class StreamTests {
@Autowired
private ISendService send;
@Test
public void send() throws InterruptedException {
String msg = "agan..........";
Message message = MessageBuilder.withPayload(msg.getBytes()).build();
this.send.send().send(message);
}
}
然后启动receiver进行测试:
接收消息成功
消息的分组解决了临时队列的问题使队列持久化。
我们还是利用上面的项目改造,上面我们发送的是string,这回发送一个对象product,然后修改一下配置文件就OK了。下面我只讲到需要修改的地方。
group-receiver项目目录结构
添加的Product类:
public class Product implements Serializable{ private Integer id; private String name; private Byte status; private Integer price; private Byte deleted; private Date createTime; private Date updateTime; private String detail; public Product() { //构造函数 super(); } public Product(Integer id, String name) { super(); this.id = id; this.name = name; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name == null ? null : name.trim(); } public Byte getStatus() { return status; } public void setStatus(Byte status) { this.status = status; } public Integer getPrice() { return price; } public void setPrice(Integer price) { this.price = price; } public Byte getDeleted() { return deleted; } public void setDeleted(Byte deleted) { this.deleted = deleted; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getDetail() { return detail; } public void setDetail(String detail) { this.detail = detail == null ? null : detail.trim(); } @Override public String toString() { return "Product [id=" + id + ", name=" + name + "]"; } }
当然你们可以自己定义。
IReceiveService代码:
public interface IReceiveService {
String INPUT="inputProduct";
@Input(INPUT)
SubscribableChannel receive();
}
ReceiveService代码:
@Service
@EnableBinding({IReceiveService.class})
public class ReceiveService {
@StreamListener(IReceiveService.INPUT)
public void onReceive(Product obj) {
System.out.println("receive:" + obj.toString());
}
}
配置文件添加的代码,其他和上面的一样:
# 对应 MQ 是 exchange
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列
spring.cloud.stream.bindings.inputProduct.group=groupProduct
这个bindings后面的值就是IReceiveService 里面的INPUT值,然后这个交换器的名字叫做exchangeProduct。
group-sender项目目录结构
这个也是和上面一样,添加了一个类,和上面一样的。然后修改了配置文件的代码。
ISendService代码:
public interface ISendService {
String OUTPUT="outputProduct";
@Output(OUTPUT)
SubscribableChannel send();
}
StreamTests代码:
@RunWith(SpringRunner.class) @SpringBootTest(classes = StreamSenderApplication.class) public class StreamTests { @Autowired private ISendService send; @Test public void send() throws InterruptedException { Product obj = new Product(); obj.setId(100); obj.setName("spring cloud"); for (int i = 0; i < 10; i++) { Message message = MessageBuilder.withPayload(obj).build(); this.send.send().send(message); } } }
配置文件中添加的代码:
# 对应 MQ 是 exchange
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
同样的bindings后面的outputProduct是ISendService 里面的OUTPUT,交换器名称都是exchangeProduct。
启动项目测试:
接收成功。
消息分区在消息分组之后,如果有多个消息发送,也有多个服务,那么这个消息会被发送到2个服务上面。如果添加消息分区,消息只被分配到集群的同一个节点上面。
我们还是利用上面的项目,将receiver复制一份修改一下端口,启动2个项目。
测试:
发送了10次receiver1中6次,receiver2中4次。
所以不是讲消息发给同一个集群节点。然后我们只需要在配置文件中修改一下就可以起到消息分区的效果。
我们需要2个receiver端,复制分组的项目,然后只需要修改一下配置文件。
在receiver1的配置文件中添加:
#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从0开始
spring.cloud.stream.instanceIndex=0
在receiver2的配置文件中添加:
#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从0开始
spring.cloud.stream.instanceIndex=1
在sender项目文件额配置文件中添加:
#通过该参数指定了分区键的表达式规则
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
#指定了消息分区的数量。
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
然后启动项目测试,测试类利用循环发送多条消息。
测试:
receiver2接收到了10条消息。
感谢大家的观看,如果有什么问题或者交流的可以加我qq997355706,么么哒。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。