赞
踩
引言
最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!
1.先来介绍RabbitMQ中的成员
下面是各个成员的作用图解
引入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
2.先来介绍Exchange
这里将着重于介绍Exchange和Queue的各个参数解释
先来看看Exchange中都有哪些属性
下面这个类用于创建一个与RabbitMQ的Connection(连接),该Connection用于创建Channel(信道),Channel是消息读写的通道,也就是我们的操作都会在Channel的基础之上进行
2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)
进入RabbitMQ可视化界面可以看到,RabbitMQ已经为我们创建了exchange.0,类型为direct
具体释意
name 名称 type 类型 Features 特征 Message rate in 消息速率输入 Message rate out 消息速率输出
2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,之前我们已经创建了exchange.0,那么我们再创建一次会怎样
exchangeDeclare(String exchange, String type, boolean durable)
运行成功,并没有报错,因为只要你设置的的设置是一样的,那么就不会报错,如果设置的不一样,那么就会报错,后面会进行验证
这里我们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化
接下来验证这个持久化有什么作用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
重新进入可视化界面,Exchange就只剩下持久化的了
2.3接下来是五个参数的
多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
下面创建了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
由于这里是没有绑定Queue的,那么exchange.4将在创建后就被删除掉?
执行上面的代码
exchange.4还活的好好的,这是因为我们必须在绑定Queue之后再失去绑定才会被删除,否则为什么不直接抛异常,接下来进行验证
下面直接通过可视化工具创建一个名称为queue.4的Queue
英文释义
Name 名称 Features 特征 Status 状态 Ready 是否准备好 Unacked 未确认 Total 总计 incoming 进来的 deliver 传送 get 得到 ack 确认
2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments
exclusive:是否排他,如果未true,则只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除
在执行完下面代码,查看可视化界面,发现queue中并没有exclusive.queue,因为在connection关闭后,该queue也会自动删除
创建实例
package com.tiandy.illegal.util.mq; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.*; import com.tiandy.illegal.bo.CLS_ManageService; import com.tiandy.illegal.bo.CLS_ManageServiceImpl; import com.tiandy.illegal.util.CLS_ILLEGAL_Error; import com.tiandy.illegal.vo.CLS_VO_Message; import com.tiandy.illegal.vo.CLS_VO_Record; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ResourceBundle; public class RabbitMQSend { //rabbitmq连接 public static Connection connection = null; //rabbitmq通道 public static Channel channel = null; //连接状态标识 public static boolean connectStatus = false; // 配置 static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig"); // 交换机 exchangeTemp private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange"); // 队列名 queue_vbs_vehicle_record private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue"); // service CLS_ManageService cls_manageService = new CLS_ManageServiceImpl(); static ConnectionFactory factory = null; public void initialize() { try { //连接工厂 if (null == factory) { factory = new ConnectionFactory(); factory= RabbitMQUtil.getRabbitMQConnectionFactory(); // 关闭通道与连接 closeConnection(); connection = factory.newConnection(); channel = connection.createChannel(); // 声明交换机 // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true); connectStatus = true; } } catch (Exception e) { connectStatus = false; e.printStackTrace(); // log.error("RabbitMQSend method initialize:" + e.getMessage(), e); } } //关闭连接 public void closeConnection() { try { if (channel != null) { if (channel.isOpen()) { channel.close(); channel = null; } } } catch (Exception e) { //log.error("RabbitMQSend closeChannel error " + e); e.printStackTrace(); } try { if (connection != null) { if (connection.isOpen()) { connection.close(); connection = null; } } } catch (Exception e) { // log.error("RabbitMQSend closeConnection error " + e); e.printStackTrace(); } } /** * 监听消息队列,获取数据 */ public void queueDeclareExchange() { //声明交换机 try { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-length", 100000); // 设置最大存储消息数 // 声明交换机 (交换机参数) channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true); // 消息持久化 (队列参数) channel.queueDeclare(rabbitmq_queue, true, false, false, args); // 交换机与队列绑定 channel.queueBind(rabbitmq_queue, rabbitmq_exchange, ""); // 消费者限制 //channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { int inRecord=0; // 插入记录数量 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息 String msg = new String(body, "UTF-8"); // 判断数据是否允许接入 int check = checkMessage(msg); if (check == CLS_ILLEGAL_Error.ERROR_OK) { // 消息转换至VO CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg); // 判断数据,分开处理白车牌数据与其他数据,每次新增一条 int count = cls_manageService.decideData(msgVo); if(count>0){ inRecord+=count; System.out.println(" 已消费消息:"+envelope.getDeliveryTag()+" 插入记录数:" + inRecord); } } // 单条消息确认(第几条,是否多条) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 设置消息手动确认 (队列名,是否自动确认,consumer) channel.basicConsume(rabbitmq_queue, false, consumer); } catch (IOException e) { e.printStackTrace(); } } /** * 方法说明:监测接收信息 * * @param message * @return @修改人及日期: @修改描述: @其他: */ public int checkMessage(String message) { // TODO 监测数据格式及是否允许接入 int check = 0; CLS_VO_Message vo_Message = null; try { vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class); } catch (Exception e) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getTotal_info() == null) { return CLS_ILLEGAL_Error.ERROR_PARAM; } CLS_VO_Record total_info = vo_Message.getTotal_info(); if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } return check; } }
至此,简单的参数讲解和应用就总结完了!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。