当前位置:   article > 正文

java实现rabbitmq消息的发送接受_rabbitmq 消费者接收对象

rabbitmq 消费者接收对象

RabbitMQ技术学习更多资源请访问 https://www.itkc8.com 

本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

本文是基于spring-rabbit中间件来实现消息的发送接受功能

see http://www.rabbitmq.com/tutorials/tutorial-one-java.html

see http://www.springsource.org/spring-amqp

 

 

  1. <!-- for rabbitmq -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>2.8.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.amqp</groupId>
  9. <artifactId>spring-amqp</artifactId>
  10. <version>1.1.1.RELEASE</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.amqp</groupId>
  14. <artifactId>spring-rabbit</artifactId>
  15. <version>1.1.1.RELEASE</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>com.caucho</groupId>
  19. <artifactId>hessian</artifactId>
  20. <version>4.0.7</version>
  21. </dependency>
  22. </dependencies>


首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

 

 

  1. public class EventMessage implements Serializable{
  2. private String queueName;
  3. private String exchangeName;
  4. private byte[] eventData;
  5. public EventMessage(String queueName, String exchangeName, byte[] eventData) {
  6. this.queueName = queueName;
  7. this.exchangeName = exchangeName;
  8. this.eventData = eventData;
  9. }
  10. public EventMessage() {
  11. }
  12. public String getQueueName() {
  13. return queueName;
  14. }
  15. public String getExchangeName() {
  16. return exchangeName;
  17. }
  18. public byte[] getEventData() {
  19. return eventData;
  20. }
  21. @Override
  22. public String toString() {
  23. return "EopEventMessage [queueName=" + queueName + ", exchangeName="
  24. + exchangeName + ", eventData=" + Arrays.toString(eventData)
  25. + "]";
  26. }
  27. }


为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

 

 

  1. public interface CodecFactory {
  2. byte[] serialize(Object obj) throws IOException;
  3. Object deSerialize(byte[] in) throws IOException;
  4. }


下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

 

 

  1. public class HessionCodecFactory implements CodecFactory {
  2. private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
  3. @Override
  4. public byte[] serialize(Object obj) throws IOException {
  5. ByteArrayOutputStream baos = null;
  6. HessianOutput output = null;
  7. try {
  8. baos = new ByteArrayOutputStream(1024);
  9. output = new HessianOutput(baos);
  10. output.startCall();
  11. output.writeObject(obj);
  12. output.completeCall();
  13. } catch (final IOException ex) {
  14. throw ex;
  15. } finally {
  16. if (output != null) {
  17. try {
  18. baos.close();
  19. } catch (final IOException ex) {
  20. this.logger.error("Failed to close stream.", ex);
  21. }
  22. }
  23. }
  24. return baos != null ? baos.toByteArray() : null;
  25. }
  26. @Override
  27. public Object deSerialize(byte[] in) throws IOException {
  28. Object obj = null;
  29. ByteArrayInputStream bais = null;
  30. HessianInput input = null;
  31. try {
  32. bais = new ByteArrayInputStream(in);
  33. input = new HessianInput(bais);
  34. input.startReply();
  35. obj = input.readObject();
  36. input.completeReply();
  37. } catch (final IOException ex) {
  38. throw ex;
  39. } catch (final Throwable e) {
  40. this.logger.error("Failed to decode object.", e);
  41. } finally {
  42. if (input != null) {
  43. try {
  44. bais.close();
  45. } catch (final IOException ex) {
  46. this.logger.error("Failed to close stream.", ex);
  47. }
  48. }
  49. }
  50. return obj;
  51. }
  52. }


接下来就先实现发送功能,新增一个接口专门用来实现发送功能

 

 

 

 

  1. public interface EventTemplate {
  2. void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
  3. void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
  4. }


SendRefuseException是自定义的发送失败异常类

 

