赞
踩
在maven中添加依赖。
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.9.0</version>
- </dependency>
之前我导入了RabbitMQ最新的包(5.5.0),然后了一个错误
Unsupported major.minor version 52.0
是因为我本机装的jdk1.7,但5.x系列的版本需要JDK 8支持,4.x系列的版本支持JDK 6,我又换成了4.X的包。
附stanford parser和jdk版本对应关系:
- J2SE 8 = 52,
- J2SE 7 = 51,
- J2SE 6.0 = 50,
- J2SE 5.0 = 49,
- JDK 1.4 = 48,
- JDK 1.3 = 47,
- JDK 1.2 = 46,
- JDK 1.1 = 45
- package utill;
-
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 获取MQ连接工具
- * @author liucong 2018/11/13
- */
- public class ConnectionUtils {
- /**
- * 获取连接方法
- * @return connection
- */
- public static Connection getConnection() throws IOException, TimeoutException {
- //1.定义连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //2.设置服务地址
- connectionFactory.setHost("127.0.0.1");
- //3.设置端口(AMQP协议的端口,类似Mysql的3306)
- connectionFactory.setPort(5672);
- //4.设置账号,密码,vhost
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- //5.获得连接
- Connection connection = connectionFactory.newConnection();
- return connection;
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package producer;
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import org.junit.Test;
- import utill.ConnectionUtils;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 生产者发送数据到消息队列
- * @author liucong 2018/11/13
- */
-
- public class Producer {
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "QUEUE_test";
-
- /**
- * 发送消息方法
- * @throws IOException
- * @throws TimeoutException
- */
- @Test
- public void sendMsg() throws IOException, TimeoutException {
- //1.通过工具类获取MQ连接
- Connection connection = ConnectionUtils.getConnection();
- //2.从连接中创建通道
-
- Channel channel = connection.createChannel();
- //3.创建队列(让路由器知道往哪个队列里发送消息)
- boolean durable = false;
- boolean exclusive = false;
- boolean autoDelete = false;
- channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete,null);
- //4.创建消息
- String msg = "Hello QUEUE_test !!";
- //5.发送消息
- channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
- System.out.println(msg + "发送成功");
- //6.关闭连接
- channel.close();
- connection.close();
- }
- }
-
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
发送一条消息试试。
发送成功了。
- package customer;
-
- import com.rabbitmq.client.*;
- import utill.ConnectionUtils;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者消费消息
- * @author liucong 2018/11/13
- */
- public class Customer {
- /**
- * 队列名称
- */
- private static final String QUEUE_NAME = "QUEUE_test1";
-
- public static void main(String[] arge) throws IOException, TimeoutException {
- //1.获取连接
- Connection connection = ConnectionUtils.getConnection();
- //2.创建管道
- Channel channel = connection.createChannel();
- //3.获取消息
- Consumer consumer = new DefaultConsumer(channel){
- /**
- * @param consumerTag consumerTag
- * @param envelope 存放生产者相关信息
- * @param properties properties
- * @param body 消息实体
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body,"UTF-8");
- System.out.println("已接收到" + msg);
- }
- };
- //4.自动回复队列应答(RabbitMQ中的消息确认机制)
- channel.basicConsume(QUEUE_NAME, true, consumer);
- //5.关闭连接
- channel.close();
- connection.close();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
一个简单的消息队列就完成了。
简单队列的不足: 耦合性高 生产消费一一对应,如果有多个消费者想都消费这个消息,队列名称变更时需要同时更改 。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。