当前位置:   article > 正文

用阻塞式来接收RabbitMQ的消息_阻塞式rabbitmq

阻塞式rabbitmq

目录

一、阻塞式

二、连接RabbitMQ

三、封装新订单消息 

四、发送消息

1. 同步发送消息

2. 异步发送消息

五、接收消息

六、删除与清空队列

七、程序测试


一、阻塞式

        RabbitMQ提供了阻塞和非阻塞两种收发消息的模式,默认在SpringBoot上面配置的都是非阻塞的模式。非阻塞模式不适合用在本案例中。因为非阻塞模式是后端Java程序依靠线程主动轮询消息队列,并不是移动端主动发起的请求。如果Java程序从RabbitMQ中获取到抢单消息,而移动端根本就没运行,你说这个抢单消息怎么发送给移动端?

        所以正确的做法是用阻塞式来接收RabbitMQ的消息,阻塞式顾名思义就是Java没收发完消息,绝对不往下执行其他代码。直到收完消息,然后把消息打包成R对象返回给移动端。

        虽然SpringBoot里面YML文件可以配置RabbitMQ,但是配置出来的是非阻塞的形式,这一点我们不能接受。所以我们在hxds-snm子系统的application.yml文件中,只定义了值注入的信息,然后手工编码的方式去连接RabbitMQ,这样就能使用阻塞式读写RabbitMQ的消息了。

  1. rabbitmq:
  2. host: localhost
  3. port: 5672
  4. username: guest
  5. password: guest

二、连接RabbitMQ

        在com.example.hxds.snm.config包中创建RabbitMQConfig类,里面封装创建RabbitMQ连接的方法。因为我提供给大家的RabbitMQ没有设置密码,所以创建连接的时候没用上用户名和密码。

  1. package com.example.hxds.snm.config;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. @Value("${rabbitmq.host}")
  9. private String host;
  10. @Value("${rabbitmq.port}")
  11. private int port;
  12. @Value("${rabbitmq.username}")
  13. private String username;
  14. @Value("${rabbitmq.password}")
  15. private String password;
  16. @Bean
  17. public ConnectionFactory getFactory() {
  18. ConnectionFactory factory = new ConnectionFactory();
  19. factory.setHost(host);
  20. factory.setPort(port);
  21. // factory.setUsername(username);
  22. // factory.setPassword(password);
  23. return factory;
  24. }
  25. }

三、封装新订单消息 

        因为新订单消息里面包含了很多内容,例如订单编号、上车点地址、终点地址、总里程、预估金额、上车点距离代驾司机的距离等等,我们在Web层和业务层之间传递新订单需要给方法定义很多参数,这太麻烦了,不如我们创建一个新订单消息的封装类。

        在com.example.hxds.snm.entity包中创建NewOrderMessage.java类,封装新订单消息。

  1. @Data
  2. public class NewOrderMessage {
  3. private String userId;
  4. private String orderId;
  5. private String from;
  6. private String to;
  7. private String expectsFee;
  8. private String mileage;
  9. private String minute;
  10. private String distance;
  11. private String favourFee;
  12. }

四、发送消息

        我们知道RabbitMQ收发消息可以分成阻塞和非阻塞,那么Java程序执行代码也可以分成两类:同步和异步。

        同步就是由当前线程来执行,我们平时写的Java代码都属于同步执行的。

        异步执行就是说这个任务我委派给其他线程去运行,我自己继续往下执剩余代码,其实就是我们平时用的多线程编程。

        我们写程序,干脆把同步和异步发送新订单消息的方法都给写出来,至于说某种场景用同步还是异步,由开发者决定。