下面是它的实现类,主要的任务就是将数据转换为EventMessage

  1. public class DefaultEventTemplate implements EventTemplate {
  2. private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
  3. private AmqpTemplate eventAmqpTemplate;
  4. private CodecFactory defaultCodecFactory;
  5. // private DefaultEventController eec;
  6. //
  7. // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
  8. // CodecFactory defaultCodecFactory, DefaultEventController eec) {
  9. // this.eventAmqpTemplate = eopAmqpTemplate;
  10. // this.defaultCodecFactory = defaultCodecFactory;
  11. // this.eec = eec;
  12. // }
  13. public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
  14. this.eventAmqpTemplate = eopAmqpTemplate;
  15. this.defaultCodecFactory = defaultCodecFactory;
  16. }
  17. @Override
  18. public void send(String queueName, String exchangeName, Object eventContent)
  19. throws SendRefuseException {
  20. this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
  21. }
  22. @Override
  23. public void send(String queueName, String exchangeName, Object eventContent,
  24. CodecFactory codecFactory) throws SendRefuseException {
  25. if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
  26. throw new SendRefuseException("queueName exchangeName can not be empty.");
  27. }
  28. // if (!eec.beBinded(exchangeName, queueName))
  29. // eec.declareBinding(exchangeName, queueName);
  30. byte[] eventContentBytes = null;
  31. if (codecFactory == null) {
  32. if (eventContent == null) {
  33. logger.warn("Find eventContent is null,are you sure...");
  34. } else {
  35. throw new SendRefuseException(
  36. "codecFactory must not be null ,unless eventContent is null");
  37. }
  38. } else {
  39. try {
  40. eventContentBytes = codecFactory.serialize(eventContent);
  41. } catch (IOException e) {
  42. throw new SendRefuseException(e);
  43. }
  44. }
  45. // 构造成Message
  46. EventMessage msg = new EventMessage(queueName, exchangeName,
  47. eventContentBytes);
  48. try {
  49. eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
  50. } catch (AmqpException e) {
  51. logger.error("send event fail. Event Message : [" + eventContent + "]", e);
  52. throw new SendRefuseException("send event fail", e);
  53. }
  54. }
  55. }


注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明

 

然后我们再实现接受消息

首先我们需要一个消费接口,所有的消费程序都实现这个类

  1. public interface EventProcesser {
  2. public void process(Object e);
  3. }


为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

 

  1. /**
  2. * MessageListenerAdapter的Pojo
  3. * <p>消息处理适配器,主要功能:</p>
  4. * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
  5. * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
  6. *
  7. */
  8. public class MessageAdapterHandler {
  9. private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
  10. private ConcurrentMap<String, EventProcessorWrap> epwMap;
  11. public MessageAdapterHandler() {
  12. this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
  13. }
  14. public void handleMessage(EventMessage eem) {
  15. logger.debug("Receive an EventMessage: [" + eem + "]");
  16. // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
  17. if (eem == null) {
  18. logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
  19. return;
  20. }
  21. if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
  22. logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
  23. return;
  24. }
  25. // 解码,并交给对应的EventHandle执行
  26. EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
  27. if (eepw == null) {
  28. logger.warn("Receive an EopEventMessage, but no processor can do it.");
  29. return;
  30. }
  31. try {
  32. eepw.process(eem.getEventData());
  33. } catch (IOException e) {
  34. logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
  35. return;
  36. }
  37. }
  38. protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
  39. if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
  40. throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
  41. }
  42. EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
  43. EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
  44. if (oldProcessorWrap != null) {
  45. logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
  46. }
  47. }
  48. protected Set<String> getAllBinding() {
  49. Set<String> keySet = epwMap.keySet();
  50. return keySet;
  51. }
  52. protected static class EventProcessorWrap {
  53. private CodecFactory codecFactory;
  54. private EventProcesser eep;
  55. protected EventProcessorWrap(CodecFactory codecFactory,
  56. EventProcesser eep) {
  57. this.codecFactory = codecFactory;
  58. this.eep = eep;
  59. }
  60. public void process(byte[] eventData) throws IOException{
  61. Object obj = codecFactory.deSerialize(eventData);
  62. eep.process(obj);
  63. }
  64. }
  65. }


这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息

 

 

 

 

  1. public class MessageErrorHandler implements ErrorHandler{
  2. private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
  3. @Override
  4. public void handleError(Throwable t) {
  5. logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
  6. }
  7. }


