当前位置:   article > 正文

RabbitMQ 配置文件详解(生产者和消费者)_rabbit mq producer怎么配置

rabbit mq producer怎么配置

一、rabbitmq 配置文件 在web 项目开发过程中,一般分为生产者配置文件和消费者配置文件。废话少说,马上教您整个流程的配置!

1、准备工作:安装好rabbitmq,并在项目中增加配置文件   rabbit.properties 内容如下:

  1. rmq.ip=192.188.113.114
  2. rmq.port=5672
  3. rmq.producer.num=20
  4. rmq.manager.user=admin
  5. rmq.manager.password=admin

二、生产者配置文件:producer.xml

1、<!-- 创建连接类 连接安装好的 rabbitmq -->

  1. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  2. <constructor-arg value="localhost" />
  3. <property name="username" value="${rmq.manager.user}" />
  4. <property name="password" value="${rmq.manager.password}" />
  5. <property name="host" value="${rmq.ip}" />
  6. <property name="port" value="${rmq.port}" />
  7. </bean>

2、spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现

<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>

3、 spring template   声明, durable:是否持久化 ; exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列.

       queue 队列声明 需要发送消息到哪些队列 消息系统监听队列

  1. <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
  2. <rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />

4、消息失效后监听队列,6000 为 时间间隔信息 60s   int或long类型,解决优先级问题

  1. <rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue">
  2. <rabbit:queue-arguments>
  3. <entry key="x-message-ttl">
  4. <value type="java.lang.Long">60000</value>
  5. </entry>
  6. <entry key="x-dead-letter-exchange" value="test_Exchange"/>
  7. </rabbit:queue-arguments>
  8. </rabbit:queue>

5、rabbitmq的三种模式:direct,fanout,topic 三种

direct 消息转换队列 绑定key,意思就是消息与一个特定的路由键匹配,会转发。rabbit:binding:设置消息queue匹配的key。

  1. <rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">
  2. <rabbit:bindings>
  3. <rabbit:binding queue="test_queue" key="test_key" />
  4. </rabbit:bindings>
  5. </rabbit:direct-exchange>

fanout 模式:客户端中只要是与该路由绑定在一起的队列都会收到相关消息,这类似广播,发送端不管队列是谁,都由客户端自己去绑定,谁需要数据谁去绑定自己的相应队列。

  1. <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
  2. <rabbit:bindings>
  3. <rabbit:binding queue="test_delay_queue"/>
  4. </rabbit:bindings>
  5. </rabbit:fanout-exchange>

topic 模式:发送端不是按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

    <binding queue="testqueue" pattern="*.*.test1"

  1. <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
  2. <rabbit:bindings>
  3. <rabbit:binding queue="test_queue" pattern="test_key.*.*" />
  4. </rabbit:bindings>
  5. </rabbit:topic-exchange>

6、生产者(发送端)代码:

  1. @Resource  
  2. private RabbitTemplate rabbitTemplate; 
  3. public void sendMessage(CommonMessage msg){
  4. try {
  5. logger.error("发送信息开始");
  6. System.out.println(rabbitTemplate.getConnectionFactory().getHost());
  7. //发送信息 message-exchange 交换机 msg.getSource() 为 test_key
  8. rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
  9. logger.error("发送信息结束");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. }

三、消费者配置:同一个项目中 consumer.xml

1、连接服务配置

  1. <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" username="${rmq.manager.user}" password="${rmq.manager.password}" port="${rmq.port}" />
  2. <rabbit:admin connection-factory="connectionFactory" />

2、spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现

<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>

3、自定义接口类

<bean id="testHandler" class="com.rabbit.TestHandler"></bean>

4、queue 队列声明 需要发送消息到哪些队列 ,消息系统监听队列

<rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />

5、topic 模式 绑定。注意:此处仅写一种模式;根据需求可以配多种模式。

  1. <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
  2. <rabbit:bindings>
  3. <rabbit:binding queue="test_queue" pattern="test_key" />
  4. </rabbit:bindings>
  5. </rabbit:topic-exchange>

6、用于消息的监听的代理类MessageListenerAdapter

  1. <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
  2. <constructor-arg ref="testHandler" />//类名
  3. <property name="defaultListenerMethod" value="handlerTest"></property>//方法名
  4. <property name="messageConverter" ref="jsonMessageConverter"></property>
  5. </bean>

7、配置监听  acknowledeg = "manual"   设置手动应答  当消息处理失败时:会一直重发  直到消息处理成功,监听容器

acknowledge="auto" concurrency="30"  设置发送次数,最多发送30次

  1. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
  2. <rabbit:listener queues="test_queue" ref="testQueueListenerAdapter" />
  3. </rabbit:listener-container>

8、消费端代码:TestHandler 类

 

  1. public class TestHandler {
  2. @Override
  3. public void handlerTest(CommonMessage commonMessage) {
  4. System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
  5. }
  6. }

总结:这是rabbitmq 从生产者到消费者 的整个流程配置,能够直接在web开发中使用,博主亲测使用;如果还有不明白的小伙伴,可以留言,博主定会详细回复!

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/386673
推荐阅读
相关标签
  

闽ICP备14008679号