赞
踩
一、rabbitmq 配置文件 在web 项目开发过程中,一般分为生产者配置文件和消费者配置文件。废话少说,马上教您整个流程的配置!
1、准备工作:安装好rabbitmq,并在项目中增加配置文件 rabbit.properties 内容如下:
- rmq.ip=192.188.113.114
- rmq.port=5672
- rmq.producer.num=20
- rmq.manager.user=admin
- rmq.manager.password=admin
二、生产者配置文件:producer.xml
1、<!-- 创建连接类 连接安装好的 rabbitmq -->
-
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg value="localhost" />
- <property name="username" value="${rmq.manager.user}" />
- <property name="password" value="${rmq.manager.password}" />
- <property name="host" value="${rmq.ip}" />
- <property name="port" value="${rmq.port}" />
- </bean>
2、spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
3、 spring template 声明, durable:是否持久化 ; exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列.
queue 队列声明 需要发送消息到哪些队列 消息系统监听队列
- <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
- <rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />
4、消息失效后监听队列,6000 为 时间间隔信息 60s int或long类型,解决优先级问题
- <rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue">
- <rabbit:queue-arguments>
- <entry key="x-message-ttl">
- <value type="java.lang.Long">60000</value>
- </entry>
- <entry key="x-dead-letter-exchange" value="test_Exchange"/>
- </rabbit:queue-arguments>
- </rabbit:queue>
5、rabbitmq的三种模式:direct,fanout,topic 三种
direct 消息转换队列 绑定key,意思就是消息与一个特定的路由键匹配,会转发。rabbit:binding:设置消息queue匹配的key。
- <rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">
- <rabbit:bindings>
- <rabbit:binding queue="test_queue" key="test_key" />
- </rabbit:bindings>
- </rabbit:direct-exchange>
fanout 模式:客户端中只要是与该路由绑定在一起的队列都会收到相关消息,这类似广播,发送端不管队列是谁,都由客户端自己去绑定,谁需要数据谁去绑定自己的相应队列。
- <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
- <rabbit:bindings>
- <rabbit:binding queue="test_delay_queue"/>
- </rabbit:bindings>
- </rabbit:fanout-exchange>
topic 模式:发送端不是按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
<binding queue="testqueue" pattern="*.*.test1"
- <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
- <rabbit:bindings>
- <rabbit:binding queue="test_queue" pattern="test_key.*.*" />
- </rabbit:bindings>
- </rabbit:topic-exchange>
6、生产者(发送端)代码:
- @Resource
- private RabbitTemplate rabbitTemplate;
- public void sendMessage(CommonMessage msg){
- try {
- logger.error("发送信息开始");
- System.out.println(rabbitTemplate.getConnectionFactory().getHost());
- //发送信息 message-exchange 交换机 msg.getSource() 为 test_key
- rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
- logger.error("发送信息结束");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
三、消费者配置:同一个项目中 consumer.xml
1、连接服务配置
- <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" username="${rmq.manager.user}" password="${rmq.manager.password}" port="${rmq.port}" />
- <rabbit:admin connection-factory="connectionFactory" />
2、spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
3、自定义接口类
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>
4、queue 队列声明 需要发送消息到哪些队列 ,消息系统监听队列
<rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />
5、topic 模式 绑定。注意:此处仅写一种模式;根据需求可以配多种模式。
- <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
- <rabbit:bindings>
- <rabbit:binding queue="test_queue" pattern="test_key" />
- </rabbit:bindings>
- </rabbit:topic-exchange>
6、用于消息的监听的代理类MessageListenerAdapter
- <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
- <constructor-arg ref="testHandler" />//类名
- <property name="defaultListenerMethod" value="handlerTest"></property>//方法名
- <property name="messageConverter" ref="jsonMessageConverter"></property>
- </bean>
7、配置监听 acknowledeg = "manual" 设置手动应答 当消息处理失败时:会一直重发 直到消息处理成功,监听容器
acknowledge="auto" concurrency="30" 设置发送次数,最多发送30次
- <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
- <rabbit:listener queues="test_queue" ref="testQueueListenerAdapter" />
- </rabbit:listener-container>
8、消费端代码:TestHandler 类
- public class TestHandler {
- @Override
- public void handlerTest(CommonMessage commonMessage) {
- System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
- }
- }
总结:这是rabbitmq 从生产者到消费者 的整个流程配置,能够直接在web开发中使用,博主亲测使用;如果还有不明白的小伙伴,可以留言,博主定会详细回复!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。