当前位置:   article > 正文

RabbitMQ消息队列实战(2)—— Java调用RabbitMQ的三种方式_java mq的使用

java mq的使用

本文主要介绍Java中调用RabbitMQ的三种方式。三种方式实际上对应了三种不同的抽象级别:

首先,通过Java原生代码来访问RabbitMQ。在这种方式下,需要手动创建Connection,创建Channel,然后通过Channel对象可以显式的创建Exchange、Queue和Bind等等。这种方式的好处就是使得我们能够很显式地了解到整个RabbitMQ操作的生命周期,建议新手可以通过这种方式学习RabiitMQ的入门。

spring-boot-starter-amqp对RabbitMQ的使用进行了进一步的封装,通过这种方式使用集成到spring boot中的RabbitMQ时,我们不再关心Connect和Channel的创建,spring boot会替我们创建好。我们索要做的,只是通过注解的方式创建Exchange、Queue和Bind对象,并把他们交给spring ioc进行管理,然后spring boot又会自动生成这些对象对应的交换机、队列和绑定。

Java中操作RabbitMQ的最后一种方法是通过EDA(Event Driven Achitecture,事件驱动架构)框架的spring cloud stream。spring cloud stream对RabitMQ(准确的说应该是消息队列)封装的更加彻底,我们甚至不用关心使用的消息队列是RabbitMQ还是Kafka(spring cloud stream可以配置RabbitMQ和Kafak两种消息队列,并进行无缝切换)。在使用时spring cloud stream时,只需一个标签就能自动创建RabitMQ的Connection、Chanel,甚至你都不用关心Exchange、Queue和Bind这些在spring-boot-starter-amqp中还需要手动创建的对象,他们就被创建好了。spring cloud stream的强大之处就在于它的封装,但是不足之处也在于它的封装,封装的太强,必然增加了学习成本和调试难度,而且类似RabbitMQ和Kafka这种中间件的使用,一般在系统创建之处就一定确定,进行无缝切换就显得有些鸡肋了。

下面,我们就以代码的方式演示这三种调用RabbitMQ的方式:

一、Java原生代码调用RabbitMQ

1.1 交换机和队列的创建

  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.setHost(this.rabbitMqHost);
  3. factory.setPort(this.rabbitMqPort);
  4. factory.setConnectionTimeout(this.rabbitMqTimeOut);
  5. factory.setUsername(this.rabbitMqUsername);
  6. factory.setPassword(this.rabbitMqPassword);
  7. factory.setVirtualHost("/");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  11. channel.queueDeclare("test-queue", true, false, false, null);
  12. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  13. channel.basicPublish("direct-exchange", "test-queue", null, msg.getBytes("UTF-8"));

上述的代码,创建了一个直连交换机、一个队列,并进行绑定,最后向交换机中发送了一个"Hello World"的字符串。

1~7行,创建了一个ConnectionFactory 对象并进行配置,配置的参数包括RabbitMQ的ip地址(host),端口号(port)、超时(connectionTimeout)等等。

第8行,通过ConnectionFactory 对象,创建了一个Connection 对象,此时已经完成了对RabbitMQ服务器的连接。如果我们通过RabitMQ Magement Web查看,可以看到这个链接。

第9行,创建用来通信的信道Channel。

第10行,声明和创建交换机。这里exchangeDeclare有五个参数。第1个参数指定了交换机的名称;第2个参数指定了交换机的类型:direct、topic或者fanout;第3个参数,指定交换机是否要持久化,如果设置为true,那么交换机的元数据要持久化到内存中;第4个参数,指定交换机在没有队列与其绑定时,是否删除,设置为false表示不删除;最后一个参数是Map<String, Object>类型,用来指定交换机其它一些结构化的参数,我在这里直接设置为null。

第11行,声明了一个名为test-queue的队列。queueDeclare有5个参数:第1个参数指定了队列的名称;第2个参数表示队列是否要持久化,但是需要注意,这里的持久化只是队列名称等这些队列元数据的持久化,不是队列中消息的持久化。第3个参数,表示队列是不是私有的,如果是私有的,只有创建它的应用程序才能从队列消费消息;第4个参数表示队列在没有消费者订阅时是否自动删除;第5个参数是队列的一些结构化信息,比如声明死信队列、磁盘队列会用到。

