当前位置:   article > 正文

MQ消息队列(二)RabbitMQ_rabbitmq缓存数据

rabbitmq缓存数据

首先了解RabbitMQ的成员

Producer:生产者,将消息发送到Exchange;

Exchange:交换器,将从生产者收到的消息路由到Queen;

Queen:存放消费者消费的消息;

Consumer:消费者,从Queen中获取消息;

BindingKey:绑定键,Exchange与Queen之间的的配置,换句话说就是Exchange将怎样的消息路由到Queen;

RoutingKey:路由键,Producer(生产者)将消息与路由键发送给Exchange(交换器),交换器对比Binding Key(绑定键)与Routing Key(路由键),将消息放到对应的Queen;

 简单队列 :

P:消息的⽣产者

C:消息的消费者

红⾊:队列

⽣产者将消息发送到队列,消费者从队列中获取消息。

 WORK模式: ⼀个⽣产者、2个消费者。 ⼀个消息只能被⼀个消费者获取。

 订阅模式:

解读:

1、1个⽣产者,多个消费者

2、每⼀个消费者都有⾃⼰的⼀个队列

3、⽣产者没有将消息直接发送到队列,⽽是发送到了交换机

4、每个队列都要绑定到交换机

5、⽣产者发送的消息,经过交换机,到达队列,实现,⼀个消息被多个消费者获取的⽬的

路由模式

解读:

和订阅模式的区别在于,路由模式通过指定key来确认消息路由到那个队列;

 spring集成RabbitMQ

2:RabbitMQ配置文件service-product-rabbit-receiver-context.xml 

  1. <!-- 公共部分 -->
  2. <!-- 创建连接类 连接安装好的 rabbitmq -->
  3. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  4. <constructor-arg value="localhost" />
  5. <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
  6. <property name="username" value="${rmq.manager.user}" />
  7. <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
  8. <property name="password" value="${rmq.manager.password}" />
  9. <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
  10. <property name="host" value="${rmq.ip}" />
  11. <!-- port,RabbitMQ服务端口,默认值为5672 -->
  12. <property name="port" value="${rmq.port}" />
  13. <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
  14. <property name="channel-cache-size" value="50" />
  15. <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
  16. <property name="cache-mode" value="CHANNEL" />
  17. </bean>
  18. <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
  19. <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
  20. username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
  21. <rabbit:admin connection-factory="connectionFactory"/>
  22. <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
  23. <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
  24. <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
  25. <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />
  26. <!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
  27. <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
  28. <rabbit:bindings>
  29. <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
  30. <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
  31. <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
  32. </rabbit:bindings>
  33. </rabbit:fanout-exchange>
  34. <!-- 生产者部分 -->
  35. <!-- 发送消息的producer类,也就是生产者 -->
  36. <bean id="msgProducer" class="com.asdf.sdf.ClassA">
  37. <!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 -->
  38. <property name="queueName" value="{alert.queue.1}"/>
  39. </bean>
  40. <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
  41. <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
  42. <!-- 或者配置jackson -->
  43. <!--
  44. <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
  45. -->
  46. <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
  47. <!-- 消费者部分 -->
  48. <!-- 自定义接口类 -->
  49. <bean id="testHandler" class="com.rabbit.TestHandler"></bean>
  50. <!-- 用于消息的监听的代理类MessageListenerAdapter -->
  51. <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
  52. <!-- 类名 -->
  53.     <constructor-arg ref="testHandler" />
  54. <!-- 方法名 -->
  55.     <property name="defaultListenerMethod" value="handlerTest"></property>
  56. <property name="messageConverter" ref="jsonMessageConverter"></property>
  57. </bean>
  58. <!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
  59. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
  60. <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
  61.     <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
  62.     <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
  63. </rabbit:listener-container>

生产者代码:

  1. @Resource
  2. private RabbitTemplate rabbitTemplate;
  3. public void sendMessage(CommonMessage msg){
  4. try {
  5. logger.error("发送信息开始");
  6. System.out.println(rabbitTemplate.getConnectionFactory().getHost());
  7. //发送信息 message-exchange 交换机 msg.getSource() 为 test_key
  8. rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
  9. logger.error("发送信息结束");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. }

消费者代码:

  1. public class TestHandler {
  2. @Override
  3. public void handlerTest(CommonMessage commonMessage) {
  4. System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
  5. }
  6. }

 注:暂时只记录了这些,后面了解到更多的相关知识再补充,如果有什么错误请评论指正,我会立即做出修改。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/139097
推荐阅读
相关标签
  

闽ICP备14008679号