赞
踩
几种消息队列的区别:
安装
1.RabbitMQ 依赖 erlang,所以先安装 erlang 环境,配置环境变量:
2.在 RabbitMQ 的 sbin 目录下,执行命令
rabbitmq-plugins enable rabbitmq_management \\开启管理后台的插件
rabbitmq-server start //cmd窗口运行,要保持开启则直接启动服务即可
如果上一步报错:ERROR: node with name “rabbit” already running on “localhost”
先 rabbitmqctl stop 再 rabbitmq-server start
3.此时我们可以通过浏览器访问:http://localhost:15672 就可以看到 RabbitMQ 的后台,默认用户和密码都是guest
主界面:
1.概述
2.连接,我们还没有程序连接到这个 RabbitMQ
添加一个用户
vhost 管理
我添加了一个 root 的管理员账号,可以看到在 vhost 一栏是没有权限的,vhost 就相当于一个数据库,我们需要给用户授权。
1.老规矩,先引入JAR
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
2.连接工具类
public class ConnectionUtil{ private static Logger logger = Logger.getLogger(ConnectionUtil.class); public static Connection getConnection(){ try{ Connection connection = null; //定义一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务端地址(域名地址/ip) factory.setHost("127.0.0.1"); //设置服务器端口号 factory.setPort(5672); //设置虚拟主机(相当于数据库中的库) factory.setVirtualHost("/vhost_test"); //设置用户名 factory.setUsername("root"); //设置密码 factory.setPassword("123456"); connection = factory.newConnection(); return connection; } catch (Exception e){ return null; } } }
接下我们就可以通过这个工具类获得连接,创建频道和声明队列了,可以分为以下几种:
1.生产者-队列-消费者: 生产者一一对应消费
生产者:
/** * 生产者 * @author admin */ public class Producter { public static void main(String[] args) { try{ //获取连接 Connection connection = ConnectionUtil.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("test_queue", false, false, false, null); String message = "hello mq"; //发送消息 channel.basicPublish("", "test_queue", null, message.getBytes("utf-8")); System.out.println("[send]:" + message); channel.close(); connection.close(); } catch (Exception e){ e.printStackTrace(); } } }
消费者:
/** * 消费者 * @author admin */ public class Consumer { public static void main(String[] args) { try{ //获取连接 Connection connection = ConnectionUtil.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("test_queue", false, false, false, null); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ //当消息到达时执行回调方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{ String message = new String(body, "utf-8"); System.out.println("[Receive]:" + message); } }; //监听队列 channel.basicConsume("test_queue", true, consumer); } catch (Exception e){ e.printStackTrace(); } } }
运行生产者,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。