第12行,创建了一个bind对象,将交换机和队列进行绑定,queueBind的三个参数中:第1个参数指定了队列名称,第2个参数指定了交换机名称,第3个参数是路由键,在直连模式下为队列名称。

第13行,发送消息,在直连模式下需要指定:直连交换机名称(参数1);路由键(参数2,也就是目标队列名称);参数3类型为BasicProperties,可以为消息附带一些额外的附件,比如在使用RabbitMQ远程RPC调用模式发送消息时可以用到,这里直接设置为null。参数4就是要发送的消息转换成的二进制数组。

上面就是一个创建direct exchange和queue并发送消息的例子,如果要使用topic exchange或者fanout exchange只需要一些小小的改动即可。

比如创建topic exchange要明确指明交换机的类型为topic:

 channel.exchangeDeclare("topic-exchange", "topic", true, false, null);

绑定时指定主题为路由键:

 channel.queueBind("test-queue", "topic-exchange", "fruit");

发送消息时指定主题为路由键:

channel.basicPublish("topic-exchange", "fruit", null, msg.getBytes("UTF-8"));

再比如创建fanout exchange要明确指明交换机的类型为fanout:

channel.exchangeDeclare("fanout-exchange", "fanout", true, false, null);

绑定时指定路由键为空:

channel.queueBind("test-queue", "fanout-exchange", "");

发布消息时指定路由键为空:

channel.basicPublish("fanout-exchange", "", null, msg.getBytes("UTF-8"));

1.2 消费者订阅队列的消息

在上面的例子中,我们演示了创建direct、topic和fanout三种类型的exchange以及关联好了队列,现在,我们创建一个消费者来订阅队列里面的消息。首先实现Consumer接口:

  1. public class TestConsumer implements Consumer {
  2. @Override
  3. public void handleConsumeOk(String s) {
  4. System.out.println(s);
  5. }
  6. @Override
  7. public void handleCancelOk(String s) {}
  8. @Override
  9. public void handleCancel(String s) throws IOException {}
  10. @Override
  11. public void handleShutdownSignal(String s, ShutdownSignalException e) {}
  12. @Override
  13. public void handleRecoverOk(String s) {}
  14. @Override
  15. public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
  16. String str = new String(bytes);
  17. System.out.println("接受到的字符串是:" + str);
  18. }
  19. }

