赞
踩
RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/
我们在Centos虚拟机中使用Docker来安装
docker pull rabbitmq
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \ # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \ # 设置环境变量密码
--name mq \ # 队列名称
--hostname mq1 \ #配置主机名
-p 15672:15672 \ # MQ管理端口
-p 5672:5672 \ #MQ消息传输端口
-d \ # 后台运行
rabbitmq
交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的
在RabbitMQ中:
channel
:操作MQ的工具
exchange
:路由消息到队列中
queue
:缓存消息
virtual host:
虚拟主机,是对queue,exchange等资源的逻辑分组
这两种并没有用到交换机,而是直接到达队列
Fanout Exchange:
广播Direct Exchange:
路由Topic Exchange:
主题publisher:
消息发布者,将消息发送到队列queue
queue:
消息队列,负责接收并缓存消息
consumer:
订阅队列,处理队列中的消息
java模型(消息发布者)
@Test public void test() throws IOException,TimeoutException{ //1.建立连接,与消息队列进行连接 ConnetionFactory factory =new ConnetionFactory(); //设置连接参数,主机名,端口号,vhost,用户名,密码 factory.setHost(192.168.75.136); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword(""); //建立连接 Connection connection =factory.newConnection(); //创建通道Channel,就可以向队列发送消息了 Channel channel =connection.createChannel(); //创建队列 String queuename="hlh"; channel.queueDeclare(queuename,false,false,false,null); //发送消息 String message="hello"; channel.basicPublish("",queuename,null,message.getBytes()); //关闭通道和连接 channel.close(); connection.close(); }
java模型(消息消费者)
//1.建立连接,与消息队列进行连接 ConnetionFactory factory =new ConnetionFactory(); //设置连接参数,主机名,端口号,vhost,用户名,密码 factory.setHost(192.168.75.136); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword(""); //建立连接 Connection connection =factory.newConnection(); //创建通道Channel,就可以向队列发送消息了 Channel channel =connection.createChannel(); //创建队列 String queuename="hlh"; channel.queueDeclare(queuename,false,false,false,null); //订阅消息 channel.basicConsume(queuename,true,new DefaultConsumer(channel){ @Override //处理消息的代码,绑定函数,有了消息才执行 public void handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,byte[] body)throws IOException{ //处理消息 String message=new String(body); } })
注意:上边生产者消费者都创建了队列:
这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复
是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现
spring-amqp
的依赖spring:
rabbitmq:
host: 192.168.75.136 #主机名
port: 5672 #端口
virtual-host: / #虚拟主机
username: itcast #用户名
password: #密码
public class springamqptest{
@Autowired
private RabbitTemplate rabbittemplate;
@Test
public void test(){
String queuename="hlh.queue";
String message="hello";
rabbittemplate.convertAndSend(queuename,message);
}
}
@Component
public class SpringrabbitListener {
@RabbitListener(queues="hlh.queue")
public void listenSimple(String msg) throws InterruptedException{
//消费逻辑代码
}
}
注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能
Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积
一个消息队列绑定多个消费者
假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:
@Component
public class SpringrabbitListener {
@RabbitListener(queues="hlh.queue")
public void listenSimple(String msg) throws InterruptedException{
//消费逻辑代码
}
@RabbitListener(queues="hlh.queue")
public void listenSimple2(String msg) throws InterruptedException{
//消费逻辑代码
}
}
通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除
指的每个消费者每次取多少条消息:
可以通过配置进行配置:
spring:
rabbitmq:
host: 192.168.75.136
port: 5672
virtual-host: /
username: itcast
password:
listener:
simple:
prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息
发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
交换机的作用:
SpringAMQP提过了声明交换机,队列,绑定关系的API:
Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机
@Configuration public class FanoutConfig{ // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //声明一个队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } // 绑定队列跟交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } }
此时的生产者如何发送消息:
public void test(){
//给出交换机名称
String exchangeName="itcast.fanout";
String message="hello";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
监听者如何收到消息
@RabbitListener(queues="fanout.queue1")
public void listener(String msg){
//处理得到的消息
}
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
一个队列可以指定多个Key
我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解
@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){
//进行消息的处理
}
在生产者生产时:
public void test(){
//给出交换机名称
String exchangeName="itcast.fanout";
String message="hello";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.
分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:
代指0个或多个单词
*:
代指一个单词
同样也是使用 @RabbitListener进行声明
@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){
//进行消息的处理
}
生产者生产消息:
public void test(){
//给出交换机名称
String exchangeName="itcast.fanout";
String message="hello";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
这样发送的消息就会使用自定义的转换类型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。