赞
踩
RabbitMQ
说明: 生产者将消息交给默认的交换机(AMQP default),交换机将获取到的信息绑定这个生产者对应的队列上,监听当前队列的消费者获取消息,执行消息消费。
应用场景: 短信
发布者
// An highlighted block public static void main(String[] args) throws IOException, TimeoutException { //建立连接 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("localhost"); cf.setPort(5672); Connection connection = cf.newConnection(); //建立信道 Channel channel = connection.createChannel(); //创建交换机 //创建消息队列 /* queue – 队列名 durable – 是否持久化,如果为true则会持久化 (服务器重启后仍存在) exclusive – 是否独占,true则为独占队列,其他链接不能使用 autoDelete – 是否自动删除消息,true则在使用完后删除 arguments – map类型,其他参数 * */ channel.queueDeclare("queue52",false,false,true,null); //发送消息 /* exchange 交换机 routing-key 路由键 properties 属性 body 消息体 */ String message = "苟哥遛狗"; channel.basicPublish("","queue52",null,message.getBytes("UTF-8")); System.out.println("[x] 消息发送成功! "+message); } ;
消费者
// An highlighted block public static void main(String[] args) throws IOException { //建立连接 //建立信道 Channel channel = ConnUtils.connect(); //创建交换机 //创建消息队列 //接收消息 /** * queue 队列名 * deliverCallback 收到消息处理 * cancelCallback 取消消息处理 */ channel.basicConsume(Constants.SIMPLE_QUEUE, (consumerTag,delivery)->{ //处理消息 byte[] body = delivery.getBody(); String message = new String(body,"utf-8"); System.out.println("[x] 收到消息"+message); },(consumerTag)->{}); };
说明: 生产者将消息交给默认的交换机(AMQP default),交换机将获取到的信息绑定这个生产者对应的队列. 由于监听这个队列的消费者较多,并且消息只能有一个被消费,就会造成消息竞争
应用场景: 抢红包,和资源任务调度
说明: 生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑
应用场景:广告
发布者
//建立连接 //建立信道 Channel channel = ConnUtils.connect(); //(1)创建交换机,类型是FANOUT channel.exchangeDeclare(Constants.SUB_EXCHANGE, BuiltinExchangeType.FANOUT); //(2)创建队列移到消费者端执行 //发送消息 /* exchange 交换机 routing-key 路由键 properties 属性 body 消息体 */ Scanner scanner = new Scanner(System.in); while(true) { print("请输入您要发送的消息"); String message = scanner.next(); if("exit".equals(message)){ break; } //(3) 发给交换机,不提供routing-key channel.basicPublish(Constants.SUB_EXCHANGE, "", null, message.getBytes("UTF-8")); print("[x] 消息发送成功! " + message); } } private static void print(String str) { System.out.println("发布者:"+str); }
消费者
public class SubConsumer { private String name; public SubConsumer(String name) { this.name = name; } public void consume() throws IOException { //建立信道 Channel channel = ConnUtils.connect(); //创建交换机 //创建消息队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,Constants.SUB_EXCHANGE,""); //接收消息 /** * queue 队列名 * autoAck 自动确认收到 * deliverCallback 收到消息处理 * cancelCallback 取消消息处理 */ channel.basicConsume(queueName,true, (consumerTag,delivery)->{ //处理消息 byte[] body = delivery.getBody(); String message = new String(body,"utf-8"); System.out.println(name + "[x] 收到消息"+message); },(consumerTag)->{}); } }
发布者
//建立连接 //建立信道 Channel channel = ConnUtils.connect(); //(1)创建交换机,类型是DIRECT channel.exchangeDeclare(Constants.ROUTE_EXCHANGE, BuiltinExchangeType.DIRECT); //创建队列移到消费者端执行 //发送消息 /* exchange 交换机 routing-key 路由键 properties 属性 body 消息体 */ Scanner scanner = new Scanner(System.in); while(true) { print("请输入您要发送的消息"); String message = scanner.next(); print("请输入您的routing-key"); String routingKey = scanner.next(); if("exit".equals(message)){ break; } //(2) 发给交换机,提供routing-key channel.basicPublish(Constants.ROUTE_EXCHANGE, routingKey, null, message.getBytes("UTF-8")); print("[x] 消息发送成功! " + message); } } private static void print(String str) { System.out.println("发布者:"+str); }
下面展示一些 内联代码片
。
消费者
private String name; //(1) 加入bindingKey,用于队列与交换机的绑定 private String bindingKey; public RouteConsumer(String name, String bindingKey) { this.name = name; this.bindingKey = bindingKey; } public void consume() throws IOException { //建立信道 Channel channel = ConnUtils.connect(); //创建交换机 //创建消息队列 String queueName = channel.queueDeclare().getQueue(); //(2)通过bindingkey来绑定交换机 channel.queueBind(queueName,Constants.ROUTE_EXCHANGE,bindingKey); //接收消息 /** * queue 队列名 * autoAck 自动确认收到 * deliverCallback 收到消息处理 * cancelCallback 取消消息处理 */ channel.basicConsume(queueName,true, (consumerTag,delivery)->{ //处理消息 byte[] body = delivery.getBody(); String message = new String(body,"utf-8"); System.out.println(name + "[x] 收到消息"+message); },(consumerTag)->{}); }
测试
public class RouteApp
{
public static void main( String[] args ) throws IOException {
new RouteConsumer("小周","hotpot").consume();
new RouteConsumer("小李","cc").consume();
new RouteConsumer("小朱","hotpot").consume();
}
}
发布者
//建立连接 //建立信道 Channel channel = ConnUtils.connect(); //(1)创建交换机,类型是TOPIC channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); //创建队列移到消费者端执行 //发送消息 /* exchange 交换机 routing-key 路由键 properties 属性 body 消息体 */ Scanner scanner = new Scanner(System.in); while(true) { print("请输入您要发送的消息"); String message = scanner.next(); print("请输入您的routing-key"); String routingKey = scanner.next(); if("exit".equals(message)){ break; } //(2) 发给交换机,提供routing-key channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, null, message.getBytes("UTF-8")); print("[x] 消息发送成功! " + message); } } private static void print(String str) { System.out.println("发布者:"+str); }
消费者
public class TopicConsumer { private String name; //(1) 加入bindingKey,用于队列与交换机的绑定 private String bindingKey; public TopicConsumer(String name, String bindingKey) { this.name = name; this.bindingKey = bindingKey; } public void consume() throws IOException { //建立信道 Channel channel = ConnUtils.connect(); //创建交换机 //创建消息队列 String queueName = channel.queueDeclare().getQueue(); //(2)通过bindingkey来绑定交换机 channel.queueBind(queueName,Constants.TOPIC_EXCHANGE,bindingKey); //接收消息 /** * queue 队列名 * autoAck 自动确认收到 * deliverCallback 收到消息处理 * cancelCallback 取消消息处理 */ channel.basicConsume(queueName,true, (consumerTag,delivery)->{ //处理消息 byte[] body = delivery.getBody(); String message = new String(body,"utf-8"); System.out.println(name + "[x] 收到消息"+message); },(consumerTag)->{}); } }
测试
public class TopicApp
{
public static void main( String[] args ) throws IOException {
// 特点.颜色.动物名 lazy.orange.rabbit happy.pink.pig
new TopicConsumer("*.orange.* ","*.orange.*").consume();
new TopicConsumer("*.*.rabbit ","*.*.rabbit").consume();
new TopicConsumer("lazy.# ","lazy.#").consume();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。