赞
踩
1.简介
所有MQ产品从模型抽象上来说都是一样的过程。消费者订阅某个队列。生产者创建消息,然后发布到队列,最后将消息发送到监听的消费者。
AMQP(Advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。
ActiveMQ是基于JMS(Java Message Service)协议的消息中间件。区别如下:
Rabbit模型如下:
1.Message。消息,是不具体的。由消息头和消息体组成。消息体是不透明的,而消息头是一系列可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(是否持久存储)等
2.Publisher。消息的生产者,也是一个向交换机发布消息的客户端应用程序。
3.Exchanger。交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。
4.Binging。绑定,用于消息队列和交换器之间的管理。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。所以可以将交换器理解成一个由绑定构成的路由表。
5.Queue。消息队列,用来保存消息知道发送给消费者。一个消息可投入一个或对个队列。
6.Connection。网络连接,比如一个TCP连接。
7.Channel。信道,多路复用连接中的一条独立的双向数据流通道,可读可写。一个Connection包括多个channel。因为对于操作系统来说建立和销毁TCP是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接。
8.Consumer。消费者,从消息队列取得消息的客户端应用程序。
9.VirtualHost。虚拟主机。表示一批交换机、消息队列和相关对象。vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、绑定、交换器和权限控制;vhost通过在各个实例间提供逻辑上分离,允许你为不同应用程序安全保密地运行数据;vhost是AMQP概念的基础,必须在连接时进行指定,RabbitMQ包含了默认vhost:“/”。
10.Borker。表示消息队列服务器实体。表示启动一个rabbitmq所包含的进程。
2.使用
1.简单队列模式
不涉及交换机的模型如下:
pom文件引入如下依赖:
com.rabbitmq
amqp-client
5.4.3
1.消息生产者
packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public classProducer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case
ConnectionFactory factory = newConnectionFactory();
factory.setHost("192.168.99.100");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");returnfactory;
}public static void main(String[] args) throwsIOException, TimeoutException {
ConnectionFactory connectionFactory=getConnectionFactory();
Connection newConnection= null;
Channel createChannel= null;try{
newConnection=connectionFactory.newConnection();
createChannel=newConnection.createChannel();/*** 声明一个队列。
* 参数一:队列名称
* 参数二:是否持久化
* 参数三:是否排外 如果排外则这个队列只允许有一个消费者
* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
* 参数五:队列的附加属性
* 注意:
* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
* 2.队列名可以任意取值,但需要与消息接收者一致。
* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);
String message= "测试消息";/*** 发送消息到MQ
* 参数一:交换机名称,为""表示不用交换机
* 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
* 参数三:消息的属性信息
* 参数四:消息内容的字节数组*/createChannel.basicPublish("", "myQueue", null, message.getBytes());
System.out.println("消息发送成功");
}catch(Exception e) {
e.printStackTrace();
}finally{if (createChannel != null) {
createChannel.close();
}if (newConnection != null) {
newConnection.close();
}
}
}
}
注意:5672是rabbitmq暴露的端口,15672是management插件的端口。
发送成功之后可以从15672端口查看,也可以从15672进行消费,如下:
2.消息接收
packagerabbitmq;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.rabbitmq.client.AMQP.BasicProperties;public classConsumer {public staticConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的case
ConnectionFactory factory = newConnectionFactory();
factory.setHost("192.168.99.100");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");returnfactory;
}public static void main(String[] args) throwsIOException, TimeoutException {
ConnectionFactory connectionFactory=getConnectionFactory();
Connection newConnection= null;
Channel createChannel= null;try{
newConnection=connectionFactory.newConnection();
createChannel=newConnection.createChannel();/*** 声明一个队列。
* 参数一:队列名称
* 参数二:是否持久化
* 参数三:是否排外 如果排外则这个队列只允许有一个消费者
* 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
* 参数五:队列的附加属性
* 注意:
* 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
* 2.队列名可以任意取值,但需要与消息接收者一致。
* 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。*/createChannel.queueDeclare("myQueue", true, false, false,null);/*** 接收消息。会持续坚挺,不能关闭channel和Connection
* 参数一:队列名称
* 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
* 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
* 参数四:消息接收者*/createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(createChannel) {
@Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throwsIOException {
String string= new String(body, "UTF-8");
System.out.println("接收到d消息: -》 " +string);
}
});
}catch(Exception e) {
e.printStackTrace();
}finally{
}
}
}
注意:消息的确认模式可以为自动也可以为手动,自动确认读取完会自动从队列删除;手动需要自己ack,如果设为手动也没ack可能会造成消息重复消费。
如果是多个消费者,会从队列以轮询的方式处理消息,这种称为工作队列模式。
补充:这种实际也是用了rabbitmq的一个默认交换机,routing_key为队列名称。也可以理解为是Rabbitmq类型为System的交换机。
测试:修改消费者代码
createChannel.basicConsume("myQueue", true, "", newDefaultConsumer(cre
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。