赞
踩
上篇我们已经介绍过了mq的安装以及基本使用,这节我们主要要讲一下mq的使用。在mq中我们会讲解他的5中工作模式:
1、生产者:负责生产消息的。通常用字母 P 表示。
2、 队列就像存放商品的仓库或者商店,是生产商品的工厂和购买商品的用户之间的中转站。队列就像是一个仓库或者流水线。
3、 交换机,在生产者和消息队列之间的交换器,功能类似于网络宽带的交换机,可以根据不同的关键字,将信息发送到不同的队列。
4、routingKey:交换机与队列绑定的中间关联
5、channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
6、消费者: 消费者就好比是从商店购买或从仓库取走商品的人,消费的意思就是接收。消费者是一个程序,主要是等待接收消息。我们的用“C”表示
这上面的几个之后我们在使用mq的时候会用到。
1、首先生产者将消息发送给交换机
2、交换机通过routingKey将消息发送给与之绑定的队列(如果不适用交换机的话,那么就直接将消息发送到队列)
3、消费者监听某个队列,如果有消息就去收取。
4、如果是自认确认机制的话,那么就返回一个确认码给mq服务端,说已经接到,然后mq会将这个消息从队列移除。如果是手动机制的话,那么就当消费者消费完消息后,手动返回一个ack码给mq服务端,如果服务端没有收到该返回码,会认为消息没有送达,会启动重新发送,直到收到确认码为止,这个有点像http的握手连接。确保消息被准确送达。
在mq中简单模式是一种最为基本的mq的使用,这个只是用到我们上面的三种名词,即消费者、生产者和队列,channel不管是哪个模式都必须得有的。从官网拿取了一个图(ps这个图就是我要说的):
这个图就很清楚的看出,简单模式的工作特点,其实就是是生成者直接将消息发送给队列,消费者直接从队列取消息,中间没有其他的东西,并且1对1的。那么代码如何去实现他呢?
在代码中去实现他,也分为上面几个步骤
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
生产者代码
public static void main( String[] args ) throws IOException, TimeoutException, InterruptedException
{
//创建mq的连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// factory.setVirtualHost("/admin");//这个就是你自己设置,默认为/
//创建一个连接
Connection con=factory.newConnection();
//创建一个频道
Channel channel=con.createChannel();
//创建一个队列
channel.queueDeclare("first", true, false, false, null);
for(int i=0;i<10;i++){
channel.basicPublish("", "first", null, ("zjfirst:"+i).getBytes());
System.out.println("Send Message is: ok"); }
channel.close();
con.close();
}
到这里我们生产者发送的消息就发送成功了。
其实这个时候其实你能从我们的页面看到我们的消息,总共发了多少条,如果所示
因为我们这边是没有消费者去消费,所以上面能看到消息的数量,如果你消费者消费完就没有了。
而消费者代码的步骤其实也和生产者差不多,无非都的是,消费者要去今天生产者设置的队列
消费者代码实现
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection con=factory.newConnection();
final Channel channel=con.createChannel();
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
// TODO Auto-generated method stub
String message = new String(body, "UTF-8");
System.err.println("+++++++++ok1:"+message);
};
channel.basicConsume("first", true, consumer); //true代表自动确认消息,false代表手动确认消息;
}
这样消费者就完成了,这个时候启动消费者的代码的时候,就会发现消费者能收到生产者的消息,如图
这个时候再去管理界面看的时候,发现队列的数据已经为0了,因为消费者消费完成了。
这个就是最简单的模式。
其实工作模式,可以理解为简单模式的升级版,他是允许一对多的方式。
可以从图中看到是有多个消费者监听这个队列,那么处理的速度就会快很多。至于代码怎么实现,其实就是在加入了一个消费者去监听这个队列,和平常的没有什么不同的。
但是在这里会出现一个比较大的问题,就是但我们多个消费者去消费消息的时候,他的消息是怎么分配的?默认的是轮询模式。
何为轮询模式?其实就是将第一个消息发给第一个消费者,第二个发给第二个,这样轮询发送。看着是不是还是挺好的,但是如果在某个服务器出现压力的时候,这个时候如果还是轮询模式,是不是不太好啊,轮询模式是不会管你服务器的处理能力,只会平均将消息平分给每一个消费者,这样的话其实是不可取的。因为我们线上的时候,有些服务器好一点,我们就希望他多处理一点,差一点的就少处理一点,那么这个怎么解决呢?在mq中其实也帮我们解决了这个问题,记得之前说消息的确认机制吗,默认我们给他的是自动确认,但是我们可以自己设置为手动确认。mq也帮我们解决了这种问题,我们可以将这种轮询模式改成公平模式,那么处理消息就根据处理的能力来处理。在mq中我们可以通过手动确认来实现这个公平模式。
其实也挺简单的设置的,我们只要设置每个消费者每次只能接收一个消息,然后在没有返回确认码之前就不会接收下一条消息,而消息确认是我们自己手动确认的,那么就完美的解决了这个问题,服务器差的执行慢,那么返回ack也就慢,快的自然就能多处理一点。
至于代码实现如下所示
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection con=factory.newConnection();
final Channel channel=con.createChannel();
channel.basicQos(0,1,false); //这个是每次只处理一条数据,只有接收到ack确认码,才去拿取下一条消息
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
// TODO Auto-generated method stub
String message = new String(body, "UTF-8");
System.err.println("+++++++++ok1:"+message);
channel.basicAck(envelope.getDeliveryTag(), true);//手动返回确认码
}
};
channel.basicConsume("first", false, consumer); //这个设置为false为手动确认
}
这样的话我们发现我们起了两个消费者,然后设置sleep的时候,发现不在是每个消费的数量相同了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。