Consumer接口的方法有6个,在这里我们只用到了2个,handleConsumeOk在消费者获取到消息后调用,而handleDelivery是在调用handleConsumeOk后调用。我们业务的主要逻辑在handleDelivery中,因为在这个方法之中,我们可以获取到消息,并进行相应的处理。实现了自己的消费者,接下来需要用该消费者订阅队列:

  1. // 创建连接和信道
  2. ConnectionFactory factory = new ConnectionFactory();
  3. factory.setHost(this.rabbitMqHost);
  4. factory.setPort(this.rabbitMqPort);
  5. factory.setConnectionTimeout(this.rabbitMqTimeOut);
  6. factory.setUsername(this.rabbitMqUsername);
  7. factory.setPassword(this.rabbitMqPassword);
  8. factory.setVirtualHost("/");
  9. Connection connection = null;
  10. try {
  11. connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. Consumer consumer = new TestConsumer();
  14. channel.basicConsume("test-queue", true, consumer);
  15. while (true) {
  16. Thread.sleep(3600000);
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. } finally {
  21. if (connection != null && connection.isOpen()) {
  22. connection.close();
  23. }
  24. }

第1~12行,和生产者创建Connection的过程一致。

第13行,创建了一个TestConsumer对象。

第14行,订阅队列test-queue中的消息。basicConsume方法有三个参数:第1个参数是指明要订阅的通道的名称;第2个参数指明是否自动ack,如果是true,这个方法结束后会自动进行ack,如果是false,需要额外手动的ack;第3个参数就是装配的消费者。

第15~17行,没有业务上的功能,只是单纯不让程序结束。

当运行了消费者以后,就可以看到,消费者消费了队列中的消息。

消费者打印消息:

队列中的消息已经清空:

以上就是通过Java原生的代码调用RabbitMQ的例子,接下来,我们学习下另外一种调用RabbitMQ的方式,通过 spring-boot-starter-amqp调用。

二、 spring-boot-starter-amqp调用RabbitMQ

2.1 生产者

首先,先要创建spring boot代码工程,并且pom文件中引入spring-boot-starter-amqp的依赖。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

创建RabbitMQ的配置:

  1. @Configuration
  2. public class RabbitConfig {
  3. @Value("${spring.rabbitmq.host}")
  4. private String host;
  5. @Value("${spring.rabbitmq.port}")
  6. private int port;
  7. @Value("${spring.rabbitmq.username}")
  8. private String username;
  9. @Value("${spring.rabbitmq.password}")
  10. private String password;
  11. @Bean
  12. public ConnectionFactory connectionFactory() {
  13. CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
  14. connectionFactory.setUsername(username);
  15. connectionFactory.setPassword(password);
  16. connectionFactory.setVirtualHost("/");
  17. connectionFactory.setPublisherConfirms(true);
  18. return connectionFactory;
  19. }
  20. @Bean
  21. public RabbitTemplate rabbitTemplate() {
  22. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  23. return template;
  24. }
  25. //队列 起名:TestDirectQueue
  26. @Bean
  27. public Queue TestDirectQueue() {
  28. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  29. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  30. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  31. // return new Queue("TestDirectQueue",true,true,false);
  32. //一般设置一下队列的持久化就好,其余两个就是默认false
  33. return new Queue("TestDirectQueue", true, false, false);
  34. }
  35. //Direct交换机 起名:TestDirectExchange
  36. @Bean
  37. DirectExchange TestDirectExchange() {
  38. // return new DirectExchange("TestDirectExchange",true,true);
  39. return new DirectExchange("TestDirectExchange", true, false);
  40. }
  41. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectQueue
  42. @Bean
  43. Binding bindingDirect() {
  44. return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
  45. }
  46. }

可以看到,在上面的代码中我们创建了connectionFactory、rabbitTemplate、TestDirectQueue、TestDirectExchange和bindingDirect,我想从bean name大家也都能猜到,这是在创建RabbitMq的连接工厂、交换机、队列和绑定等等。其中rabbitTemplate我们在上文中没有接触过,实际上这就是spring boot对RabbitMQ根据bean创建信道、交换机等基本组件的封装,利用模板方法模式,将创建过程进行了隐藏,也对消息的发布和订阅过程进行了隐藏。

完成了上述配置之后,运行程序,是不是就自动创建了这些交换机、通道等等呢?也许你会回答是,但是笔者在运行程序后,查看RabbitMQ的web,发现还是一片空白:

是我们没有创建成功?其实不然,是因为rabbitTemplate在创建这些组件时,是采用的懒加载模式,只有在发送消息之前,才会去真正创建这些交换机、通道等等。所以,接下来,我们创建一个工具类,并在单元测试中通过该工具类发送消息:

  1. @Service
  2. public class RbmqServiceImpl implements RbmqService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Override
  6. public void sendMsg(String str) {
  7. Message message=new Message(str.getBytes());
  8. rabbitTemplate.send("TestDirectExchange","TestDirectQueue",message);
  9. }
  10. }

在单元测试中测试消息的发送:

  1. @Test
  2. public void sendMsgTest() {
  3. rbmqService.sendMsg("Hello World");
  4. }

此时,再去看RabbitMQ的管理Web,发现交换机和队列都创建完成,而且队列中也缓存了消息:

2.2 消费者

前面介绍了使用spring boot集成的RabbitMQ组件创建生产者的方法,下面我们介绍下,消费者的创建。

配置类RabbitConfig的定义,基本和生产者一致,这里不再赘述,我们重点介绍下使用@RabbitListener标签监听队列的方法。首先,还是创建一个工具类:

  1. @Service
  2. public class RbmqServiceImpl implements RbmqService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @RabbitListener(queues = "TestDirectQueue")
  6. @Override
  7. public void consumer(String msg) {
  8. System.out.println("接受到的消息是:" + msg);
  9. }
  10. }

可以看到,通过RabbitListener标签,我们直接实现了订阅TestDirectQueue队列。此时运行程序,会接受并打印我们用生产者发送的消息:

@RabbitListener标签除了指定监听的队列之外,还可以创建交换机和队列,并进行绑定,然后再开启监听:

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(value = "TestTopicQueue", durable = "true"),
  3. exchange = @Exchange(value = "orderTopicExchange", type = ExchangeTypes.TOPIC),
  4. key = "TestTopicRouting")
  5. )

以上就是使用spring-boot-starter-amqp集成RabbitMQ的方法,最后,我们学习下使用spring cloud stream来操作RabbitMQ。

