赞
踩
Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层, 不依赖于特定的 AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用 spring-rabbit 来实现。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
包括 3 个 jar 包: Amqp-client-3.3.4.jar; Spring-amqp.jar; Spring.rabbit.jar;
思考:
Java API 方式编程,有什么问题?
Spring 封装 RabbitMQ 的时候,它做了什么事情?
// 声明一个交换机
rabbitAdmin.declareExchange(new DirectExchange("ADMIN_EXCHANGE", false, false));
// 声明一个队列
rabbitAdmin.declareQueue(new Queue("ADMIN_QUEUE", false, false, false));
// 声明一个绑定
rabbitAdmin.declareBinding( new Binding("ADMIN_QUEUE", Binding.DestinationType.QUEUE,
"ADMIN_EXCHANGE", "admin", null));
为什么我们在配置文件(Spring)或者配置类(SpringBoot)里面定义了交换机、 队列、绑定关系,并没有直接调用 Channel 的 declare 的方法,Spring 在启动的时候就可以帮我们创建这些元数据?这些事情就是由 RabbitAdmin 完成的。
RabbitAdmin 实现了 InitializingBean 接口,里面有唯一的一个方法 afterPropertiesSet(),这个方法会在 RabbitAdmin 的属性值设置完的时候被调用。
在 afterPropertiesSet ()方法中,调用了一个 initialize()方法。这里面创建了三个Collection,用来盛放交换机、队列、绑定关系。
最后依次声明返回类型为 Exchange、Queue 和 Binding 这些 Bean,底层还是调
用了 Channel 的 declare 的方法。
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
ReturnCallBack
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
return rabbitTemplate;
}
ConfirmCallBack
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
{
public void confirm (CorrelationData correlationData,boolean ack, String cause){
if (ack) {
System.out.println("消息确认成功");
} else {
System.out.println("消息确认失败");
}
}
});
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(getSecondQueue(), getThirdQueue()); //监听的队列 container.setConcurrentConsumers(1); // 最小消费者数 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); //是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式 container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); return container; }
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setAutoStartup(true);
return factory;
}
整合代码
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new CachingConnectionFactory(new URI("amqp://guest:guest@localhost:5672")); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); SimpleMessageListenerContainer container = factory.createListenerContainer(); // 不用工厂模式也可以创建 // SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setQueueNames("BASIC_SECOND_QUEUE"); container.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("收到消息:" + message); } }); container.start(); AmqpTemplate template = new RabbitTemplate(connectionFactory); template.convertAndSend("BASIC_SECOND_QUEUE", "msg 1"); }
在调用 RabbitTemplate 的 convertAndSend()方法发送消息时,会使用MessageConvertor 进行消息的序列化,默认使用 SimpleMessageConverter。 在某些情况下,我们需要选择其他的高效的序列化工具。如果我们不想在每次发送消息时自己处理消息,就可以直接定义一个MessageConvertor。
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
MessageConvertor 如何工作?
调用了 RabbitTemplate 的 convertAndSend() 方法时会使用对应的MessageConvertor 进行消息的序列化和反序列化。
有哪些 MessageConvertor?
在 Spring 中提供了一个默认的转换器:SimpleMessageConverter。 Jackson2JsonMessageConverter(RbbitMQ 自带):将对象转换为 json,然后再转换成字节数组进行传递。
如何自定义 MessageConverter?
例如:我们要使用 Gson 格式化消息:
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest" host="127.0.0.1" port="5672"/> <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/> <rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/> <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin"> <rabbit:bindings> <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey"> </rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <rabbit:template id="amqpTemplate" exchange="${gupao.exchange}" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> <bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver"/> </rabbit:listener-container>
3 个交换机与 4 个队列绑定。4 个消费者分别监听 4 个队列。 生产者发送 4 条消息,4 个队列收到 5 条消息。消费者打印出 5 条消息。
RabbitConfig.java
定义交换机
@Bean("vipDirectExchange")
public DirectExchange getDirectExchange() {
return new DirectExchange(directExchange);
}
@Bean("vipTopicExchange")
public TopicExchange getTopicExchange() {
return new TopicExchange(topicExchange);
}
@Bean("vipFanoutExchange")
public FanoutExchange getFanoutExchange() {
return new FanoutExchange(fanoutExchange);
}
定义队列
@Bean("vipFirstQueue") public Queue getFirstQueue() { return new Queue(firstQueue); } @Bean("vipSecondQueue") public Queue getSecondQueue() { return new Queue(secondQueue); } @Bean("vipThirdQueue") public Queue getThirdQueue() { return new Queue(thirdQueue); } @Bean("vipFourthQueue") public Queue getFourthQueue() { return new Queue(fourthQueue); }
定义绑定
@Bean public Binding bindFirst(@Qualifier("vipFirstQueue") Queue queue, @Qualifier("vipDirectExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("hc.x"); } @Bean public Binding bindSecond(@Qualifier("vipSecondQueue") Queue queue, @Qualifier("vipTopicExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("*.xhc.*"); } @Bean public Binding bindThird(@Qualifier("vipThirdQueue") Queue queue, @Qualifier("vipFanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding bindFourth(@Qualifier("vipFourthQueue") Queue queue, @Qualifier("vipFanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); }
定义监听(后面三个消费者省略); 在消费者类中可以有多个处理(不同类型的消息)的方法。
FirstConsumer.java
@Component
@PropertySource("classpath:xhc.properties")
@RabbitListener(queues = "${com.xhc.firstqueue}")
public class FirstConsumer {
@RabbitHandler
public void process(@Payload Merchant merchant) {
System.out.println("First Queue received msg : " + merchant.getName());
}
}
注入 RabbitTemplate 发送消息
RabbitSender.java
@Autowired
AmqpTemplate template;
public void send() throws JsonProcessingException {
Merchant merchant = new Merchant(1001, "a direct msg : aaa", "bbb");
template.convertAndSend(directExchange, directRoutingKey, merchant);
template.convertAndSend(topicExchange, topicRoutingKey1, "a topic msg : 1.xhc.2");
template.convertAndSend(topicExchange, topicRoutingKey2, "a topic msg : 3.xhc.4");
}
// 发送 JSON 字符串
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(merchant);
System.out.println(json);
template.convertAndSend(fanoutExchange,"",json);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。