当前位置:   article > 正文

Java基础之《微服务(10)—消息驱动》_java微服务消息收发模块

java微服务消息收发模块

一、什么是bus

1、什么是spring cloud bus
spring cloud bus集成了市面上常用的消息代理(RabbitMQ、Kafka两种),连接微服务系统中的所有节点,当有数据变更时,可以通过消息代理广播通知各个微服务及时变更数据,例如微服务的配置更新。

2、bus解决了什么问题
解决了微服务数据变更,及时同步的问题。

二、消息发送模块

1、复制或者新建一个maven模块mycloud-stream-sender

2、pom文件
添加rabbit依赖包spring-cloud-starter-stream-rabbit

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <parent>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-parent</artifactId>
  9. <version>2.1.8.RELEASE</version>
  10. <relativePath /> <!-- lookup parent from repository -->
  11. </parent>
  12. <groupId>com.example</groupId>
  13. <artifactId>mycloud-stream-sender</artifactId>
  14. <version>0.0.1-SNAPSHOT</version>
  15. <name>mycloud-stream-sender</name>
  16. <description>Demo project for Spring Boot</description>
  17. <properties>
  18. <java.version>1.8</java.version>
  19. <spring-cloud.version>Greenwich.SR5</spring-cloud.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-test</artifactId>
  29. <scope>test</scope>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.cloud</groupId>
  33. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-web</artifactId>
  38. </dependency>
  39. <!-- 健康监控配置 -->
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-starter-actuator</artifactId>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.springframework.cloud</groupId>
  46. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  47. </dependency>
  48. </dependencies>
  49. <dependencyManagement>
  50. <dependencies>
  51. <dependency>
  52. <groupId>org.springframework.cloud</groupId>
  53. <artifactId>spring-cloud-dependencies</artifactId>
  54. <version>${spring-cloud.version}</version>
  55. <type>pom</type>
  56. <scope>import</scope>
  57. </dependency>
  58. </dependencies>
  59. </dependencyManagement>
  60. <build>
  61. <plugins>
  62. <plugin>
  63. <groupId>org.springframework.boot</groupId>
  64. <artifactId>spring-boot-maven-plugin</artifactId>
  65. </plugin>
  66. </plugins>
  67. </build>
  68. </project>

3、发送接口
ISendService.java

  1. package com.example.mycloud.resource;
  2. import org.springframework.cloud.stream.annotation.Output;
  3. import org.springframework.messaging.SubscribableChannel;
  4. /**
  5. * 发送的接口
  6. * @author user
  7. *
  8. */
  9. public interface ISendService {
  10. @Output("stream-exchange")
  11. public SubscribableChannel send();
  12. }

4、测试类
TestSender.java

  1. package com.example.mycloud.resource;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.messaging.Message;
  4. import org.springframework.messaging.support.MessageBuilder;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class TestSender {
  9. @Autowired
  10. private ISendService sendService;
  11. @RequestMapping("/send")
  12. public void send() {
  13. String msg = "abc..........";
  14. Message message = MessageBuilder.withPayload(msg.getBytes()).build();
  15. sendService.send().send(message);
  16. }
  17. }

5、启动类
StreamSenderApplication.java

  1. package com.example.mycloud.run;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
  5. import org.springframework.cloud.stream.annotation.EnableBinding;
  6. import org.springframework.context.annotation.ComponentScan;
  7. import com.example.mycloud.resource.ISendService;
  8. @EnableBinding({ISendService.class}) //把发送接口绑定进来
  9. @EnableDiscoveryClient
  10. @SpringBootApplication
  11. @ComponentScan("com.example.mycloud")
  12. public class StreamSenderApplication {
  13. public static void main(String[] args) {
  14. SpringApplication.run(StreamSenderApplication.class, args);
  15. }
  16. }

6、配置文件

  1. spring:
  2. application:
  3. name: stream-sender
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: admin
  8. password: admin
  9. virtual-host: my_vhost
  10. server:
  11. port: 8017
  12. eureka:
  13. server:
  14. port: 8010
  15. instance:
  16. hostname: localhost
  17. client:
  18. registerWithEureka: true
  19. fetchRegistry: true
  20. serviceUrl:
  21. defaultZone: http://admin:123456@${eureka.instance.hostname}:${eureka.server.port}/eureka/
  22. #暴露actuator的所有端点(endpoint)
  23. management:
  24. endpoints:
  25. web:
  26. exposure:
  27. include: "*"
  28. endpoint:
  29. health:
  30. show-details: ALWAYS
  31. shutdown:
  32. #启用shutdown
  33. enabled: true
  34. #禁用密码验证(sensitive是Spring Boot 1.x的属性,2.x中已移除,此配置项不再生效)
  35. sensitive: false