1. 同步发送消息

        在com.example.hxds.snm.task包中创建NewOrderMassageTask.java类,在里面定义同步发送消息的代码。

  1. @Component
  2. @Slf4j
  3. public class NewOrderMassageTask {
  4. @Resource
  5. private ConnectionFactory factory;
  6. /**
  7. * 同步发送新订单消息
  8. */
  9. public void sendNewOrderMessage(ArrayList<NewOrderMessage> list) {
  10. int ttl = 1 * 60 * 1000; //新订单消息缓存过期时间1分钟
  11. String exchangeName = "new_order_private"; //交换机的名字
  12. try (
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. ) {
  16. //定义交换机,根据routing key路由消息
  17. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
  18. HashMap param = new HashMap();
  19. for (NewOrderMessage message : list) {
  20. //MQ消息的属性信息
  21. HashMap map = new HashMap();
  22. map.put("orderId", message.getOrderId());
  23. map.put("from", message.getFrom());
  24. map.put("to", message.getTo());
  25. map.put("expectsFee", message.getExpectsFee());
  26. map.put("mileage", message.getMileage());
  27. map.put("minute", message.getMinute());
  28. map.put("distance", message.getDistance());
  29. map.put("favourFee", message.getFavourFee());
  30. //创建消息属性对象
  31. //RabbitMQ的消息除了正文之外,还可以包含很多属性(可自定义)
  32. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().contentEncoding("UTF-8")
  33. .headers(map).expiration(ttl + "").build();
  34. String queueName = "queue_" + message.getUserId(); //队列名字
  35. String routingKey = message.getUserId(); //routing key
  36. //声明队列(持久化缓存消息,消息接收不加锁,消息全部接收完并不删除队列)
  37. channel.queueDeclare(queueName, true, false, false, param);
  38. channel.queueBind(queueName,exchangeName,routingKey);
  39. //向交换机发送消息,并附带routing key
  40. channel.basicPublish(exchangeName, routingKey, properties, ("新订单" + message.getOrderId()).getBytes());
  41. log.debug(message.getUserId() + "的新订单消息发送成功");
  42. }
  43. } catch (Exception e) {
  44. log.error("执行异常", e);
  45. throw new HxdsException("新订单消息发送失败");
  46. }
  47. }
  48. }

2. 异步发送消息

        在主类中我们已经添加了支持异步执行的注解,然后我们又创建了Java线程池,你可以确认一下。如果哪个方法需要异步执行,我们就在方法声明加上@Async注解。那么该方法执行的时候,就会自动委派给线程池的空闲线程,当前主线程会继续往下执行。

  1. @Component
  2. @Slf4j
  3. public class NewOrderMassageTask {
  4. ……
  5. /**
  6. * 异步发送新订单消息
  7. */
  8. @Async
  9. public void sendNewOrderMessageAsync(ArrayList<NewOrderMessage> list) {
  10. sendNewOrderMessage(list);
  11. }
  12. }

        发送新订单消息给适合接单的司机,我倾向于是用异步发送的方式。这是因为有可能附近适合接单的司机比较多,Java程序给这些司机的队列发送消息可能需要一定的耗时,这就会导致createNewOrder()执行时间太长,乘客端迟迟得不到响应,也不知道订单创建成功没有。如果采用异步发送消息就好多了,createNewOrder()函数把发送新订单消息的任务委派给某个空闲线程,自己可以继续往下执行,这样就不会让乘客端小程序等待太长时间,用户体验更好。

五、接收消息

        接收新订单消息这块,我们决不能搞异步接收。主线程已经返回R对象了,你这个异步接收才完事,你说接收到的消息怎么发送给移动端?所以接收消息,我们必须用同步方式。

  1. @Component
  2. @Slf4j
  3. public class NewOrderMassageTask {
  4. ……
  5. /**
  6. * 同步接收新订单消息
  7. */
  8. public List<NewOrderMessage> receiveNewOrderMessage(long userId) {
  9. String exchangeName = "new_order_private"; //交换机名字
  10. String queueName = "queue_" + userId; //队列名字
  11. String routingKey = userId + ""; //routing key
  12. List<NewOrderMessage> list = new ArrayList();
  13. try (Connection connection = factory.newConnection();
  14. Channel privateChannel = connection.createChannel();
  15. ) {
  16. //定义交换机,routing key模式
  17. privateChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
  18. //声明队列(持久化缓存消息,消息接收不加锁,消息全部接收完并不删除队列)
  19. privateChannel.queueDeclare(queueName, true, false, false, null);
  20. //绑定要接收的队列
  21. privateChannel.queueBind(queueName, exchangeName, routingKey);
  22. //为了避免一次性接收太多消息,我们采用限流的方式,每次接收10条消息,然后循环接收
  23. privateChannel.basicQos(0, 10, true);
  24. while (true) {
  25. //从队列中接收消息
  26. GetResponse response = privateChannel.basicGet(queueName, false);
  27. if (response != null) {
  28. //消息属性对象
  29. AMQP.BasicProperties properties = response.getProps();
  30. Map<String, Object> map = properties.getHeaders();
  31. String orderId = MapUtil.getStr(map, "orderId");
  32. String from = MapUtil.getStr(map, "from");
  33. String to = MapUtil.getStr(map, "to");
  34. String expectsFee = MapUtil.getStr(map, "expectsFee");
  35. String mileage = MapUtil.getStr(map, "mileage");
  36. String minute = MapUtil.getStr(map, "minute");
  37. String distance = MapUtil.getStr(map, "distance");
  38. String favourFee = MapUtil.getStr(map, "favourFee");
  39. //把新订单的消息封装到对象中
  40. NewOrderMessage message = new NewOrderMessage();
  41. message.setOrderId(orderId);
  42. message.setFrom(from);
  43. message.setTo(to);
  44. message.setExpectsFee(expectsFee);
  45. message.setMileage(mileage);
  46. message.setMinute(minute);
  47. message.setDistance(distance);
  48. message.setFavourFee(favourFee);
  49. list.add(message);
  50. byte[] body = response.getBody();
  51. String msg = new String(body);
  52. log.debug("从RabbitMQ接收的订单消息:" + msg);
  53. //确认收到消息,让MQ删除该消息
  54. long deliveryTag = response.getEnvelope().getDeliveryTag();
  55. privateChannel.basicAck(deliveryTag, false);
  56. } else {
  57. break;
  58. }
  59. }
  60. ListUtil.reverse(list); //消息倒叙,新消息排在前面
  61. return list;
  62. } catch (Exception e) {
  63. log.error("执行异常", e);
  64. throw new HxdsException("接收新订单失败");
  65. }
  66. }
  67. }

