赞
踩
一般单纯使用spring+rabbitmq或者单独使用rabbitmq时,查看下官网的demo应该问题不大,spring amqp封装rabbitmq之后,初始化, 发送,接收的代码变得更是简洁,只是配置项可能会多一些,如果是xml会有不少,假使是annotations会相对简单。
rabbitmq中有种exchange是topic,官网的示例图说的很清楚,在现实的业务中的话,服务端消息发送到绑定的exchange的时候,routingKey是根据不同业务的匹配规则动态生成,先撇开consumer端的消费来说,producer端获取数据并根据routingKey,发送到绑定的exchange时,这个过程很多情况下,是需要动态绑定生成的。
rabbitmq的原生api实现和spring amqp的api实现,关于动态绑定exchange,routingKey和queue的代码。值得注意,producer端在产生数据的时候,并不需要一定绑定一个或者多个queue;
客户端在消费消息的时候,只需要跟exchange+routingKey建立连接,就可以获取对应的数据了。而接收这些数据的客户端队列需要一个queueName,这个在consumer接收的时候,是必须的。因此,这里我说,queue的名称对于producer是不重要的。(我自己的看法,若有错误,希望指出,谢谢)
当exchange的类型被指定为topic的时候,动态的绑定路由和分发数据,不管对producer还是consumer来说,都是比较重要的。这里,我主要贴一下spring amqp的api的动态绑定的代码,rabbitmq原生api也有
发送消息
1、生产者和 Broker 建立 TCP 连接
2、生产者和 Broker 建立通道
3、生产者通过通道消息发送 Broker,由 Exchange 将消息进行转发
4、Exchange 将消息转发到指定的 Queue(队列)
1、消费者和 Broker 建立 TCP 连接
2、消费者和 Broker 建立通道
3、消费者监听指定的 Queue(队列)
4、当有消息到达 Queue 时 Broker 默认将消息推送给消费者
5、消费者接收到消息
package com.rabbitmq.test; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * @Classname ProducerConfiguration * @Description TODO * @Date 2020/6/18 16:20 * @Created by snear */ public class ProducerConfiguration { private String queueName; private String routingKey; private RabbitTemplate rabbitTemplate; private String exchange; public ProducerConfiguration() { } public ProducerConfiguration(String exchange,String queueName, String routingKey) { this.queueName = queueName; this.routingKey = routingKey; this.exchange=exchange; this.rabbitTemplate = rabbitTemplate(); RabbitAdmin admin = new RabbitAdmin(this.rabbitTemplate.getConnectionFactory()); admin.declareQueue(new Queue(this.queueName)); admin.declareExchange(new TopicExchange(exchange)); // admin.setAutoStartup(true); } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setQueueName(String queueName) { this.queueName = queueName; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } public String getQueueName() { return queueName; } public String getRoutingKey() { return routingKey; } public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); //The routing key is set to the name of the queue by the broker for the default exchange. template.setRoutingKey(this.routingKey); //Where we will synchronously receive messages from template.setDefaultReceiveQueue(this.queueName); // template.setMessageConverter(new JsonMessageConverter()); return template; } public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } public void send(String s) { this.rabbitTemplate.convertAndSend(s); } public void send(String exchange,String routingKey,Object msg) { this.rabbitTemplate.convertAndSend(exchange,routingKey,msg); } }
package com.rabbitmq.test; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; /** * @Classname ConsumerConfig * @Description TODO * @Date 2020/6/18 16:28 * @Created by snear */ public class ConsumerConfig { private String queueName; private String routingKey; private int onOfConsumer; public int getOnOfConsumer() { return onOfConsumer; } public void setOnOfConsumer(int onOfConsumer) { this.onOfConsumer = onOfConsumer; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getRoutingKey() { return routingKey; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } public ConsumerConfig(String queueName, String routingKey, int onOfConsumer) throws Exception { this.queueName = queueName; this.routingKey = routingKey; this.onOfConsumer = onOfConsumer; ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setQueueNames(this.queueName); container.setConcurrentConsumers(this.onOfConsumer); container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler())); container.startConsumers(); } public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } }
package com.rabbitmq.test; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; /** * @Classname ConsumerSimpleMessageListenerContainer * @Description TODO * @Date 2020/6/18 16:29 * @Created by snear */ public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer { public void startConsumers() throws Exception { super.doStart(); } }
package com.rabbitmq.test;
/**
* @Classname ConsumerHandler
* @Description TODO
* @Date 2020/6/18 16:30
* @Created by snear
*/
public class ConsumerHandler {
public void handleMessage(String text) {
System.out.println("Received--------------------------: " + text);
}
}
最后是test:
package com.rabbitmq.test; import java.util.concurrent.TimeUnit; /** * @Classname Test * @Description TODO * @Date 2020/6/18 16:31 * @Created by snear */ public class Test { public static void main(String[] args) throws InterruptedException, Exception { ProducerConfiguration producer = new ProducerConfiguration("e1","q1", "q1"); ConsumerConfig consumer = new ConsumerConfig("q1", "q1", 5); int cout = 0; producer.send("Str: " + cout); /* while (true) { producer.send("Str: " + cout); TimeUnit.SECONDS.sleep(2); cout++; }*/ } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。