三、消息接收模块

1、复制或者新建一个maven模块mycloud-stream-receiver

2、pom文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <parent>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-parent</artifactId>
  9. <version>2.1.8.RELEASE</version>
  10. <relativePath /> <!-- lookup parent from repository -->
  11. </parent>
  12. <groupId>com.example</groupId>
  13. <artifactId>mycloud-stream-receiver</artifactId>
  14. <version>0.0.1-SNAPSHOT</version>
  15. <name>mycloud-stream-receiver</name>
  16. <description>Demo project for Spring Boot</description>
  17. <properties>
  18. <java.version>1.8</java.version>
  19. <spring-cloud.version>Greenwich.SR5</spring-cloud.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-test</artifactId>
  29. <scope>test</scope>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.cloud</groupId>
  33. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-web</artifactId>
  38. </dependency>
  39. <!-- 健康监控配置 -->
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-starter-actuator</artifactId>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.springframework.cloud</groupId>
  46. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  47. </dependency>
  48. </dependencies>
  49. <dependencyManagement>
  50. <dependencies>
  51. <dependency>
  52. <groupId>org.springframework.cloud</groupId>
  53. <artifactId>spring-cloud-dependencies</artifactId>
  54. <version>${spring-cloud.version}</version>
  55. <type>pom</type>
  56. <scope>import</scope>
  57. </dependency>
  58. </dependencies>
  59. </dependencyManagement>
  60. <build>
  61. <plugins>
  62. <plugin>
  63. <groupId>org.springframework.boot</groupId>
  64. <artifactId>spring-boot-maven-plugin</artifactId>
  65. </plugin>
  66. </plugins>
  67. </build>
  68. </project>

3、接收接口
IReceiveService.java

  1. package com.example.mycloud.resource;
  2. import org.springframework.cloud.stream.annotation.Input;
  3. import org.springframework.messaging.SubscribableChannel;
  4. /**
  5. * 接收的接口
  6. * @author user
  7. *
  8. */
  9. public interface IReceiveService {
  10. @Input("stream-exchange")
  11. public SubscribableChannel receive();
  12. }

4、监听类
ReceiveService.java

  1. package com.example.mycloud.service;
  2. import org.springframework.cloud.stream.annotation.EnableBinding;
  3. import org.springframework.cloud.stream.annotation.StreamListener;
  4. import org.springframework.stereotype.Service;
  5. import com.example.mycloud.resource.IReceiveService;
  6. @Service
  7. @EnableBinding(IReceiveService.class) //绑定
  8. public class ReceiveService {
  9. @StreamListener("stream-exchange")
  10. public void onReceive(byte[] msg) {
  11. System.out.println("receive: " + new String(msg));
  12. }
  13. }

5、启动类
StreamReceiverApplication.java

  1. package com.example.mycloud.run;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
  5. import org.springframework.context.annotation.ComponentScan;
  6. @EnableDiscoveryClient
  7. @SpringBootApplication
  8. @ComponentScan("com.example.mycloud")
  9. public class StreamReceiverApplication {
  10. public static void main(String[] args) {
  11. SpringApplication.run(StreamReceiverApplication.class, args);
  12. }
  13. }

6、配置文件

  1. spring:
  2. application:
  3. name: stream-receiver
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: admin
  8. password: admin
  9. virtual-host: my_vhost
  10. server:
  11. port: 8018
  12. eureka:
  13. server:
  14. port: 8010
  15. instance:
  16. hostname: localhost
  17. client:
  18. registerWithEureka: true
  19. fetchRegistry: true
  20. serviceUrl:
  21. defaultZone: http://admin:123456@${eureka.instance.hostname}:${eureka.server.port}/eureka/
  22. #暴露actuator的所有端点(endpoint)
  23. management:
  24. endpoints:
  25. web:
  26. exposure:
  27. include: "*"
  28. endpoint:
  29. health:
  30. show-details: ALWAYS
  31. shutdown:
  32. #启用shutdown
  33. enabled: true
  34. #禁用密码验证(sensitive是Spring Boot 1.x的属性,2.x中已移除,此配置项不再生效)
  35. sensitive: false

四、stream解决了什么问题

1、应用图

Application Core:应用服务

2、stream解决了开发人员无感知使用消息中间件的问题
因为stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(例如从rabbitmq切换为kafka)。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

3、middleware
消息中间件,目前只支持rabbitmq和kafka。

4、binder
binder是应用与消息中间件之间的封装。目前实现了kafka和rabbitmq的binder。通过binder,可以很方便的连接中间件,可以动态改变消息类型。

5、@Input
注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

6、@Output
注解标识输出通道,发布的消息将通过该通道离开应用程序。