接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息

 

 

 

 

  1. public class EventControlConfig {
  2. private final static int DEFAULT_PORT = 5672;
  3. private final static String DEFAULT_USERNAME = "guest";
  4. private final static String DEFAULT_PASSWORD = "guest";
  5. private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
  6. private static final int PREFETCH_SIZE = 1;
  7. private String serverHost ;
  8. private int port = DEFAULT_PORT;
  9. private String username = DEFAULT_USERNAME;
  10. private String password = DEFAULT_PASSWORD;
  11. private String virtualHost;
  12. /**
  13. * 和rabbitmq建立连接的超时时间
  14. */
  15. private int connectionTimeout = 0;
  16. /**
  17. * 事件消息处理线程数,默认是 CPU核数 * 2
  18. */
  19. private int eventMsgProcessNum;
  20. /**
  21. * 每次消费消息的预取值
  22. */
  23. private int prefetchSize;
  24. public EventControlConfig(String serverHost) {
  25. this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
  26. }
  27. public EventControlConfig(String serverHost, int port, String username,
  28. String password, String virtualHost, int connectionTimeout,
  29. int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
  30. this.serverHost = serverHost;
  31. this.port = port>0?port:DEFAULT_PORT;
  32. this.username = username;
  33. this.password = password;
  34. this.virtualHost = virtualHost;
  35. this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
  36. this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
  37. this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
  38. }
  39. public String getServerHost() {
  40. return serverHost;
  41. }
  42. public int getPort() {
  43. return port;
  44. }
  45. public String getUsername() {
  46. return username;
  47. }
  48. public String getPassword() {
  49. return password;
  50. }
  51. public String getVirtualHost() {
  52. return virtualHost;
  53. }
  54. public int getConnectionTimeout() {
  55. return connectionTimeout;
  56. }
  57. public int getEventMsgProcessNum() {
  58. return eventMsgProcessNum;
  59. }
  60. public int getPrefetchSize() {
  61. return prefetchSize;
  62. }
  63. }


具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信

 

 

 

 

  1. public interface EventController {
  2. /**
  3. * 控制器启动方法
  4. */
  5. void start();
  6. /**
  7. * 获取发送模版
  8. */
  9. EventTemplate getEopEventTemplate();
  10. /**
  11. * 绑定消费程序到对应的exchange和queue
  12. */
  13. EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
  14. /*in map, the key is queue name, but value is exchange name*/
  15. EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
  16. }