六、删除与清空队列

        最后我们把删除队列和清空队列消息的代码给写一下,都有同步和异步两种方式。

  1. @Component
  2. @Slf4j
  3. public class NewOrderMassageTask {
  4. ……
  5. /**
  6. * 同步删除新订单消息队列
  7. */
  8. public void deleteNewOrderQueue(long userId) {
  9. String exchangeName = "new_order_private"; //交换机名字
  10. String queueName = "queue_" + userId; //队列名字
  11. try (Connection connection = factory.newConnection();
  12. Channel privateChannel = connection.createChannel();
  13. ) {
  14. //定义交换机
  15. privateChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
  16. //删除队列
  17. privateChannel.queueDelete(queueName);
  18. log.debug(userId + "的新订单消息队列成功删除");
  19. } catch (Exception e) {
  20. log.error(userId + "的新订单队列删除失败", e);
  21. throw new HxdsException("新订单队列删除失败");
  22. }
  23. }
  24. /**
  25. * 异步删除新订单消息队列
  26. */
  27. @Async
  28. public void deleteNewOrderQueueAsync(long userId) {
  29. deleteNewOrderQueue(userId);
  30. }
  31. /**
  32. * 同步清空新订单消息队列
  33. */
  34. public void clearNewOrderQueue(long userId) {
  35. String exchangeName = "new_order_private";
  36. String queueName = "queue_" + userId;
  37. try (Connection connection = factory.newConnection();
  38. Channel privateChannel = connection.createChannel();
  39. ) {
  40. privateChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
  41. privateChannel.queuePurge(queueName);
  42. log.debug(userId + "的新订单消息队列清空删除");
  43. } catch (Exception e) {
  44. log.error(userId + "的新订单队列清空失败", e);
  45. throw new HxdsException("新订单队列清空失败");
  46. }
  47. }
  48. /**
  49. * 异步清空新订单消息队列
  50. */
  51. @Async
  52. public void clearNewOrderQueueAsync(long userId) {
  53. clearNewOrderQueue(userId);
  54. }
  55. }

七、程序测试

        因为乘客下单和司机接单业务流程中,需要用到发送消息和接收消息,那么咱们就来测试一下刚才封装的代码。在test目录之下创建测试类。

  1. package com.example.hxds.snm;
  2. import com.example.hxds.snm.entity.NewOrderMessage;
  3. import com.example.hxds.snm.task.NewOrderMassageTask;
  4. import org.junit.jupiter.api.Test;
  5. import org.junit.jupiter.api.Timeout;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import javax.annotation.Resource;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. @SpringBootTest
  11. public class Demo {
  12. @Resource
  13. private NewOrderMassageTask task;
  14. @Test
  15. public void send() {
  16. NewOrderMessage message = new NewOrderMessage();
  17. message.setUserId("9527");
  18. message.setFrom("沈阳北站");
  19. message.setTo("沈阳站");
  20. message.setDistance("3.2");
  21. message.setExpectsFee("46.0");
  22. message.setMileage("18.6");
  23. message.setMinute("18");
  24. message.setFavourFee("0.0");
  25. ArrayList list = new ArrayList() {{
  26. add(message);
  27. }};
  28. task.sendNewOrderMessageAsync(list);
  29. }
  30. @Test
  31. public void recieve() {
  32. List<NewOrderMessage> list = task.receiveNewOrderMessage(9527);
  33. list.forEach(one->{
  34. System.out.println(one.getFrom());
  35. System.out.println(one.getTo());
  36. });
  37. }
  38. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/384351
推荐阅读
相关标签
  

闽ICP备14008679号