赞
踩
RabbitMQ简介、概念、安装、启动、工作模式
rabbitmq的网页管理的端口是15672,如果你是远程操作服务器,输入http://ip:15672,发现连接不上,因为服务器防火墙不允许这个端口远程访问;
放行防火墙端口
# 将mq的tcp监听端口和网页管理端口都设置成允许远程访问
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=5672/tcp
systemctl restart firewalld.service
管理界面介绍
# 输入用户名密码登录后进入主界面
Overview:用来显示流量,端口,节点等信息,以及修改配置文件;
Connections:显示所有的TCP连接;
channels:显示所有的信道连接;
exchanges:显示所有的交换机以及创建删除等;
queues:显示所有的队列以及创建删除等;
admins:显示所有的用户以及用户管理;
创建虚拟主机
rabbitmq有一个默认的用户名和密码,guest和guest,但为了安全考虑,该用户名和密码只允许本地访问,如果是远程操作的话,需要创建新的用户名和密码;
设置标签(用户角色)
rabbitmqctl set_user_tags 用户名 角色
用户角色说明
management:用户可以访问管理插件
policymaker:用户可以访问管理插件,并管理他们有权访问的vhost的策略和参数。
monitoring:用户可以访问管理插件,查看所有连接和通道以及与节点相关的信息。
administrator:用户可以做任何监视可以做的事情,管理用户,vhost和权限,关闭其他用户的连接,并管理所有vhost的政策和参数。
设置权限:设置用户可以访问哪些vhost;配置权限、读权限、写权限
rabbitmqctl set_permissions -p /虚拟主机名称 用户名 '.*' '.*' '.*'
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.8.113"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); factory.setVirtualHost("/myvhost"); //获取连接 Connection con = factory.newConnection(); //创建通道 Channel channel = con.createChannel(); //声明队列:创建一个队列,如果vhost中没有则创建对列,如果有则直接使用。 //参数一:队列的名称 //参数二:是否持久化 //参数三:是否私有化,true表示私有化,只有第一次访问该队里的消费者才能使用该队列;false表示所有消费者都可以访问队列。 //参数四:表示是否自动删除,当消费者连接断开后是否删除该队列。false表示不删除 //参数五: 其他一些额外参数的配置,比如延迟时间 channel.queueDeclare("simple_queue",true,false,false,null); //发布消息 //参数1:是交换机的名称,暂时不用 //参数2:队列名称 //参数3:额外的参数 //参数4:要投递的消息 channel.basicPublish("","simple_queue",null,"这是一条message".getBytes()); channel.close(); con.close(); System.out.println("消息发送成功!"); }
public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMQUtil.getConnection(); //创建通道 final Channel channel = con.createChannel(); //声明队列:有则直接使用,没有则创建 //2:是否持久化 //3:是否私有化 //4: 是否自动删除 channel.queueDeclare("simple_queue", true, false, false, null); //签收消息 //参数1:队列名称 //参数2: 是否自动签收,false:表示手动编程的方式签收 //参数3:处理签收消息的回调方法 channel.basicConsume("simple_queue", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者处理的消息:" + new String(body)); //签收 //参数1:消息的Id //参数2: true 表示签收当前队里中所有小于该Id的消息,false表示只签收当前消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); // channel.close(); // con.close(); }
总共是6种工作模式:
//生产者 public class Ticket { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); //申明交换机,类型为fanout channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT); //发布消息 for (int i = 0; i < 10; i++) { channel.basicPublish("pubsub_exchange","",null,("票务消息"+i).getBytes()); } channel.close(); con.close(); System.out.println("发送成功"); } } //消费者 public class Xc { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); final Channel channel = con.createChannel(); //声明队列 channel.queueDeclare("xc_queue",true,false,false,null); //绑定交换机 channel.queueBind("xc_queue","pubsub_exchange",""); //消费 channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消息: "+ new String(body)); // channel.basicAck(envelope.getDeliveryTag(),false); } }); } } //消费者 public class Qn { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); final Channel channel = con.createChannel(); //声明队列 channel.queueDeclare("qn_queue",true,false,false,null); //绑定交换机 channel.queueBind("qn_queue","pubsub_exchange",""); //消费 channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消息: "+ new String(body)); // channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
public class Ticket { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); //申明交换机,类型为direct channel.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); Map<String,String> map = new HashMap(); map.put("schk20201001","成都-上海"); map.put("schk20201002","成都-北京"); map.put("xmhk20201001","厦门-上海"); map.put("xmhk20201002","厦门-北京"); //发布消息 for (String key : map.keySet()){ channel.basicPublish("route_exchange",key,null,map.get(key).getBytes()); } channel.close(); con.close(); System.out.println("发送成功"); } } public class Xc { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); //声明duilie channel.queueDeclare("xc_queue",true,false,false, null); //绑定交换机 channel.queueBind("xc_queue","route_exchange","schk20201001"); channel.queueBind("xc_queue","route_exchange","schk20201002"); //消费 channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("票务信息:" + new String(body) ); } }); } public class Qn { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); //声明duilie channel.queueDeclare("qn_queue",true,false,false, null); //绑定交换机 channel.queueBind("qn_queue","route_exchange","xmhk20201001"); channel.queueBind("qn_queue","route_exchange","xmhk20201002"); //消费 channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("票务信息:" + new String(body) ); } }); } }
public class Ticket { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); //开启消息确认机制 channel.confirmSelect(); //确认机制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("接收到的消息id:" + deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("拒接的消息id:" + deliveryTag); } }); //消息返回监听 channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return rm) { System.out.println("-----------------------------------------"); System.out.println("交换机:"+rm.getExchange()); System.out.println("消息的状态码:" + rm.getReplyCode()); System.out.println("消息描述:"+rm.getReplyText()); System.out.println("路由key:"+rm.getRoutingKey()); System.out.println("----------------------------------------"); } }); Map<String,String> map = new HashMap(); map.put("schk.20201001","成都-上海"); map.put("schk.20201002","成都-北京"); map.put("xmhk.20201001","厦门-上海"); map.put("xmhk.20201002","厦门-北京"); map.put("gjhk.20201001","北京-东京"); map.put("gjhk.20201002","北京-美国"); //发布消息 for (String key : map.keySet()){ //参数三表示如果没有将对应的消息转发给对应的队列,就强制退回给生产者 channel.basicPublish("topic_exchange",key,true,null,map.get(key).getBytes()); } // channel.close(); // con.close(); System.out.println("发送成功"); } } public class Qn { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); channel.queueDeclare("qn_queue",true,false,false,null); channel.queueBind("qn_queue","topic_exchange","xmhk.#"); channel.basicConsume("qn_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("票务信息: "+ new String(body)); } }); } } public class Xc { public static void main(String[] args) throws IOException, TimeoutException { Connection con = RabbitMqUtil.getConnectin(); Channel channel = con.createChannel(); channel.queueDeclare("xc_queue",true,false,false,null); channel.queueBind("xc_queue","topic_exchange","schk.#"); channel.basicConsume("xc_queue",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("票务信息: "+ new String(body)); } }); } }
channel.confirmSelect();//开启消息的确认机制 //确认机制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { System.out.println("MQ已经接收到消息,消息ID:"+l); } @Override public void handleNack(long l, boolean b) throws IOException { System.out.println("MQ拒收消息,消息ID:"+l); } }); //消息返回监听 channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return r) { System.out.println("-----------------------------------------------------"); System.out.println("返回的消息状态码:"+r.getReplyCode()); System.out.println("消息描述:"+r.getReplyText()); System.out.println("交换机"+r.getExchange()); System.out.println("路由key"+r.getRoutingKey()); System.out.println("-----------------------------------------------------"); } });
//参数3:表示如果没有将消息转发给对应的队列,则强制退回给生成者
channel.basicPublish("topic_exchange",key,true,null,res.get(key).getBytes());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。