它的实现类如下:

 

 

 

 

 

  1. /**
  2. * 和rabbitmq通信的控制器,主要负责:
  3. * <p>1、和rabbitmq建立连接</p>
  4. * <p>2、声明exChange和queue以及它们的绑定关系</p>
  5. * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
  6. * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
  7. * @author yangyong
  8. *
  9. */
  10. public class DefaultEventController implements EventController {
  11. private CachingConnectionFactory rabbitConnectionFactory;
  12. private EventControlConfig config;
  13. private RabbitAdmin rabbitAdmin;
  14. private CodecFactory defaultCodecFactory = new HessionCodecFactory();
  15. private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
  16. private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
  17. private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
  18. //queue cache, key is exchangeName
  19. private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
  20. //queue cache, key is queueName
  21. private Map<String, Queue> queues = new HashMap<String, Queue>();
  22. //bind relation of queue to exchange cache, value is exchangeName | queueName
  23. private Set<String> binded = new HashSet<String>();
  24. private EventTemplate eventTemplate; // 给App使用的Event发送客户端
  25. private AtomicBoolean isStarted = new AtomicBoolean(false);
  26. private static DefaultEventController defaultEventController;
  27. public synchronized static DefaultEventController getInstance(EventControlConfig config){
  28. if(defaultEventController==null){
  29. defaultEventController = new DefaultEventController(config);
  30. }
  31. return defaultEventController;
  32. }
  33. private DefaultEventController(EventControlConfig config){
  34. if (config == null) {
  35. throw new IllegalArgumentException("Config can not be null.");
  36. }
  37. this.config = config;
  38. initRabbitConnectionFactory();
  39. // 初始化AmqpAdmin
  40. rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
  41. // 初始化RabbitTemplate
  42. RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
  43. rabbitTemplate.setMessageConverter(serializerMessageConverter);
  44. eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
  45. }
  46. /**
  47. * 初始化rabbitmq连接
  48. */
  49. private void initRabbitConnectionFactory() {
  50. rabbitConnectionFactory = new CachingConnectionFactory();
  51. rabbitConnectionFactory.setHost(config.getServerHost());
  52. rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
  53. rabbitConnectionFactory.setPort(config.getPort());
  54. rabbitConnectionFactory.setUsername(config.getUsername());
  55. rabbitConnectionFactory.setPassword(config.getPassword());
  56. if (!StringUtils.isEmpty(config.getVirtualHost())) {
  57. rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
  58. }
  59. }
  60. /**
  61. * 注销程序
  62. */
  63. public synchronized void destroy() throws Exception {
  64. if (!isStarted.get()) {
  65. return;
  66. }
  67. msgListenerContainer.stop();
  68. eventTemplate = null;
  69. rabbitAdmin = null;
  70. rabbitConnectionFactory.destroy();
  71. }
  72. @Override
  73. public void start() {
  74. if (isStarted.get()) {
  75. return;
  76. }
  77. Set<String> mapping = msgAdapterHandler.getAllBinding();
  78. for (String relation : mapping) {
  79. String[] relaArr = relation.split("\\|");
  80. declareBinding(relaArr[1], relaArr[0]);
  81. }
  82. initMsgListenerAdapter();
  83. isStarted.set(true);
  84. }
  85. /**
  86. * 初始化消息监听器容器
  87. */
  88. private void initMsgListenerAdapter(){
  89. MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
  90. msgListenerContainer = new SimpleMessageListenerContainer();
  91. msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
  92. msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
  93. msgListenerContainer.setMessageListener(listener);
  94. msgListenerContainer.setErrorHandler(new MessageErrorHandler());
  95. msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值
  96. msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
  97. msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数
  98. msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
  99. msgListenerContainer.start();
  100. }
  101. @Override
  102. public EventTemplate getEopEventTemplate() {
  103. return eventTemplate;
  104. }
  105. @Override
  106. public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
  107. return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
  108. }
  109. public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
  110. msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
  111. if(isStarted.get()){
  112. initMsgListenerAdapter();
  113. }
  114. return this;
  115. }
  116. @Override
  117. public EventController add(Map<String, String> bindings,
  118. EventProcesser eventProcesser) {
  119. return add(bindings, eventProcesser,defaultCodecFactory);
  120. }
  121. public EventController add(Map<String, String> bindings,
  122. EventProcesser eventProcesser, CodecFactory codecFactory) {
  123. for(Map.Entry<String, String> item: bindings.entrySet())
  124. msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
  125. return this;
  126. }
  127. /**
  128. * exchange和queue是否已经绑定
  129. */
  130. protected boolean beBinded(String exchangeName, String queueName) {
  131. return binded.contains(exchangeName+"|"+queueName);
  132. }
  133. /**
  134. * 声明exchange和queue已经它们的绑定关系
  135. */
  136. protected synchronized void declareBinding(String exchangeName, String queueName) {
  137. String bindRelation = exchangeName+"|"+queueName;
  138. if (binded.contains(bindRelation)) return;
  139. boolean needBinding = false;
  140. DirectExchange directExchange = exchanges.get(exchangeName);
  141. if(directExchange == null) {
  142. directExchange = new DirectExchange(exchangeName, true, false, null);
  143. exchanges.put(exchangeName, directExchange);
  144. rabbitAdmin.declareExchange(directExchange);//声明exchange
  145. needBinding = true;
  146. }
  147. Queue queue = queues.get(queueName);
  148. if(queue == null) {
  149. queue = new Queue(queueName, true, false, false);
  150. queues.put(queueName, queue);
  151. rabbitAdmin.declareQueue(queue); //声明queue
  152. needBinding = true;
  153. }
  154. if(needBinding) {
  155. Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
  156. rabbitAdmin.declareBinding(binding);//声明绑定关系
  157. binded.add(bindRelation);
  158. }
  159. }
  160. }

 

 

 

 

 

搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO

  1. @SuppressWarnings("serial")
  2. public class People implements Serializable{
  3. private int id;
  4. private String name;
  5. private boolean male;
  6. private People spouse;
  7. private List<People> friends;
  8. public int getId() {
  9. return id;
  10. }
  11. public void setId(int id) {
  12. this.id = id;
  13. }
  14. public String getName() {
  15. return name;
  16. }
  17. public void setName(String name) {
  18. this.name = name;
  19. }
  20. public boolean isMale() {
  21. return male;
  22. }
  23. public void setMale(boolean male) {
  24. this.male = male;
  25. }
  26. public People getSpouse() {
  27. return spouse;
  28. }
  29. public void setSpouse(People spouse) {
  30. this.spouse = spouse;
  31. }
  32. public List<People> getFriends() {
  33. return friends;
  34. }
  35. public void setFriends(List<People> friends) {
  36. this.friends = friends;
  37. }
  38. @Override
  39. public String toString() {
  40. // TODO Auto-generated method stub
  41. return "People[id="+id+",name="+name+",male="+male+"]";
  42. }
  43. }


建立单元测试

 

  1. public class RabbitMqTest{
  2. private String defaultHost = "127.0.0.1";
  3. private String defaultExchange = "EXCHANGE_DIRECT_TEST";
  4. private String defaultQueue = "QUEUE_TEST";
  5. private DefaultEventController controller;
  6. private EventTemplate eventTemplate;
  7. @Before
  8. public void init() throws IOException{
  9. EventControlConfig config = new EventControlConfig(defaultHost);
  10. controller = DefaultEventController.getInstance(config);
  11. eventTemplate = controller.getEopEventTemplate();
  12. controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
  13. controller.start();
  14. }
  15. @Test
  16. public void sendString() throws SendRefuseException{
  17. eventTemplate.send(defaultQueue, defaultExchange, "hello world");
  18. }
  19. @Test
  20. public void sendObject() throws SendRefuseException{
  21. eventTemplate.send(defaultQueue, defaultExchange, mockObj());
  22. }
  23. @Test
  24. public void sendTemp() throws SendRefuseException, InterruptedException{
  25. String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
  26. String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
  27. eventTemplate.send(tempQueue, tempExchange, mockObj());
  28. //发送成功后此时不会接受到消息,还需要绑定对应的消费程序
  29. controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
  30. }
  31. @After
  32. public void end() throws InterruptedException{
  33. Thread.sleep(2000);
  34. }
  35. private People mockObj(){
  36. People jack = new People();
  37. jack.setId(1);
  38. jack.setName("JACK");
  39. jack.setMale(true);
  40. List<People> friends = new ArrayList<>();
  41. friends.add(jack);
  42. People hanMeiMei = new People();
  43. hanMeiMei.setId(1);
  44. hanMeiMei.setName("韩梅梅");
  45. hanMeiMei.setMale(false);
  46. hanMeiMei.setFriends(friends);
  47. People liLei = new People();
  48. liLei.setId(2);
  49. liLei.setName("李雷");
  50. liLei.setMale(true);
  51. liLei.setFriends(friends);
  52. liLei.setSpouse(hanMeiMei);
  53. hanMeiMei.setSpouse(liLei);
  54. return hanMeiMei;
  55. }
  56. class ApiProcessEventProcessor implements EventProcesser{
  57. @Override
  58. public void process(Object e) {//消费程序这里只是打印信息
  59. Assert.assertNotNull(e);
  60. System.out.println(e);
  61. if(e instanceof People){
  62. People people = (People)e;
  63. System.out.println(people.getSpouse());
  64. System.out.println(people.getFriends());
  65. }
  66. }
  67. }
  68. }


源码地址请点击这里

 

RabbitMQ技术学习更多资源请访问 https://www.itkc8.com 

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/384333
推荐阅读
相关标签
  

闽ICP备14008679号