
Integrating RabbitMQ with Spring Boot

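I. Writing a producer and a consumer by hand with the RabbitMQ client

Classes needed, all in the package com.example.shop.common.utils.rabbitmq: ConnectionUtil, ProducerUtil, ConsumerUtil, and MessageTest.

 1.  Add the RabbitMQ Java client dependency to the pom file:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>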

2. Create a connection utility class that sets the host, port, username, password, and virtual host. The default AMQP port is 5672.

    package com.example.shop.common.utils.rabbitmq;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConnectionUtil {

        private static final String HOST = "localhost";
        private static final int PORT = 5672;
        private static final String USERNAME = "guest";
        private static final String PASSWORD = "guest";
        // "/" is the default virtual host
        private static final String VIRTUAL_HOST = "/";

        // Opens a new connection to the broker; the caller is responsible for closing it
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost(VIRTUAL_HOST);
            return connectionFactory.newConnection();
        }
    }
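
The same connection settings can also be written as a single AMQP URI, which ConnectionFactory accepts through setUri. The sketch below only builds such a string with JDK classes and does not touch the broker. The helper name amqpUri is made up for illustration, and URLEncoder is used as an approximation of percent-encoding that is adequate for simple values such as guest or dev. The point to notice is that the default virtual host "/" has to appear as %2F in the URI.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Builds an AMQP URI from the same settings ConnectionUtil uses (illustrative helper). */
final class AmqpUriSketch {

    static String amqpUri(String user, String password, String host, int port, String virtualHost) {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(virtualHost);
    }

    // Form-style encoding; good enough for names without spaces (assumption of this sketch)
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        // Default virtual host "/" becomes %2F
        System.out.println(amqpUri("guest", "guest", "localhost", 5672, "/"));
        // A named virtual host such as dev is appended as-is
        System.out.println(amqpUri("guest", "guest", "localhost", 5672, "dev"));
    }
}
```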

3.  Create a message producer utility class

    package com.example.shop.common.utils.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class ProducerUtil {

        // The producer publishes a message
        public static void produceMessage(String message) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            try {
                // 1. Create a channel
                Channel channel = connection.createChannel();
                // 2. Declare a durable direct exchange
                channel.exchangeDeclare("goods-exchange", "direct", true);
                // 3. Declare a durable queue
                channel.queueDeclare("bug-goods", true, false, false, null);
                // 4. Bind the queue to the exchange with routing key good001
                channel.queueBind("bug-goods", "goods-exchange", "good001");
                // 5. Publish the message
                channel.basicPublish("goods-exchange", "good001", null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println("Message published...");
            } finally {
                // Closing the connection also closes its channels
                connection.close();
            }
        }
    }
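
The calls above work as follows:

  •  channel.exchangeDeclare("goods-exchange", "direct", true) declares an exchange named goods-exchange. Queues are bound to an exchange, and the exchange routes each message to the matching queues by routing key. arg1: exchange name; arg2: exchange type (direct, topic, fanout, or headers); arg3: durable, whether the exchange survives a broker restart.
  •  channel.queueDeclare("bug-goods", true, false, false, null) declares a queue named bug-goods. The implementation class delegates to this.delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments), which shows what the parameters mean. arg1: queue name; arg2: durable; arg3: exclusive, whether the queue may be used only by the connection that declared it (an exclusive queue is deleted when that connection closes); arg4: autoDelete, whether the queue itself, not its messages, is deleted once its last consumer unsubscribes; arg5: optional queue arguments.
  •  channel.queueBind("bug-goods", "goods-exchange", "good001") binds the queue to the given exchange with a routing key. A fanout exchange ignores the routing key, so no meaningful key is needed for that type. arg1: queue name; arg2: exchange name; arg3: routing key.
  •  channel.basicPublish("goods-exchange", "good001", null, message.getBytes()) publishes one message to the given exchange. arg1: exchange name; arg2: routing key, which is required. With a direct exchange, a message whose routing key matches no binding is routed to no queue and is silently dropped (unless it is published with the mandatory flag), which can look as if the message were stuck. arg3: message properties; arg4: message body, sent as bytes.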
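
The routing behaviour described above can be sketched with a small in-memory model. This is not RabbitMQ code: the class DirectExchangeSketch and its method names merely mirror the client calls so the mapping is easy to follow, and the model assumes a single direct exchange. It shows that a message published with an unbound routing key reaches no queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing; not the RabbitMQ implementation. */
final class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages that were routed to it
    private final Map<String, List<String>> queues = new HashMap<>();

    void queueDeclare(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void queueBind(String queue, String routingKey) {
        if (!queues.containsKey(queue)) {
            throw new IllegalArgumentException("no queue '" + queue + "'");
        }
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Returns how many queues received the message; 0 means it was dropped. */
    int basicPublish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.get(queue);
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.queueDeclare("bug-goods");
        exchange.queueBind("bug-goods", "good001");
        System.out.println(exchange.basicPublish("good001", "hello")); // matching key
        System.out.println(exchange.basicPublish("good002", "lost"));  // no binding for this key
        System.out.println(exchange.messagesIn("bug-goods"));
    }
}
```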

4. Create a consumer utility class

     basicConsume() tells the RabbitMQ client which queue to listen on, and handleDelivery() receives each message.

     Note: the broker removes a message from the queue only after the client has acknowledged receiving it.

    package com.example.shop.common.utils.rabbitmq;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class ConsumerUtil {

        // The consumer receives messages
        public static void reciveMsg() throws IOException, TimeoutException {
            // The connection stays open so that deliveries keep arriving
            Connection connection = ConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Register a consumer on the queue; the second argument enables automatic acknowledgement
            channel.basicConsume("bug-goods", true, new Consumer() {
                @Override
                public void handleConsumeOk(String s) {
                    System.out.println("handleConsumeOk:" + s);
                }

                @Override
                public void handleCancelOk(String s) {
                    System.out.println("handleCancelOk:" + s);
                }

                @Override
                public void handleCancel(String s) throws IOException {
                    System.out.println("handleCancel:" + s);
                }

                @Override
                public void handleShutdownSignal(String s, ShutdownSignalException e) {
                    System.out.println("handleShutdownSignal:" + s);
                }

                @Override
                public void handleRecoverOk(String s) {
                    System.out.println("handleRecoverOk:" + s);
                }

                @Override
                public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println("handleDelivery:" + s);
                    String msg = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Received message: " + msg);
                    // The routing key tells whether the message is one this consumer wants. In a microservice
                    // architecture the service name can serve as the routing key, so that only one service receives it.
                    System.out.println("routing key: " + envelope.getRoutingKey());
                    System.out.println("exchange: " + envelope.getExchange());
                }
            });
        }
    }

5.  Test

    package com.example.shop.common.utils.rabbitmq;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class MessageTest {

        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. Publish a message
            ProducerUtil.produceMessage("Hello!");
            // 2. Receive the message
            ConsumerUtil.reciveMsg();
        }
    }

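Running the main method creates the exchange and the queue on the broker automatically.

At this point the queue is still idle.

Scenario 1:

       Consume without acknowledging (pass false as the second argument of basicConsume and never call basicAck), run main() once, and look at the messages in the queue.

      One message shows as unacked: RabbitMQ does not know whether the consumer has received it.

Scenario 2:

       Turn automatic acknowledgement back on, run main() again, and look at the console and the queue.

The console shows two messages, and the queue is now empty. The first is the unacknowledged message from scenario 1, which returned to the queue when that consumer's connection closed; the second is the newly published one. RabbitMQ removes a message from the queue only once its delivery has been acknowledged.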
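
The two scenarios can be replayed with a tiny in-memory model of a queue's ready and unacked counters. It is a sketch under simplifying assumptions (one queue, one consumer, every ready message delivered at once), and the class AckSketch is invented for illustration; it is not how the broker is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of ready/unacked bookkeeping for one queue; not broker code. */
final class AckSketch {

    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers every ready message. With autoAck they are removed at once; otherwise they wait as unacked. */
    List<String> consume(boolean autoAck) {
        List<String> delivered = new ArrayList<>(ready);
        if (!autoAck) {
            unacked.addAll(ready);
        }
        ready.clear();
        return delivered;
    }

    /** The consumer's connection closed without acking: unacked messages go back to the queue. */
    void connectionClosed() {
        // Requeue ahead of newer messages, keeping the original order
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("first");
        queue.consume(false);          // scenario 1: delivered but never acknowledged
        queue.connectionClosed();      // the first run ends
        queue.publish("second");
        System.out.println(queue.consume(true)); // scenario 2: both messages arrive
    }
}
```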

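II. Integrating RabbitMQ with Spring Boot

(1) Configure the virtual host, exchange, and queue in the management UI

         1. Open the RabbitMQ management page and create a virtual host. The default virtual host is "/". Use a separate virtual host for each environment, for example dev for development and prod for production.

      2. Under the default virtual host, add a queue named rabbitmqtest.

(2) Test case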

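1. Add the RabbitMQ dependency (spring-boot-starter-amqp, which provides RabbitTemplate and @RabbitListener) alongside the web starter:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>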

  2. If the dependencies cannot be downloaded, add the following mirrors to settings.xml:

    <mirror>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <mirrorOf>central</mirrorOf>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </mirror>
    <mirror>
        <id>nexus</id>
        <name>internal nexus repository</name>
        <!-- <url>http://192.168.1.100:8081/nexus/content/groups/public/</url>-->
        <url>http://repo.maven.apache.org/maven2</url>
        <mirrorOf>central</mirrorOf>
    </mirror>

3.  Create a controller with the following code:

    package com.example.rabbitmq.web;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("/api/user")
    public class UserController {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private Getmsg getmsg;

        @GetMapping("/send/msg")
        public String sendmsgtormq() {
            // Send a message to the queue named rabbitmqtest (the first argument is the routing key,
            // which the default exchange matches against the queue name)
            rabbitTemplate.convertAndSend("rabbitmqtest", "Sending one message");
            return "Message sent!";
        }
    }

4. Create a RabbitMQ listener class, Getmsg, with the following code:

    package com.example.rabbitmq.web;

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Listens on the queue rabbitmqtest
    @RabbitListener(queues = "rabbitmqtest")
    @Component
    public class Getmsg {

        // Called for each message whose payload converts to a String
        @RabbitHandler
        public void getmsg(String string) {
            System.out.println("rabbitmqtest received: " + string);
        }
    }

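5. Start the project and open http://localhost:9000/api/user/send/msg. The listener class receives the message from the queue; by default a message is removed from the queue once it has been consumed.

  6. A common problem: if startup fails with the following error,

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'rabbitmqtest' in vhost '/dev', class-id=50, method-id=10)

   check that a queue with that name exists in the virtual host named in the message.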

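III. RabbitMQ in a real project

      1. In this project, documents are submitted to a workflow service, which responds to the front end with success or failure. The business module then updates the status of the corresponding document according to the workflow result and runs business logic based on that status.

     The sending code uses the publishEvent method of ApplicationEventPublisher. Its parameter is an Object, so message content of any type is supported.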

    /*
     * Copyright 2002-2016 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.springframework.context;

    /**
     * Interface that encapsulates event publication functionality.
     * Serves as super-interface for {@link ApplicationContext}.
     *
     * @author Juergen Hoeller
     * @author Stephane Nicoll
     * @since 1.1.1
     * @see ApplicationContext
     * @see ApplicationEventPublisherAware
     * @see org.springframework.context.ApplicationEvent
     * @see org.springframework.context.event.EventPublicationInterceptor
     */
    @FunctionalInterface
    public interface ApplicationEventPublisher {

        /**
         * Notify all <strong>matching</strong> listeners registered with this
         * application of an application event. Events may be framework events
         * (such as RequestHandledEvent) or application-specific events.
         * @param event the event to publish
         * @see org.springframework.web.context.support.RequestHandledEvent
         */
        default void publishEvent(ApplicationEvent event) {
            publishEvent((Object) event);
        }

        /**
         * Notify all <strong>matching</strong> listeners registered with this
         * application of an event.
         * <p>If the specified {@code event} is not an {@link ApplicationEvent},
         * it is wrapped in a {@link PayloadApplicationEvent}.
         * @param event the event to publish
         * @since 4.2
         * @see PayloadApplicationEvent
         */
        void publishEvent(Object event);

    }

     ApplicationEventPublisher is an interface in the spring-context package.
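
The idea of "notify all matching listeners" can be illustrated without Spring. The sketch below is a deliberately tiny stand-in, not Spring's implementation: TinyEventPublisher and addListener are invented names, listeners are matched purely by the runtime type of the event, and nothing is wrapped in a PayloadApplicationEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal type-based event dispatch; a stand-in for illustration, not Spring code. */
final class TinyEventPublisher {

    private static final class Registration<T> {
        final Class<T> eventType;
        final Consumer<T> listener;

        Registration(Class<T> eventType, Consumer<T> listener) {
            this.eventType = eventType;
            this.listener = listener;
        }

        // Invokes the listener only when the event is an instance of the registered type
        void deliverIfMatching(Object event) {
            if (eventType.isInstance(event)) {
                listener.accept(eventType.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <T> void addListener(Class<T> eventType, Consumer<T> listener) {
        registrations.add(new Registration<>(eventType, listener));
    }

    /** Accepts any object, like publishEvent(Object), and notifies every matching listener. */
    void publishEvent(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatching(event);
        }
    }

    public static void main(String[] args) {
        TinyEventPublisher publisher = new TinyEventPublisher();
        publisher.addListener(String.class, text -> System.out.println("text event: " + text));
        publisher.addListener(Integer.class, number -> System.out.println("number event: " + number));
        publisher.publishEvent("document submitted");
        publisher.publishEvent(42);
        publisher.publishEvent(3.5); // no listener registered for Double, so nothing happens
    }
}
```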

Assembling the producer message:

    protected WorkflowCustomRemoteEvent createWorkflowCustomRemoteEvent(WorkFlowDocumentRef workFlowDocumentRef) {
        Assert.notNull(workFlowDocumentRef, "workFlowDocumentRef null");
        // Copy the document fields into the message payload
        WorkflowMessageCO workflowMessageCO = new WorkflowMessageCO();
        workflowMessageCO.setUserBean(OrgInformationUtil.getUser());
        workflowMessageCO.setEntityOid(workFlowDocumentRef.getDocumentOid());
        workflowMessageCO.setEntityType(workFlowDocumentRef.getDocumentCategory().toString());
        workflowMessageCO.setStatus(workFlowDocumentRef.getStatus());
        workflowMessageCO.setUserId(workFlowDocumentRef.getCreatedBy());
        workflowMessageCO.setDocumentId(workFlowDocumentRef.getDocumentId());
        workflowMessageCO.setApprovalText(workFlowDocumentRef.getRejectReason());
        workflowMessageCO.setRemark("Document number: " + workFlowDocumentRef.getDocumentNumber());
        workflowMessageCO.setDocumentTypeId(workFlowDocumentRef.getDocumentTypeId());
        workflowMessageCO.setDocumentTypeCode(workFlowDocumentRef.getDocumentTypeCode());
        // Origin is this service; destination is the service that owns the document
        String originService = applicationName + ":**";
        String destinationService = workFlowDocumentRef.getDestinationService();
        WorkflowCustomRemoteEvent workflowCustomRemoteEvent = new WorkflowCustomRemoteEvent(
                this, originService, destinationService, workflowMessageCO);
        return workflowCustomRemoteEvent;
    }

  The WorkflowCustomRemoteEvent class wraps the message to send. It extends ApplicationEvent (through RemoteApplicationEvent).

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //

    package com.hand.hcf.app.mdata.client.workflow.event;

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
    import com.hand.hcf.app.mdata.client.workflow.dto.WorkflowMessageCO;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.springframework.cloud.bus.event.RemoteApplicationEvent;

    @JsonTypeInfo(
        use = Id.NAME,
        property = "type"
    )
    @JsonIgnoreProperties({"source"})
    public class WorkflowCustomRemoteEvent extends RemoteApplicationEvent {
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
        private WorkflowMessageCO workflowMessage;

        public WorkflowCustomRemoteEvent(Object source, String originService, String destinationService, WorkflowMessageCO workflowMessage) {
            super(source, originService, destinationService);
            this.workflowMessage = workflowMessage;
        }

        public String toString() {
            return "WorkflowCustomRemoteEvent{WorkflowMessageCO=" + this.workflowMessage + ",eventId:" + super.getId() + ",originService:" + super.getOriginService() + ",destinationService:" + super.getDestinationService() + ",time:" + simpleDateFormat.format(new Date(super.getTimestamp())) + '}';
        }

        public WorkflowMessageCO getWorkflowMessage() {
            return this.workflowMessage;
        }

        public void setWorkflowMessage(final WorkflowMessageCO workflowMessage) {
            this.workflowMessage = workflowMessage;
        }

        public boolean equals(final Object o) {
            if (o == this) {
                return true;
            } else if (!(o instanceof WorkflowCustomRemoteEvent)) {
                return false;
            } else {
                WorkflowCustomRemoteEvent other = (WorkflowCustomRemoteEvent)o;
                if (!other.canEqual(this)) {
                    return false;
                } else {
                    Object this$workflowMessage = this.getWorkflowMessage();
                    Object other$workflowMessage = other.getWorkflowMessage();
                    if (this$workflowMessage == null) {
                        if (other$workflowMessage != null) {
                            return false;
                        }
                    } else if (!this$workflowMessage.equals(other$workflowMessage)) {
                        return false;
                    }
                    return true;
                }
            }
        }

        protected boolean canEqual(final Object other) {
            return other instanceof WorkflowCustomRemoteEvent;
        }

        public int hashCode() {
            final int PRIME = 59;
            int result = 1;
            Object $workflowMessage = this.getWorkflowMessage();
            result = result * PRIME + ($workflowMessage == null ? 43 : $workflowMessage.hashCode());
            return result;
        }

        // No-argument constructor, needed for JSON deserialization
        public WorkflowCustomRemoteEvent() {
        }
    }

Code walkthrough:

        WorkflowCustomRemoteEvent extends RemoteApplicationEvent, the Spring Cloud Bus base class for events that are sent and received over the bus, which in turn extends ApplicationEvent from spring-context. With the Spring Cloud Bus dependency on the classpath, a RemoteApplicationEvent can be used to send messages to the bound RabbitMQ queue.

 The Spring Cloud Bus dependency:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>

The RemoteApplicationEvent class from the Spring Cloud Bus package is the base class for remote events.

The @JsonIgnoreProperties annotation keeps the source property out of the JSON serialization:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //

    package org.springframework.cloud.bus.event;

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
    import java.util.UUID;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.util.StringUtils;

    @JsonTypeInfo(
        use = Id.NAME,
        property = "type"
    )
    @JsonIgnoreProperties({"source"})
    public abstract class RemoteApplicationEvent extends ApplicationEvent {
        private static final Object TRANSIENT_SOURCE = new Object();
        private final String originService;
        private final String destinationService;
        private final String id;

        protected RemoteApplicationEvent() {
            this(TRANSIENT_SOURCE, (String)null, (String)null);
        }

        protected RemoteApplicationEvent(Object source, String originService, String destinationService) {
            super(source);
            this.originService = originService;
            // A missing destination means "all services"
            if (destinationService == null) {
                destinationService = "**";
            }

            // Append the ":**" wildcard to a plain service name
            if (!"**".equals(destinationService) && StringUtils.countOccurrencesOf(destinationService, ":") <= 1 && !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {
                destinationService = destinationService + ":**";
            }

            this.destinationService = destinationService;
            this.id = UUID.randomUUID().toString();
        }

        protected RemoteApplicationEvent(Object source, String originService) {
            this(source, originService, (String)null);
        }

        public String getOriginService() {
            return this.originService;
        }

        public String getDestinationService() {
            return this.destinationService;
        }

        public String getId() {
            return this.id;
        }

        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (this.destinationService == null ? 0 : this.destinationService.hashCode());
            result = prime * result + (this.id == null ? 0 : this.id.hashCode());
            result = prime * result + (this.originService == null ? 0 : this.originService.hashCode());
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            } else if (obj == null) {
                return false;
            } else if (this.getClass() != obj.getClass()) {
                return false;
            } else {
                RemoteApplicationEvent other = (RemoteApplicationEvent)obj;
                if (this.destinationService == null) {
                    if (other.destinationService != null) {
                        return false;
                    }
                } else if (!this.destinationService.equals(other.destinationService)) {
                    return false;
                }

                if (this.id == null) {
                    if (other.id != null) {
                        return false;
                    }
                } else if (!this.id.equals(other.id)) {
                    return false;
                }

                if (this.originService == null) {
                    if (other.originService != null) {
                        return false;
                    }
                } else if (!this.originService.equals(other.originService)) {
                    return false;
                }

                return true;
            }
        }
    }
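
The way the constructor above rewrites destinationService is easy to miss in the decompiled listing, so here is the same rule as a standalone function. It is a sketch that re-implements the logic with plain JDK calls; the class DestinationSketch and the method normalize are illustrative names, and a plain endsWith stands in for StringUtils.endsWithIgnoreCase because the suffix ":**" contains no letters.

```java
/** Standalone re-implementation of the destination rule shown in the decompiled constructor. */
final class DestinationSketch {

    static String normalize(String destinationService) {
        // No destination means the event is addressed to every service
        if (destinationService == null) {
            return "**";
        }
        boolean matchesAll = "**".equals(destinationService);
        boolean atMostOneColon = countColons(destinationService) <= 1;
        boolean hasWildcardSuffix = destinationService.endsWith(":**");
        // A bare name such as "expense" (or "expense:8080") gets the wildcard suffix appended
        if (!matchesAll && atMostOneColon && !hasWildcardSuffix) {
            return destinationService + ":**";
        }
        return destinationService;
    }

    private static int countColons(String value) {
        int count = 0;
        for (char c : value.toCharArray()) {
            if (c == ':') {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(normalize(null));         // **
        System.out.println(normalize("expense"));    // expense:**
        System.out.println(normalize("expense:**")); // expense:**
    }
}
```

This is why the producer can pass a plain service name as the destination while the consumer compares against applicationName + ":**".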

     How the business side receives the message (source analysis):

      The @EventListener annotation listens for the event. When a WorkflowCustomRemoteEvent arrives, the workFlowConsumer method is called back and the event is handled in the class that implements WorkflowEventConsumerInterface:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //

    package com.hand.hcf.app.mdata.client.workflow.event;

    import org.springframework.context.event.EventListener;

    public interface WorkflowEventConsumerInterface {
        // Invoked for every published WorkflowCustomRemoteEvent
        @EventListener({WorkflowCustomRemoteEvent.class})
        void workFlowConsumer(WorkflowCustomRemoteEvent event);
    }

Preparing to receive the message:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //

    package com.hand.hcf.app.mdata.client.workflow.event;

    import com.hand.hcf.app.mdata.base.util.OrgInformationUtil;
    import com.hand.hcf.app.mdata.client.workflow.dto.ApprovalNotificationCO;
    import com.hand.hcf.app.mdata.client.workflow.dto.ApprovalResultCO;
    import com.hand.hcf.app.mdata.client.workflow.dto.WorkflowMessageCO;
    import com.hand.hcf.app.mdata.client.workflow.enums.DocumentOperationEnum;
    import com.hand.hcf.core.exception.BizException;
    import com.hand.hcf.core.security.domain.PrincipalLite;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;

    public abstract class AbstractWorkflowEventConsumerInterface implements WorkflowEventConsumerInterface {
        @Value("${spring.application.name:}")
        private String applicationName;
        private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowEventConsumerInterface.class);

        public AbstractWorkflowEventConsumerInterface() {
        }

        @Transactional(
            rollbackFor = {Exception.class}
        )
        public void workFlowConsumer(WorkflowCustomRemoteEvent workflowCustomRemoteEvent) {
            String destinationService = this.applicationName + ":**";
            WorkflowMessageCO workflowMessage = workflowCustomRemoteEvent.getWorkflowMessage();
            // Handle only events addressed to this service whose status is past APPROVAL
            if (destinationService.equalsIgnoreCase(workflowCustomRemoteEvent.getDestinationService()) && workflowMessage.getStatus() > DocumentOperationEnum.APPROVAL.getId()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Received workflow event message: " + workflowCustomRemoteEvent);
                }

                PrincipalLite userBean = workflowMessage.getUserBean();
                OrgInformationUtil.setAuthentication(userBean);
                this.doWorkFlowConsumer(workflowCustomRemoteEvent, workflowMessage);
            }
        }

        protected void doWorkFlowConsumer(WorkflowCustomRemoteEvent workflowCustomRemoteEvent, WorkflowMessageCO workflowMessage) {
            // Convert the message into an approval notification and hand it to approve()
            ApprovalNotificationCO approvalNotificationCO = new ApprovalNotificationCO();
            approvalNotificationCO.setDocumentId(workflowMessage.getDocumentId());
            approvalNotificationCO.setDocumentOid(workflowMessage.getEntityOid());
            approvalNotificationCO.setDocumentCategory(Integer.parseInt(workflowMessage.getEntityType()));
            approvalNotificationCO.setDocumentStatus(workflowMessage.getStatus());
            approvalNotificationCO.setDocumentTypeId(workflowMessage.getDocumentTypeId());
            approvalNotificationCO.setDocumentTypeCode(workflowMessage.getDocumentTypeCode());
            ApprovalResultCO approvalResultCO = this.approve(approvalNotificationCO);
            if (Boolean.FALSE.equals(approvalResultCO.getSuccess())) {
                throw new BizException(approvalResultCO.getError());
            }
        }

        @PostMapping({"/api/implement/workflow/approve"})
        public abstract ApprovalResultCO approve(@RequestBody ApprovalNotificationCO approvalNoticeCO);
    }
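
The guard at the top of workFlowConsumer decides which events a service actually processes. A minimal sketch of that check as a pure function follows. The class and method names are invented, and approvalStatusId is a parameter because the real value of DocumentOperationEnum.APPROVAL.getId() is not shown in this article; the numbers used in main are placeholders only.

```java
/** Pure-function version of the filter used in workFlowConsumer (illustrative names). */
final class ConsumerFilterSketch {

    static boolean shouldHandle(String applicationName, String eventDestination,
                                int status, int approvalStatusId) {
        // The consumer only accepts events addressed exactly to "<its own name>:**"
        String expectedDestination = applicationName + ":**";
        return expectedDestination.equalsIgnoreCase(eventDestination) && status > approvalStatusId;
    }

    public static void main(String[] args) {
        int approvalStatusId = 1002; // placeholder value, not taken from the real enum
        System.out.println(shouldHandle("expense", "expense:**", 1004, approvalStatusId));  // true
        System.out.println(shouldHandle("expense", "workflow:**", 1004, approvalStatusId)); // false
        System.out.println(shouldHandle("expense", "**", 1004, approvalStatusId));          // false
    }
}
```

One consequence of comparing with equalsIgnoreCase rather than pattern matching is that an event broadcast to "**" is ignored by this consumer; only events addressed to the service by name get through.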

 Finally, the class that extends AbstractWorkflowEventConsumerInterface overrides the approve method:

    package com.hand.hcf.app.expense.common.workflow;

    import com.codingapi.txlcn.tc.annotation.LcnTransaction;
    import com.hand.hcf.app.expense.common.utils.SyncLockPrefix;
    import com.hand.hcf.app.mdata.client.workflow.dto.ApprovalNotificationCO;
    import com.hand.hcf.app.mdata.client.workflow.dto.ApprovalResultCO;
    import com.hand.hcf.app.mdata.client.workflow.event.AbstractWorkflowEventConsumerInterface;
    import com.hand.hcf.core.redisLock.annotations.LockedObject;
    import com.hand.hcf.core.redisLock.annotations.SyncLock;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class WorkflowEventConsumer extends AbstractWorkflowEventConsumerInterface {

        // Receives the messages from the message queue
        @LcnTransaction
        @Transactional(rollbackFor = Exception.class)
        @SyncLock(lockPrefix = SyncLockPrefix.PUBLIC_REPORT)
        @Override
        public ApprovalResultCO approve(@LockedObject("documentId") @RequestBody ApprovalNotificationCO approvalNoticeCO) {
            // The business logic for the approved document goes here
        }
    }

 The business logic then runs inside the approve method.

   

IV. Dead-letter queues

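1. What is a dead-letter queue?

       A dead-letter queue is an ordinary queue that receives messages that could not be consumed normally. When a message in a queue becomes a dead letter, for example because it expired after a set time, the broker republishes it to the configured dead-letter exchange, which routes it on to another queue. The same mechanism can be used to implement a delay queue.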
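
How a TTL plus dead-lettering yields a delay queue can be seen in a small simulation. This is only a model under stated assumptions: time is passed in explicitly instead of read from a clock, nobody consumes from the waiting queue, and the class DelayQueueSketch is invented for illustration. A message becomes visible to the consumer of the dead-letter queue only after its TTL has elapsed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** In-memory model of a TTL queue that dead-letters expired messages; not broker code. */
final class DelayQueueSketch {

    private static final class Entry {
        final String message;
        final long expiresAtMillis;

        Entry(String message, long expiresAtMillis) {
            this.message = message;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final List<Entry> waiting = new ArrayList<>();
    private final List<String> deadLetters = new ArrayList<>();

    DelayQueueSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Puts a message into the waiting queue, which has no consumer. */
    void publish(String message, long nowMillis) {
        waiting.add(new Entry(message, nowMillis + ttlMillis));
    }

    /** Moves every message whose TTL has elapsed into the dead-letter queue. */
    void advanceTo(long nowMillis) {
        Iterator<Entry> iterator = waiting.iterator();
        while (iterator.hasNext()) {
            Entry entry = iterator.next();
            if (entry.expiresAtMillis <= nowMillis) {
                deadLetters.add(entry.message);
                iterator.remove();
            }
        }
    }

    /** Messages that a consumer of the dead-letter queue can now see. */
    List<String> delivered() {
        return deadLetters;
    }

    public static void main(String[] args) {
        DelayQueueSketch queue = new DelayQueueSketch(10_000);
        queue.publish("close unpaid order", 0);
        queue.advanceTo(9_999);
        System.out.println(queue.delivered()); // still empty
        queue.advanceTo(10_000);
        System.out.println(queue.delivered()); // the message appears after the delay
    }
}
```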

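2. When does a message enter the dead-letter queue?

  •  A consumer rejects the message (basic.reject or basic.nack) without requeueing it.
  •  The message's TTL expires. In the following declaration, written for the RabbitMQ .NET client, x-message-ttl sets the expiry time in milliseconds for messages in the queue:

    channel.QueueDeclare(queue: "q1",
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: new Dictionary<string, object> {
            { "x-message-ttl", 10000 }
            // x-message-ttl sets the expiry time of messages in this queue
        });

  •  The queue has reached its maximum length, and a message is dropped to make room.

In each case the message is dead-lettered only if the queue has a dead-letter exchange configured (the x-dead-letter-exchange argument); otherwise it is simply discarded.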
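
With the Java client used earlier in this article, queue arguments are passed as a Map in the last parameter of queueDeclare. The sketch below only builds that map with JDK classes. The argument keys are the standard RabbitMQ ones; the helper name queueArguments and the exchange and routing-key names in main are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds the argument map for a queue that dead-letters expired or overflowing messages. */
final class DeadLetterArgsSketch {

    static Map<String, Object> queueArguments(int ttlMillis, int maxLength,
                                              String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        // Messages older than this many milliseconds expire
        arguments.put("x-message-ttl", ttlMillis);
        // The queue holds at most this many messages
        arguments.put("x-max-length", maxLength);
        // Dead letters are republished to this exchange with this routing key
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        // "goods-dlx" and "good001.dead" are made-up names for this example
        Map<String, Object> arguments = queueArguments(10_000, 1_000, "goods-dlx", "good001.dead");
        System.out.println(arguments);
        // With a real channel this map would be the fifth argument:
        // channel.queueDeclare("q1", false, false, false, arguments);
    }
}
```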