当前位置:   article > 正文

Java接收rabbitmQ消息_RabbitMQ消息发送与接收

java rabbitmq接收消息

1.简介

所有MQ产品从模型抽象上来说都是一样的过程。消费者订阅某个队列。生产者创建消息,然后发布到队列,最后将消息发送到监听的消费者。

AMQP(Advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。

ActiveMQ是基于JMS(Java Message Service)协议的消息中间件。区别如下:

88369e219d8018135717de5b764db88f.png

Rabbit模型如下:

08879534a46cddb18016a3ce37156378.png

1.Message。消息,是不具体的。由消息头和消息体组成。消息体是不透明的,而消息头是一系列可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(是否持久存储)等

2.Publisher。消息的生产者,也是一个向交换机发布消息的客户端应用程序。

3.Exchanger。交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。

4.Binging。绑定,用于消息队列和交换器之间的管理。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。所以可以将交换器理解成一个由绑定构成的路由表。

5.Queue。消息队列,用来保存消息知道发送给消费者。一个消息可投入一个或对个队列。

6.Connection。网络连接,比如一个TCP连接。

7.Channel。信道,多路复用连接中的一条独立的双向数据流通道,可读可写。一个Connection包括多个channel。因为对于操作系统来说建立和销毁TCP是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接。

8.Consumer。消费者,从消息队列取得消息的客户端应用程序。

9.VirtualHost。虚拟主机。表示一批交换机、消息队列和相关对象。vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、绑定、交换器和权限控制;vhost通过在各个实例间提供逻辑上分离,允许你为不同应用程序安全保密地运行数据;vhost是AMQP概念的基础,必须在连接时进行指定,RabbitMQ包含了默认vhost:“/”。

10.Borker。表示消息队列服务器实体。表示启动一个rabbitmq所包含的进程。

2.使用

1.简单队列模式

不涉及交换机的模型如下:

2c3f4935a3f5eae2b8129151eb32c1e5.png

pom文件引入如下依赖:

com.rabbitmq

amqp-client

5.4.3

1.消息生产者

packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classProducer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

ConnectionFactory factory = newConnectionFactory();

factory.setHost("192.168.99.100");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");returnfactory;

}public static void main(String[] args) throwsIOException, TimeoutException {

ConnectionFactory connectionFactory=getConnectionFactory();

Connection newConnection= null;

Channel createChannel= null;try{

newConnection=connectionFactory.newConnection();

createChannel=newConnection.createChannel();/*** 声明一个队列。

* 参数一:队列名称

* 参数二:是否持久化

* 参数三:是否排外 如果排外则这个队列只允许有一个消费者

* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

* 参数五:队列的附加属性

* 注意:

* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

* 2.队列名可以任意取值,但需要与消息接收者一致。

* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);

String message= "测试消息";/*** 发送消息到MQ

* 参数一:交换机名称,为""表示不用交换机

* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey

* 参数三:消息的属性信息

* 参数四:消息内容的字节数组*/createChannel.basicPublish("", "myQueue", null, message.getBytes());

System.out.println("消息发送成功");

}catch(Exception e) {

e.printStackTrace();

}finally{if (createChannel != null) {

createChannel.close();

}if (newConnection != null) {

newConnection.close();

}

}

}

}

注意:5672是rabbitmq暴露的端口,15672是management插件的端口。

发送成功之后可以从15672端口查看,也可以从15672进行消费,如下:

eb2712f48ec39aa3ba0b7b01818f9c15.png

2.消息接收

packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.rabbitmq.client.AMQP.BasicProperties;public classConsumer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case

ConnectionFactory factory = newConnectionFactory();

factory.setHost("192.168.99.100");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");returnfactory;

}public static void main(String[] args) throwsIOException, TimeoutException {

ConnectionFactory connectionFactory=getConnectionFactory();

Connection newConnection= null;

Channel createChannel= null;try{

newConnection=connectionFactory.newConnection();

createChannel=newConnection.createChannel();/*** 声明一个队列。

* 参数一:队列名称

* 参数二:是否持久化

* 参数三:是否排外 如果排外则这个队列只允许有一个消费者

* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列

* 参数五:队列的附加属性

* 注意:

* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;

* 2.队列名可以任意取值,但需要与消息接收者一致。

* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);/*** 接收消息。会持续坚挺,不能关闭channel和Connection

* 参数一:队列名称

* 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息

* 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。

* 参数四:消息接收者*/createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(createChannel) {

@Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throwsIOException {

String string= new String(body, "UTF-8");

System.out.println("接收到d消息: -》 " +string);

}

});

}catch(Exception e) {

e.printStackTrace();

}finally{

}

}

}

注意:消息的确认模式可以为自动也可以为手动,自动确认读取完会自动从队列删除;手动需要自己ack,如果设为手动也没ack可能会造成消息重复消费。

如果是多个消费者,会从队列以轮询的方式处理消息,这种称为工作队列模式。

补充:这种实际也是用了rabbitmq的一个默认交换机,routing_key为队列名称。也可以理解为是Rabbitmq类型为System的交换机。

02f579877e6c1be24e882e593950d660.png

测试:修改消费者代码

createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(cre

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/384339
推荐阅读
相关标签
  

闽ICP备14008679号