赞
踩
官网教程
点对点,一个生产者,一个消费者,一个队列。
特点:
dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>5.73</version>
</dependency>
</dependencies>
public class MQConnectionUtils { private static final String IP = "118.25.188.37"; private static final Integer PORT = 5672; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; public static Connection newConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost(IP); //设置端口号 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setUsername(USERNAME); factory.setPassword(PASSWORD); //创建连接 Connection connection = factory.newConnection(); return connection; } }
public class Producer { private static final String QUEUE_NAME = "mq"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = MQConnectionUtils.newConnection(); // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "直接模式消息发送"; System.out.println("生产者发送消息:" + msg); // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.close(); newConnection.close(); } }
发送消息后mq会出现待消费的消息
public class Customer { private static final String QUEUE_NAME = "mq"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = MQConnectionUtils.newConnection(); // 2.获取通道 Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); } }; // 3.监听队列 true表示自动应答,false表示手动应答 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
消费完后mq消息就没有了
与点对点不同的是,消费者由1个变成了两个,消费者集群了
我们这里启动两个消费者
然后发送10条消息
看看结果:
可以看到实现的是均摊消费
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
这里第二个参数表示应答模式为true,表示自动签收
这里我们将 应答模式设置为false
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
然后向消费者发送10个消息
可以看到消费者接收到了10个消息,但是我现在如果停止消费者
发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
这里我们需要加上这个channel.basicAck(envelope.getDeliveryTag(), false);
public class Customer { private static final String QUEUE_NAME = "mq"; public static void main(String[] args) throws Exception { System.out.println("消费者2启动"); // 1.获取连接 Connection newConnection = MQConnectionUtils.newConnection(); /* 2.获取通道 */ Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //监听队列 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 3.监听队列 true表示自动应答,false表示手动应答 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
这样就表示消费者接受消息成功了
实现:添加如下代码channel.basicQos(1);
public class Customer { private static final String QUEUE_NAME = "mq"; public static void main(String[] args) throws Exception { System.out.println("消费者2启动"); // 1.获取连接 Connection newConnection = MQConnectionUtils.newConnection(); /* 2.获取通道 */ Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //表示一次只消费一个消息 channel.basicQos(1); //监听队列 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); //手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 3.监听队列 true表示自动应答,false表示手动应答 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。