赞
踩
本文主要介绍Rabbitmq消息中间件里面的,Exchang(交换机)、交换机类型解释、Queue(队列)大家一起探讨 -
1.RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息中间件),RabbitMQ服务器是用Erlang语言编写的。是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名。
2.AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
1.开源、可靠性。提供可靠性消息投递模式(confirm)、返回模式(return),消息确认模式(ack)
2.灵活的路由
3.集群、高可用
4.支持市面上大部分的语言(java、c、Python、PHP、Swift、GO等等)
1.Exchang
也称为交换机,和路由配合,用于接收生产者的消息,根据路由规则将消息投递到绑定的队列。
2.Routing Key
路由规则,交换机通过路由规则来投递消息到对应的队列
3.Binding
用于将Exchang和Queue、RoutingKey进行绑定,建立关系
4.Queue
消息队列,用于接收Exchang投递的消息
RabbitMQ的交换机有4中类型:direct、fanout、topic、headers
1.direct类型,直连交换机,使用这种类型的交换机,投递消息的时候,生产者生产消息的Routing Key要和队列绑定的Routing Key完全匹配,才能将消息投递到对应的队列。
获取连接工厂:
public class RabbitmqConnectionFactory { /** * 获取一个rabbitmq连接工厂 * @return */ public static ConnectionFactory getConnectionFactory(){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");// 主机 factory.setPort(5672);// 端口号 factory.setVirtualHost("/");// 虚拟机 factory.setUsername("admin");// 用户名 factory.setPassword("admin");// 密码 factory.setAutomaticRecoveryEnabled(true);// 是否支持自动重连 factory.setNetworkRecoveryInterval(3000);// 多久重连一次 return factory; } }
创建消费者:
public class DirectConsumer { // 队列 private static final String QUEUE = "test.direct.queue"; // 交换机 private static final String EXCHANG_NAME = "test.direct.exchang"; // 路由 private static final String ROUTING_KEY = "test.direct"; public static void main(String args[]) throws Exception { // 1、获取连接工厂 ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory(); // 2、创建连接 Connection connection = connectionFactory.newConnection(); // 3、创建通道 Channel channel = connection.createChannel(); // 4、创建交换机指定类型为direct channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT, true, false, false, null); // 创建队列 channel.queueDeclare(QUEUE, true, false, false, null); // 队列、交换机和路由绑定 channel.queueBind(QUEUE, EXCHANG_NAME, ROUTING_KEY); // 5、创建消费者 MyConsumer consumer = new MyConsumer(channel); channel.basicConsume(QUEUE, true, consumer); } }
创建生产者:
public class DirectProducer { // 交换机 private static final String EXCHANG_NAME = "test.direct.exchang"; // 这个路由要和队列绑定的路由完全相同才能投递到 private static final String ROUTING_KEY = "test.direct"; public static void main(String args[]) throws Exception { // 1、获取连接工厂 ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory(); // 2、创建连接 Connection connection = connectionFactory.newConnection(); // 3、创建 通道 Channel channel = connection.createChannel(); // 4、发送消息到交换机,交换机根据路由规则投递到队列 String msg = "this is direct exchang msg!"; channel.basicPublish(EXCHANG_NAME, ROUTING_KEY, null, msg.getBytes()); // 5、关闭 channel.close(); connection.close(); } }
2.fanout,扇形交换机,也可以称为广播交换机,这种类型的交换机可以忽略路由Routing Key,可以将消息投递到所有绑定了这个交换机的队列
3.topic,主题交换机,Routing Key==Binding Key,支持模糊匹配,这种类型的交换机可以支持路由key全匹配,也支持模糊匹配,模糊匹配有两种方式:
test.topic.*,*号可以匹配一个单词,比如test.topic.user,test.topic.order都能匹配到,但是test.topic.user.save就不能匹配。
test.popic.#,#号支持多个单词匹配,比如test.topic.user,test.topic.user.save,test.topic.user.delete都可以匹配。
4、headers,头交换机,这个类型的交换机和fanout类型类似,在没有指定RoutingKey的时候,可以投递到所有绑定这个交换机的队列,这个类型的交换机主要根据header里面的参数来进行路由,在队列和交换机,路由建立绑定关系的时候配置头信息,里面有一个参数:x-match,这个参数有两个属性:
all:一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any:一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
创建消费者:
public class HeadersConsumer { // 队列 public static final String QUEUE = "test.header.queue"; // 交换机 public static final String EXCHANG_NAME = "test.header.exchang"; public static void main(String args[]) throws Exception { // 1、获取连接工厂 ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory(); // 2、创建连接 Connection connection = connectionFactory.newConnection(); // 3、创建通道 Channel channel = connection.createChannel(); // 4、创建队列、交换机、绑定 channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.HEADERS, true); channel.queueDeclare(QUEUE, true, false,false, null); // x-match属性: // all:一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机 // any:一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机 Map<String, Object> headers = new HashMap<>(); headers.put("name", "zhangsan");// 消费者设置了name属性 headers.put("x-match", "all"); channel.queueBind(QUEUE, EXCHANG_NAME, "", headers); // 5、创建消费者 MyConsumer consumer = new MyConsumer(channel); channel.basicConsume(QUEUE, true, consumer); } }
创建生产者:
public class HeadersProducer { // 交换机 public static final String EXCHANG_NAME = "test.header.exchang"; public static void main(String args[]) throws Exception { // 1、获取连接工厂 ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory(); // 2、创建连接 Connection connection = connectionFactory.newConnection(); // 3、创建通道 Channel channel = connection.createChannel(); // 4、发送消息,这里的头信息 Map<String, Object> headers = new HashMap<>(); // 这个属性要根据消费者设置的x-match的属性才能投递到 // 消费者如果设置x-match=all,那么这里的key要和消费者设置的可以完全匹配,才能投递到 // 消费者如果设置x-match=any,那么这里的可以和消费者设置的其中一个key匹配就能投递到 headers.put("name", "zhangsan"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build(); String msg = "this is headers exchang msg !"; channel.basicPublish(EXCHANG_NAME, "", properties, msg.getBytes()); // 5、关闭 channel.close(); connection.close(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。