发送端配置:
rabbitmq-send-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 回调 -->
<bean id="testConfirmCallback" class="com.yunos.service.impl.TestConfirmCallback" />
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="localhost:5672" />
<property name="username" value="guest" />
<property name="password" value="guest" />
<property name="channelCacheSize" value="5" />
<property name="virtualHost" value="/" />
<property name="requestedHeartBeat" value="5" />
<!-- 开启发送方的回调 -->
<property name="publisherConfirms" value="true" />
</bean>
<!-- 生产者配置开始 -->
<!-- rabbitmq server服务管理页面:http://localhost:15672/ -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<!-- exchage名称为hello.direct ,hello.direct需在rabbitmq服务管理页面exchanges定义 -->
<property name="exchange" value="hello.direct" />
<!-- 使用confirm时,应关闭事物方式 -->
<property name="channelTransacted" value="false" />
</bean>
<bean id="rabbitMqService" class="com.yunos.service.impl.RabbitMqServiceImpl">
<property name="rabbitTemplate" ref="rabbitTemplate" />
</bean>
<!-- 生产者配置结束 -->
com.yunos.service.impl.RabbitMqServiceImpl.java
package com.yunos.service.impl;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.yunos.domain.Message;
import com.yunos.service.RabbitMqService;
/**
* 发送消息
*
*/
public class RabbitMqServiceImpl implements RabbitMqService {
public RabbitTemplate rabbitTemplate;
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public boolean send(Message message) {
try {
// exchange="hello.direct"
// routingKey="hello.world.quene"
rabbitTemplate.convertAndSend("hello.direct", "hello.world.quene",
message);
// 如果是fanout类型的exchange,则只需如下发送消息即可
// rabbitTemplate.convertAndSend(message);
//设置回调,当消息成功发送到rabbitmq服务端时testConfirmCallback.confirm方法会被调用
rabbitTemplate.setConfirmCallback(testConfirmCallback);
} catch (AmqpException e) {
System.out.println(e);
return false;
}
return true;
}
}
回调函数com.yunos.service.impl.TestConfirmCallback.java:
package com.yunos.service.impl;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* 发送端回调函数
*
*/
public class TestConfirmCallback implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack) {
System.out.println(ack);
System.out.println(correlationData);
}
}
接收端配置:
rabbitmq-receive-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="localhost:5672" />
<property name="username" value="guest" />
<property name="password" value="guest" />
<property name="channelCacheSize" value="5" />
<property name="virtualHost" value="/" />
<property name="requestedHeartBeat" value="5" />
</bean>
<!-- 消费者配置开始 -->
<!-- rabbitmq server服务管理页面:http://localhost:15672/ -->
<!-- 声明消息转换器为SimpleMessageConverter -->
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter">
</bean>
<!-- 用于接收消息的处理类 -->
<bean name="rabbitMqMessageHandler" class="com.yunos.service.impl.RabbitMqMessageHandler">
</bean>
<!-- 用于消息的监听的代理类MessageListenerAdapter -->
<bean id="messageListenerAdapter"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rabbitMqMessageHandler" />
<property name="defaultListenerMethod" value="handleMessage" />
<property name="messageConverter" ref="messageConverter" />
</bean>
<!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->
<bean id="listenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 监听test.quene消息队列 test.quene需在rabbitmq服务管理页面quene里定义 -->
<property name="queueNames" value="test.quene" />
<property name="messageListener" ref="messageListenerAdapter" />
</bean>
<!-- 监听生产者发送的消息结束 -->
</beans>
com.yunos.service.impl.RabbitMqMessageHandler.java:
package com.yunos.service.impl;
import com.google.gson.Gson;
import com.yunos.domain.Message;
/**
* 处理消息
*
*/
public class RabbitMqMessageHandler {
public void handleMessage(Message message){
Gson gson=new Gson();
System.out.println("receive message: "+gson.toJson(message));
}
}
需要在rabbitmq server中配置:
1.在Quenes是中增加一个quene,名称为test.quene
2.在exchanges中增加一个exchange,名称为hello.direct,类型为direct
3.在新建的exchange hello.direct添加bindings
添加完成后可以看到绑定关系:
上述绑定表示:将发送到hello.direct的消息按照routKey="hello.world.quene"匹配到test.quene队列。即,消息发送者按照 rabbitTemplate.convertAndSend("hello.direct", "hello.world.quene",message)发送;则rabbitmq将此消息发送到test.quene队列中,这样接收端就可以从test.quene中获取到消息了。
exchange有3中类型direct ,fanout和topic,不同的Exchange会表现出不同路由行为:
(详细参见:http://my.oschina.net/wy08/blog/186202)
a) 如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
b) 如果是 Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。
c)如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。
如果都选择了持久化,如果接收端当掉了,则已发送的消息会保存在rabbitmq服务中,当接收端重新可用时,接收端可以重新接受到这些消息。未发送成功的消息可以在服务管理页面Quenes中的test.quene中看到:
如果有2个接收端监听同一个 QUENE,则2个接收端只能接受部分发送的消息,同一条消息只能被一个客户端接收到,另一个接收端接受不到。
如果发送端使用的exchange是fanout类型的,则只需这样发送消息:
rabbitTemplate.convertAndSend(message);