赞
踩
在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的序列用来处理一系列的输入,通常是来自用户的。消息队列提供了异步的通信协议,每一个序列中的记录包含了详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息保存在队列中,直到接收者取回它。
消息队列常常保存在链表结构中。拥有权限的进程才可以向消息队列中写入或读取消息
目前,有很多消息队列有很多的实现,包括
JBoss Messing
、JORAM
、Apache
、ActiveMQ
、SunPoen Message Queue
、IBM MQ
、Apache Qpid
和HTTPSQS
当前使用较多的消息队列有:
RabbitMQ
、RocketMQ
、ActiveMQ
、Kafka
、ZeroMQ
、MetaMq
等而部分数据库如:Redis,Mysql,以及phxsql也可以实现消息队列的功能。
MQ是消费者-生产者模型中的一个典型代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
1.AMQP
,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与中间件可传递消息,并不受客户端/中间件影响,不同的开发语言等条件的限制。
2.AMS
,即java消息服务(java Message Service)应用程序接口,是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供服务。常见的消息列队,大部分都实现了JMI API 如:ActiveMQ
,Redis
以及RabbitMQ
等
应用耦合、异步处理、流量削峰
解耦:
传统模式
传统模式缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将D系统接入,系统A还需要修改代码,太麻烦!
中间件模式:
中间件模式优点:
将消息写入列队,需要消息的系统自己从消息列队中订阅,从而系统A不需要做如何修改。
异步
传统模式:
传统模式缺点:
一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式优点:
使用消息队列发送消息,减少耗时。
削峰
传统模式:
传统模式缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常。
中间件模式:
中间件模式的优点:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的最高峰期积压是允许的。
系统可用性低、系统复杂性增加
消息列队,是分布式系统中重要的组件,其通用的使用场景可以简单的描述为:当不需要立即获取结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息列队的时候。
在项目中,将一些无需及时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器请求的响应时间,从而提高了系统的吞吐量。
AMQP,即Advanced Meassage Queueing Protocol,高级消息列队协议,是应用层的一个开发标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息的使用者的存在,反之亦然。
AMQP的主要特征是面向消息、列队、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang
语言编写,支持多种客户端,如:Python
、Ruby
、.NET
、Java
、JMS
、C
、PHP
、ActionScript
、XMPP
、STOMP
等,支持AJAX
。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
总结如下
brew install erlang
erlang对应Rabbit版本:
测试erlang是否安装成功:
官网下载地址:https://www.rabbitmq.com/download.html
开启RabbitMQ图形化管理界面插件:rabbitmq-plugins enable rabbitmq_management
、关闭RabbitMQ图形化管理界面插件:rabbitmq-plugins disable rabbitmq_management
rabbitmq-plugins list
指令查看 rabbitmq 的插件启动情况:开启RabbitMQ服务rabbitmq-service
、关闭RabbitMq服务rabbitmqctl stop
在浏览器访问localhost:15672
进入rabbitmq图形界面管理登陆系统:
默认用户名:guest ,默认密码: guest
登陆之后进入rabbitmq图形界面管理系统:
/web
的虚拟主机:
创建成功:
每次创建虚拟主机guest用户会默认加入虚拟主机
web
的用户:添加成功:
web
用户添加到虚拟主机:添加成功:
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
简单列队:生产者将消息发送到“hello”队列。消费者从该队列接收消息。
/** * 简单队列生产者 * @author haoruijie */ public class Send { //定义队列名称 private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); //端口号 connectionFactory.setPort(5672); //用户名 connectionFactory.setUsername("web"); //用户密码 connectionFactory.setPassword("web"); //虚拟主机名 connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ /** * 第一个参数(queue)绑定队列 * 第二个参数(durable)持久化 * 第三个参数(Exclusive)排他队列 * 第四个参数(Auto-delete)自动删除 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(message); } } }
启动生产者服务:
消息堵塞:
/** * 普通队列消费者 * @author haoruijie */ public class Recv { //定义队列名称 private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); //端口号 connectionFactory.setPort(5672); //用户名 connectionFactory.setUsername("web"); //用户密码 connectionFactory.setPassword("web"); //虚拟主机名 connectionFactory.setVirtualHost("/web"); //创建信道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{}); } }
启动消费者:
消费者消费消息:
工作列队:(一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!)
/** * 工作队列-生产者 * @author haoruijie */ public class Send { //定义队列名称 private static final String QUEUE_NAME = "work_fair"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 for (int i = 0; i < 20; i++) { channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes(StandardCharsets.UTF_8)); System.out.println(message+i); } } } }
启动生产者:
消息堵塞:
/** * 工作队列-公平-消费者 * @author haoruijie */ public class Recv01 { //定义队列名称 private static final String QUEUE_NAME = "work_fair"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //限制消费1条消息,消费完在继续消费下一天消息(限流) int prefetchCount = 1; channel.basicQos(prefetchCount); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); //消费者01接收一条消息后休眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag->{}); } }
启动消费者:每个消费者每隔10 毫秒取一条消息
消费者01:成功取得列队消息->
消费者02:成功取得列队消息->
(消费者02代码和消费者01一样)
一个生产者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
/** * 发布/订阅队列-生产者 */ public class Send { //定义发布队列名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ /** * 绑定交换机 * 1.交换机名称 * 2.交换机的类型,fanout就是广播 (只要) */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(message); } } }
启动成功:成功将消息绑定到交换机上
/** * 发布/订阅队列-消费者 * @author haoruijie */ public class Recv01 { //定义订阅队列名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定 channel.queueBind(queue,EXCHANGE_NAME,""); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
启动订阅消费者:所有订阅消费者都可以获得消息。
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
/** * 路由队列-生产者 */ public class Send { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ /** * 绑定交换机 * 1.交换机名称 * 2.交换机的类型,dorect是路由 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //准备消息 String message = "Hello world"; /** * 发送消息 * 交换机名 * 交换机key */ channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"green",null,message.getBytes(StandardCharsets.UTF_8)); } } }
启动路由生产者:
会将消息发送到名为
EXCHANGE_NAME
的交换机中,分别将消息key设置为orange
和green
/** * 路由队列-消费者 */ public class Secv01 { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定 channel.queueBind(queue,EXCHANGE_NAME,"black"); channel.queueBind(queue,EXCHANGE_NAME,"green"); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
启动消费者:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HXBlxAVB-1634903973672)(/Users/haoruijie/Library/Application Support/typora-user-images/image-20211022135436669.png)]
消费者01会取到交换机名为
EXCHANGE_NAME
,key值为green
和orange
而消费者02只会收到key值为orange
的消息
通过通配符模式来判断路由key通俗的来讲就是模糊匹配。
*.匹配一个字符 #.匹配所有字符
/** * 主题-路由队列-生产者 * @author haoruijie */ public class Send { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ /** * 绑定交换机 * 1.交换机名称 * 2.交换机的类型,TOPIC主题路由列队 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //准备消息 String message1 = "Hello world1"; String message2 = "Hello world2"; String message3 = "Hello world3"; //设置交换机key值 String routingKey1 = "quick.orange.rabbit"; String routingKey2 = "lazy.pink.rabbit"; String routingKey3 = "quick.hello.male"; //发送消息 channel.basicPublish(EXCHANGE_NAME,routingKey1,null,message1.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,routingKey2,null,message2.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,routingKey3,null,message3.getBytes(StandardCharsets.UTF_8)); } } }
启动路由生产者:
会将消息发送到名为
EXCHANGE_NAME
的交换机中,分别将消息key设置为routingKey1
、routingKey2
和routingKey3
/** * 主题-路由队列-消费者 * @author haoruijie */ public class Recv01 { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定绑定key channel.queueBind(queue,EXCHANGE_NAME,"*.orange.*"); channel.queueBind(queue,EXCHANGE_NAME,"lazy.#"); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
启动消费者
运行结果:
消费者01只会匹配
routingKey1
和routingKey2
的消息
使用事务会大幅度降低性能 (一般不会使用) 开启事务会知道生产者是否将消息成功提交到列队里
/** * 事务队列-生产者 * @author haoruijie */ public class Send { //定义队列名称 private static final String QUEUE_NAME = "tx"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //开启事务 channel.txSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); //制造异常(遇到异常事务回滚) int a = 1/0; //提交事务 channel.txCommit(); }catch (Exception e){ //事务回滚 channel.txRollback(); e.printStackTrace(); }finally { //关闭连接 if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
/** * 事务队列消费者 * @author haoruijie */ public class Recv { //定义队列名称 private static final String QUEUE_NAME = "tx"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("yeb"); connectionFactory.setPassword("yeb"); connectionFactory.setVirtualHost("/yeb"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; /** * 消费消息 * 列队名称 * 消费确认 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{}); } }
启动消费者:
启动生产者: 发现异常事务回调 消费者没有消息消费
(确认生产者是否把消息发送到了服务器)
(同步确认会影响性能一般不会使用)
/** * 确认-同步-生产者 (会影响性能) * @author haoruijie */ public class Send { //定义队列名称 private static final String QUEUE_NAME = "sync"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //启动确认模式 channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); //普通确认,只能单条确认 if(channel.waitForConfirms()){ System.out.println("确认成功!"); } //普通批量确认 ,如果有一条不成功就会抛异常,全部成功不会抛异常 //channel.waitForConfirmsOrDie(); }catch (Exception e){ e.printStackTrace(); }finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
启动生产者:
发送消息成功:
(异步确认效率是最高的)
/** * 确认-异步-生产者 (效率最高) */ public class Send { //定义队列名称 private static final String QUEUE_NAME = "async"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("yeb"); connectionFactory.setPassword("yeb"); connectionFactory.setVirtualHost("/yeb"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { final SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<Long>()); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //启动确认模式 channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //天际channel监听 channel.addConfirmListener(new ConfirmListener() { //已确认 @Override public void handleAck(long l, boolean b) throws IOException { //b为true确认多条成功 为false确认单条成功 if (b){ System.out.println("确认多条成功"); set.headSet(l+1L).clear(); }else { System.out.println("确认单条成功"+l); set.remove(l); } } //未确认 @Override public void handleNack(long l, boolean b) throws IOException { //b为true多条未确认 为false单条未确认 if (b){ System.out.println("多条未确认"); set.headSet(l+1L).clear(); }else { System.out.println("单体未确认"+l); set.remove(l); } } }); int i =0; while (i<20){ i++; //准备消息 String message = "Hello world"+i; Long seqNo = channel.getNextPublishSeqNo(); //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); set.add(seqNo); System.out.println("[x] Sent'"+message+"'"); } }catch (Exception e){ e.printStackTrace(); }finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
启动生产者:
确认发送消息成功:
消息发送成功:
#配置端口号
server:
port: 8002
spring:
#Rabbitmq生产出配置
rabbitmq:
#ip
host: 127.0.0.1
#用户名
username: guest
#密码
password: guest
#端口
port: 5672
@Component
public class Test {
@RabbitListener(queues = "hello")
public void demo(String hello){
System.out.println(hello);
}
}
@Configuration
public class RabbitmqConfig {
@Bean
public Queue queue(){
return new Queue("hello");
}
}
启动成功hello列队已存在
#配置端口号 server: port: 8001 spring: #Rabbitmq生产出配置 rabbitmq: #ip host: 127.0.0.1 #用户名 username: web #密码 password: web #端口 port: 5672
@Component
public class Test {
@Autowired
private RabbitTemplate template;
public void demo(){
template.convertAndSend("hello","hello world");
}
}
发送被消费者立即消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。