7、@StreamListener
监听指定的输入通道,用于消费者接收队列中的消息。

8、@EnableBinding
用于启用绑定:把声明了@Input/@Output信道的接口绑定进来,即将信道channel和exchange绑定在一起。

五、消息的分组

1、之前的服务有什么问题
(1)队列是临时队列
(2)如果起多个receiver就会有多个队列queue绑定到exchange上,多个服务会同时接收到消息,消费多次

2、指定信道绑定交换器,交换器绑定队列
stream-sender模块
ISendService.java添加:

  1. package com.example.mycloud.resource;
  2. import org.springframework.cloud.stream.annotation.Output;
  3. import org.springframework.messaging.SubscribableChannel;
  4. /**
  5. * 发送的接口
  6. * @author user
  7. *
  8. */
  9. public interface ISendService {
  10. @Output("stream-exchange")
  11. public SubscribableChannel send();
  12. //消息分组
  13. String OUTPUT = "groupOutput";
  14. @Output(ISendService.OUTPUT)
  15. public SubscribableChannel send2();
  16. }

TestSender.java添加:

  1. package com.example.mycloud.resource;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.messaging.Message;
  4. import org.springframework.messaging.support.MessageBuilder;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import com.example.mycloud.bean.Product;
  8. @RestController
  9. public class TestSender {
  10. @Autowired
  11. private ISendService sendService;
  12. @RequestMapping("/send")
  13. public void send() {
  14. String msg = "abc..........";
  15. Message message = MessageBuilder.withPayload(msg.getBytes()).build();
  16. sendService.send().send(message);
  17. }
  18. // Message grouping: publish a Product through the groupOutput channel.
  19. @RequestMapping("/send2")
  20. public void send2() {
  21. Product product = new Product();
  22. product.setId("1");
  23. product.setName("abc");
  24. Message<Product> message = MessageBuilder.withPayload(product).build();
  25. sendService.send2().send(message); // send2() is the groupOutput channel (destination object-exchange), not send()
  26. }
  27. }

配置文件添加:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. groupOutput:
  6. #指定输出通道对应的exchange主题名
  7. destination: object-exchange

stream-receiver模块
IReceiveService.java添加:

  1. package com.example.mycloud.resource;
  2. import org.springframework.cloud.stream.annotation.Input;
  3. import org.springframework.messaging.SubscribableChannel;
  4. /**
  5. * 接收的接口
  6. * @author user
  7. *
  8. */
  9. public interface IReceiveService {
  10. @Input("stream-exchange")
  11. public SubscribableChannel receive();
  12. //消息分组
  13. String INPUT = "groupInput";
  14. @Input(IReceiveService.INPUT)
  15. public SubscribableChannel receive2();
  16. }

ReceiveService.java添加:

  1. package com.example.mycloud.service;
  2. import org.springframework.cloud.stream.annotation.EnableBinding;
  3. import org.springframework.cloud.stream.annotation.StreamListener;
  4. import org.springframework.stereotype.Service;
  5. import com.example.mycloud.bean.Product;
  6. import com.example.mycloud.resource.IReceiveService;
  7. @Service
  8. @EnableBinding(IReceiveService.class) //绑定
  9. public class ReceiveService {
  10. @StreamListener("stream-exchange")
  11. public void onReceive(byte[] msg) {
  12. System.out.println("receive: " + new String(msg));
  13. }
  14. //消息分组
  15. @StreamListener(IReceiveService.INPUT)
  16. public void onReceive2(Product obj) {
  17. System.out.println("receive: " + obj.toString());
  18. }
  19. }

配置文件添加:

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. groupInput:
  6. #指定输入通道对应的exchange主题名
  7. destination: object-exchange
  8. #具体分组对应MQ的队列名称,并持久化队列
  9. group: groupQueue

3、查看绑定情况

消息分组解决了队列持久化的问题;同时,集群中同一分组的多个consumer共用一个队列,每条消息只会被其中一个consumer消费一次。

六、消息的分区
集群环境下,多个服务组成一个集群,如果想让相同的消息始终被同一个服务实例消费,就需要使用消息分区。

1、stream-receiver模块,修改配置文件

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. partInput:
  6. consumer:
  7. #开启消费者分区功能
  8. partitioned: true
  9. #指定了当前消费者的总实例数量
  10. instanceCount: 2
  11. #设置当前实例的索引号,从0开始
  12. instanceIndex: 0

2、stream-sender模块,修改配置文件

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. partOutput:
  6. producer:
  7. #通过该参数指定了分区键的表达式规则
  8. partitionKeyExpression: payload
  9. #指定了消息分区的数量
  10. partitionCount: 2

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/840710
推荐阅读
相关标签
  

闽ICP备14008679号