当前位置:   article > 正文

rabbitmq发送&接收消息

发送端配置:

rabbitmq-send-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd 
        http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

 

<!-- 回调 -->

<bean id="testConfirmCallback" class="com.yunos.service.impl.TestConfirmCallback" />

 

<bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="addresses" value="localhost:5672" />
        <property name="username" value="guest" />
        <property name="password" value="guest" />
        <property name="channelCacheSize" value="5" />
        <property name="virtualHost" value="/" />
        <property name="requestedHeartBeat" value="5" />

<!--         开启发送方的回调 -->
        <property name="publisherConfirms" value="true" />
    </bean>

    <!-- 生产者配置开始 -->
    <!-- rabbitmq server服务管理页面:http://localhost:15672/ -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- exchage名称为hello.direct ,hello.direct需在rabbitmq服务管理页面exchanges定义 -->
        <property name="exchange" value="hello.direct" />

<!--         使用confirm时,应关闭事物方式 -->
        <property name="channelTransacted" value="false" />
    </bean>
    <bean id="rabbitMqService" class="com.yunos.service.impl.RabbitMqServiceImpl">
        <property name="rabbitTemplate" ref="rabbitTemplate" />
    </bean>
    <!-- 生产者配置结束 -->

 

com.yunos.service.impl.RabbitMqServiceImpl.java

package com.yunos.service.impl;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import com.yunos.domain.Message;
import com.yunos.service.RabbitMqService;

/**
 * 发送消息
 *
 */
public class RabbitMqServiceImpl implements RabbitMqService {
   
    public  RabbitTemplate rabbitTemplate;

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public boolean send(Message message) {
       try {
            // exchange="hello.direct"
            // routingKey="hello.world.quene"
            rabbitTemplate.convertAndSend("hello.direct", "hello.world.quene",
                    message);
            // 如果是fanout类型的exchange,则只需如下发送消息即可
            // rabbitTemplate.convertAndSend(message);
            //设置回调,当消息成功发送到rabbitmq服务端时testConfirmCallback.confirm方法会被调用
            rabbitTemplate.setConfirmCallback(testConfirmCallback);
           
        } catch (AmqpException e) {
            System.out.println(e);
            return false;
        }
        return true;
    }

}

 

回调函数com.yunos.service.impl.TestConfirmCallback.java:

package com.yunos.service.impl;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;


/**
 * 发送端回调函数
 *
 */
public  class TestConfirmCallback implements ConfirmCallback {

    @Override
    public  void confirm(CorrelationData correlationData, boolean ack) {
        System.out.println(ack);
        System.out.println(correlationData);
    }

}


接收端配置:

rabbitmq-receive-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd 
        http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">


   <bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="addresses" value="localhost:5672" />
        <property name="username" value="guest" />
        <property name="password" value="guest" />
        <property name="channelCacheSize" value="5" />
        <property name="virtualHost" value="/" />
        <property name="requestedHeartBeat" value="5" />
    </bean>


    <!-- 消费者配置开始 -->
    <!-- rabbitmq server服务管理页面:http://localhost:15672/ -->

    <!-- 声明消息转换器为SimpleMessageConverter -->
    <bean id="messageConverter"
        class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>

    <!-- 用于接收消息的处理类 -->
    <bean name="rabbitMqMessageHandler" class="com.yunos.service.impl.RabbitMqMessageHandler">
    </bean>


    <!-- 用于消息的监听的代理类MessageListenerAdapter -->
    <bean id="messageListenerAdapter"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="rabbitMqMessageHandler" />
        <property name="defaultListenerMethod" value="handleMessage" />
        <property name="messageConverter" ref="messageConverter" />
    </bean>

    <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->
    <bean id="listenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- 监听test.quene消息队列 test.quene需在rabbitmq服务管理页面quene里定义 -->
        <property name="queueNames" value="test.quene" />
        <property name="messageListener" ref="messageListenerAdapter" />
    </bean>
    <!-- 监听生产者发送的消息结束 -->

</beans>

 

com.yunos.service.impl.RabbitMqMessageHandler.java:

package com.yunos.service.impl;

import com.google.gson.Gson;
import com.yunos.domain.Message;

/**
 * 处理消息
 *
 */
public class RabbitMqMessageHandler {
   
     public void handleMessage(Message message){
         Gson gson=new Gson();
         System.out.println("receive message: "+gson.toJson(message));
     }

}

需要在rabbitmq server中配置:

1.在Quenes是中增加一个quene,名称为test.quene

2.在exchanges中增加一个exchange,名称为hello.direct,类型为direct

3.在新建的exchange hello.direct添加bindings

添加完成后可以看到绑定关系:

上述绑定表示:将发送到hello.direct的消息按照routKey="hello.world.quene"匹配到test.quene队列。即,消息发送者按照 rabbitTemplate.convertAndSend("hello.direct", "hello.world.quene",message)发送;则rabbitmq将此消息发送到test.quene队列中,这样接收端就可以从test.quene中获取到消息了。

 

exchange有3中类型direct ,fanout和topic,不同的Exchange会表现出不同路由行为:

(详细参见:http://my.oschina.net/wy08/blog/186202)

a) 如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。

b)   如果是  Fanout  类型,则会将消息发送给所有与该  Exchange  定义过  Binding  的所有  Queues  中去,其实是一种广播行为。

c)如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。

 

如果都选择了持久化,如果接收端当掉了,则已发送的消息会保存在rabbitmq服务中,当接收端重新可用时,接收端可以重新接受到这些消息。未发送成功的消息可以在服务管理页面Quenes中的test.quene中看到:

 

如果有2个接收端监听同一个 QUENE,则2个接收端只能接受部分发送的消息,同一条消息只能被一个客户端接收到,另一个接收端接受不到。

 

如果发送端使用的exchange是fanout类型的,则只需这样发送消息:

rabbitTemplate.convertAndSend(message);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/139267?site
推荐阅读
相关标签
  

闽ICP备14008679号