三、spring cloud stream调用RabbitMQ

3.1 生产者

新建一个spring-boot工程,pom文件中引入以下依赖:

  1. <dependencyManagement>
  2. <dependencies>
  3. <!-- spring-cloud-dependencies start-->
  4. <dependency>
  5. <groupId>org.springframework.cloud</groupId>
  6. <artifactId>spring-cloud-dependencies</artifactId>
  7. <version>Finchley.RELEASE</version>
  8. <type>pom</type>
  9. <scope>import</scope>
  10. </dependency>
  11. <!-- spring-cloud-dependencies end-->
  12. </dependencies>
  13. </dependencyManagement>
  14. <dependencies>
  15. <dependency>
  16. <groupId>org.springframework.cloud</groupId>
  17. <artifactId>spring-cloud-stream</artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.cloud</groupId>
  21. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  22. </dependency>
  23. </dependencies>

同样在配置文件application.yml中进行RabbitMQ链接信息的配置:

  1. server:
  2. port: 8022
  3. spring:
  4. #配置rabbitMq 服务器
  5. rabbitmq:
  6. host: 127.0.0.1
  7. port: 5672
  8. username: guest
  9. password: guest
  10. #虚拟host 可以不设置,使用server默认host
  11. #virtual-host: JCcccHost

使用spring cloud stream方式调用RabbitMQ,首先要创建绑定接口:

  1. public interface OutputMessageBinding {
  2. /** Topic 名称*/
  3. String OUTPUT = "message-center";
  4. @Output(OUTPUT)
  5. MessageChannel output();
  6. }

这里需要说明一下:

第5行通过Output标签指定了消息的输出exchange,这里会创建一个名称为message-center-out的exchange,而且类型为topic。通过Spring cloud stream创建的exchange默认的类型都是topic。

接下来,我们需要再次创建一个工具类:

  1. @Service
  2. @EnableBinding(OutputMessageBinding.class)
  3. public class RbmqServiceImpl implements RbmqService {
  4. @Resource
  5. private OutputMessageBinding outputMessageBinding;
  6. @Override
  7. public void sendMsg(String msg) {
  8. outputMessageBinding.output().send(MessageBuilder.withPayload(msg).build());
  9. }
  10. }

第2行中的EnableBinding就是为了激活绑定类OutputMessageBinding。OutputMessageBinding被激活之后会产生一个名称为outputMessageBinding的bean托管到IOC中,然后在第6行获取到了这个bean。在第10行中,获取outputMessageBinding的output对象进行消息的发送。

在单元测试中对发送信息的接口进行调用:

  1. @SpringBootTest
  2. @RunWith(SpringJUnit4ClassRunner.class)
  3. public class RbmqTest {
  4. @Autowired
  5. RbmqService rbmqService;
  6. @Test
  7. public void sendMsgTest() {
  8. rbmqService.sendMsg("Hello World");
  9. }
  10. }

然后在web上可以看到新建的topic类型的exchange:

3.1 消费者

消费者的创建和使用过程其实和生产者比较类似,首先也是需要创建一个绑定接口:

  1. public interface InputMessageBinding {
  2. String INPUT = "message-center";
  3. @Input(INPUT)
  4. SubscribableChannel input();
  5. }

在消费者的绑定接口中,使用@Input标签用来表明该对象为消费者对象。接下来,同样也需要创建一个工具类并注入到IOC容器中:

  1. @Service
  2. @EnableBinding({InputMessageBinding.class})
  3. public class RbmqServiceImpl implements RbmqService {
  4. @StreamListener(InputMessageBinding.INPUT)
  5. @Override
  6. public void consume(String msg) {
  7. System.out.println("接受到的消息是:" + msg);
  8. }
  9. }

可以看到,在第8行,我们使用了StreamListener标签监听了创建的InputMessageBinding的INPUT字段,StreamListener会在内部进行处理,实际上监听的是名名为message-center+随机字符串的队列,而队列和message-center也自动进行了绑定:

看下上面红色圈出部分的队列和交换机的绑定,你会发现,绑定的路由键为'#',表示路由匹配任意规则,也就是说从名为message-center的exchange发出的消息都会路由到该队列上。

至此,Java中三种操作RabbitMQ的方式都已经介绍完毕。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/384343
推荐阅读
相关标签
  

闽ICP备14008679号