赞
踩
首先了解RabbitMQ的成员
Producer:生产者,将消息发送到Exchange;
Exchange:交换器,将从生产者收到的消息路由到Queen;
Queen:存放消费者消费的消息;
Consumer:消费者,从Queen中获取消息;
BindingKey:绑定键,Exchange与Queen之间的的配置,换句话说就是Exchange将怎样的消息路由到Queen;
RoutingKey:路由键,Producer(生产者)将消息与路由键发送给Exchange(交换器),交换器对比Binding Key(绑定键)与Routing Key(路由键),将消息放到对应的Queen;
简单队列 :
P:消息的⽣产者
C:消息的消费者
红⾊:队列
⽣产者将消息发送到队列,消费者从队列中获取消息。
WORK模式: ⼀个⽣产者、2个消费者。 ⼀个消息只能被⼀个消费者获取。
订阅模式:
解读:
1、1个⽣产者,多个消费者
2、每⼀个消费者都有⾃⼰的⼀个队列
3、⽣产者没有将消息直接发送到队列,⽽是发送到了交换机
4、每个队列都要绑定到交换机
5、⽣产者发送的消息,经过交换机,到达队列,实现,⼀个消息被多个消费者获取的⽬的
路由模式:
解读:
和订阅模式的区别在于,路由模式通过指定key来确认消息路由到那个队列;
2:RabbitMQ配置文件service-product-rabbit-receiver-context.xml
- <!-- 公共部分 -->
- <!-- 创建连接类 连接安装好的 rabbitmq -->
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="localhost" />
- <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
- <property name="username" value="${rmq.manager.user}" />
- <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
- <property name="password" value="${rmq.manager.password}" />
- <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
- <property name="host" value="${rmq.ip}" />
- <!-- port,RabbitMQ服务端口,默认值为5672 -->
- <property name="port" value="${rmq.port}" />
- <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
- <property name="channel-cache-size" value="50" />
- <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
- <property name="cache-mode" value="CHANNEL" />
- </bean>
- <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
- <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
- username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
- <rabbit:admin connection-factory="connectionFactory"/>
-
- <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
- <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
- <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
- <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />
-
- <!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
- <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
- <rabbit:bindings>
- <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
- <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
- <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:fanout-exchange>
-
- <!-- 生产者部分 -->
- <!-- 发送消息的producer类,也就是生产者 -->
- <bean id="msgProducer" class="com.asdf.sdf.ClassA">
- <!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 -->
- <property name="queueName" value="{alert.queue.1}"/>
- </bean>
-
- <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
- <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
- <!-- 或者配置jackson -->
- <!--
- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
- -->
-
- <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
-
- <!-- 消费者部分 -->
- <!-- 自定义接口类 -->
- <bean id="testHandler" class="com.rabbit.TestHandler"></bean>
-
- <!-- 用于消息的监听的代理类MessageListenerAdapter -->
- <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
- <!-- 类名 -->
- <constructor-arg ref="testHandler" />
- <!-- 方法名 -->
- <property name="defaultListenerMethod" value="handlerTest"></property>
- <property name="messageConverter" ref="jsonMessageConverter"></property>
- </bean>
-
- <!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
- <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
- <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
- <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
- <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
- </rabbit:listener-container>
生产者代码:
- @Resource
- private RabbitTemplate rabbitTemplate;
- public void sendMessage(CommonMessage msg){
- try {
- logger.error("发送信息开始");
- System.out.println(rabbitTemplate.getConnectionFactory().getHost());
- //发送信息 message-exchange 交换机 msg.getSource() 为 test_key
- rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
- logger.error("发送信息结束");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
消费者代码:
- public class TestHandler {
- @Override
- public void handlerTest(CommonMessage commonMessage) {
- System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
- }
- }
注:暂时只记录了这些,后面了解到更多的相关知识再补充,如果有什么错误请评论指正,我会立即做出修改。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。