赞
踩
通过@Bean注入了Spring IOC容器中
- @Configuration
- public class RabbitConfig {
-
- @Value("${mqRabbitHost}")
- private String addresses;//设置连接地址
-
- @Value("${mqRabbitUserName}")
- public String username;//设置用户名称
-
- @Value("${mqRabbitPassword}")
- public String password;//设置用户密码
-
- //权限控制的基本单位,一个VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user或者团队使用
- @Value("${mqRabbitVirtualHost}")
- private String virtualHost;//设置VirtualHost地址
-
- @Value("${mqRabbitPublisher-confirms}")
- private boolean publisherConfirms;//发行者确认
-
- @Value("${mqRabbitApi-url}")
- public String URL;//
-
- @Bean
- public ConnectionFactory connectionFactory() {
- //创建连接工厂
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses(addresses);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(virtualHost);
- /** 如果要进行消息回调,则这里必须要设置为true */
- connectionFactory.setPublisherConfirms(publisherConfirms);
- return connectionFactory;
- }
-
-
- /**
- * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
- *
- * @return
- */
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- public RabbitTemplate rabbitTemplatenew() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
-
- }
(1)Binding:绑定
- public class Binding {
-
- /**
- * 绑定源
- */
- private String source;
- /**
- * 虚拟主机
- */
- private String vhost;
- /**
- * 绑定目标
- */
- private String destination;
- /**
- * 绑定的目标的类型
- */
- private String destination_type;
- /**
- * 路由键
- */
- private String routing_key;
- /**
- * 属性键
- */
- private String properties_key;
-
- get and set...
- }
(2)Exchange:交换机
- public class Exchange {
-
- /**
- * 是否持久化
- */
- private Boolean durable = true;
- /**
- * 交换机名称
- */
- private String name;
- /**
- * 原来交换机名称
- */
- private String oldName;
- /**
- * 虚拟主机
- */
- private String vhost;
- /**
- * 交换机类型
- */
- private String type = "fanout";
- /**
- * 是否只能交换机内部使用
- */
- private String internal;
- /**
- * 是否自动删除
- */
- private Boolean auto_delete = false;
-
- get and set...
- }
(3)Queue:队列
- public class Queue {
- /**
- * 队列运行状态
- */
- private String state;
- /**
- * 队列名称
- */
- private String name;
- /**
- * 原来队列名称
- */
- private String oldName;
- /**
- * 虚拟主机
- */
- private String vhost;
- /**
- * 是否持久化
- */
- private Boolean durable = true;
- /**
- * 是否自动删除
- */
- private Boolean auto_delete = false;
- /**
- * 是否排外
- */
- private Boolean exclusive = false;
- /**
- * 对应的节点
- */
- private String node;
-
- get and set..
- }
- @Transactional()
- public interface RabbitMqManageService {
- /**
- * 新增MQ队列
- */
- ResponseResult queueDeclare(Queue queue);
- /**
- * 修改MQ队列名称
- */
- ResponseResult queueModify(Queue queue);
- /**
- * 删除MQ队列
- */
- ResponseResult queueDelete(Queue queue);
- /**
- * 新增MQ交换机
- */
- ResponseResult exchangeDeclare(Exchange exchange);
- /**
- * 修改MQ交换机名称
- */
- ResponseResult exchangeModify(Exchange exchange);
- /**
- * 删除MQ交换机
- */
- ResponseResult exchangeDelete(Exchange exchange);
- /**
- * MQ交换机绑定队列
- */
- ResponseResult binding(Binding binding);
- /**
- * 交换机解绑队列
- */
- ResponseResult unbinding(Binding binding);
-
- }
- @Service
- public class RabbitMqManageServiceImpl implements RabbitMqManageService {
-
- @Autowired
- private ConnectionFactory connectionFactory;
-
- @Autowired
- private RabbitConfig config;
-
- @Override
- public ResponseResult queueDeclare(Queue queue) {
- // 参数 :
- // 队列名称,是否持久化,是否排外,是否自动删除,参数
- // 队列名以 mia.服务名称.custom 命名
- String name = queue.getName();
- // boolean begin = name.startsWith("mia.");
- // boolean end = name.endsWith(".custom");
-
- ResponseResult result = new ResponseResult();
-
- // if (!begin || !end) {
- // throw new BusinessAccessException("队列命名方式不对");
- // }
- try {
- AMQP.Queue.DeclareOk queueResult =
- connectionFactory.createConnection().createChannel(false).queueDeclare(name,
- queue.getDurable(), queue.getExclusive(), queue.getAuto_delete(), null);
- String resultName = queueResult.getQueue();
- if (resultName != null && resultName.equals(name)) {
- result.setCode("200");
- result.setMsg("success");
- }
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- @Override
- public ResponseResult queueModify(Queue queue) {
- String newName = queue.getName();
- String oldName = queue.getOldName();
- if (newName == null || oldName == null) {
- throw new BusinessAccessException("请输入正确的参数");
- }
- queue.setName(oldName);
- ResponseResult result = this.queueDelete(queue);
- if ("200".equals(result.getCode())) {
- queue.setName(newName);
- result = this.queueDeclare(queue);
- if ("200".equals(result.getCode())) {
- return result;
- } else {
- throw new BusinessAccessException("修改队列失败,请直接新建队列");
- }
- }
- return result;
- }
-
- @Override
- public ResponseResult queueDelete(Queue queue) {
- // 参数 :
- // 队列名称,是否持久化,是否排外,是否自动删除,参数
- // 队列名以 mia.服务名称.custom 命名
- String name = queue.getName();
- // boolean begin = name.startsWith("mia.");
- // boolean end = name.endsWith(".custom");
-
- ResponseResult result = new ResponseResult();
-
- Queue queryResult = this.getQueue(name);
- if (queryResult.getName() == null) {
- result.setMsg("not found queue " + name);
- return result;
- }
-
- // if (!begin || !end) {
- // throw new BusinessAccessException("只能删除自定义创建队列");
- // }
- try {
- AMQP.Queue.DeleteOk queueResult =
- connectionFactory.createConnection().createChannel(false).queueDelete(name);
- if (queueResult.getMessageCount() >= 0) {
- result.setCode("200");
- result.setMsg("success");
- }
-
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- /**
- * 根据名称查询某个队列
- *
- * @param name
- * @return
- */
- public Queue getQueue(String name) {
- String vHost = "/";
- String url = config.URL + "queues/" + HttpUtil.getURLEncoderString(vHost) + "/" + name;
- try {
- String content = HttpUtil.get(url);
- Queue result = JSON.parseObject(content, Queue.class);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * 新增MQ交换机
- *
- * @param exchange
- */
- @Override
- public ResponseResult exchangeDeclare(Exchange exchange) {
- // 参数 :
- // 交换机名称,类型,是否持久化,是否自动删除,参数
- // 交换机名以 mia.业务名称.custom 命名
-
- String name = exchange.getName();
-
- // boolean begin = name.startsWith("mia.");
- // boolean end = name.endsWith(".custom");
-
- ResponseResult result = new ResponseResult();
-
- // if (!begin || !end) {
- // throw new BusinessAccessException("交换机命名方式不对");
- // }
-
- String type = exchange.getType();
- if (!"direct".equals(type) && !"fanout".equals(type) && !"topic".equals(type)
- && !"headers".equals(type)) {
- throw new BusinessAccessException("交换机的类型不对");
- }
-
- try {
- AMQP.Exchange.DeclareOk exchangeResult =
- connectionFactory.createConnection().createChannel(false).exchangeDeclare(name, type,
- exchange.getDurable(), exchange.getAuto_delete(), null);
- if (exchangeResult != null) {
- result.setCode("200");
- result.setMsg("success");
- }
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- @Override
- public ResponseResult exchangeModify(Exchange exchange) {
- String newName = exchange.getName();
- String oldName = exchange.getOldName();
- if (newName == null || oldName == null) {
- throw new BusinessAccessException("请输入正确的参数");
- }
- exchange.setName(oldName);
- ResponseResult result = this.exchangeDelete(exchange);
- if ("200".equals(result.getCode())) {
- exchange.setName(newName);
- result = this.exchangeDeclare(exchange);
- if ("200".equals(result.getCode())) {
- return result;
- } else {
- throw new BusinessAccessException("修改交换机失败,请直接新建交换机");
- }
- }
- return result;
- }
-
- @Override
- public ResponseResult exchangeDelete(Exchange exchange) {
- // 参数 :
- // 交换机名称,类型,是否持久化,是否自动删除,参数
- // 交换机名以 mia.业务名称.custom 命名
-
- String name = exchange.getName();
- // boolean begin = name.startsWith("mia.");
- // boolean end = name.endsWith(".custom");
-
- ResponseResult result = new ResponseResult();
-
- Exchange queryResult = this.getExchange(name);
- if (queryResult.getName() == null) {
- result.setMsg("not found exchange " + name);
- return result;
- }
-
- if (queryResult.getName() == null) {
- result.setMsg("not found queue " + name);
- return result;
- }
-
-
- // if (!begin || !end) {
- // throw new BusinessAccessException("只能删除自定义创建交换机");
- // }
- try {
- AMQP.Exchange.DeleteOk exchangeResult =
- connectionFactory.createConnection().createChannel(false).exchangeDelete(name);
- if (exchangeResult != null) {
- result.setCode("200");
- result.setMsg("success");
- }
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- /**
- * 根据名称查询某个交换机
- *
- * @param name
- * @return
- */
- public Exchange getExchange(String name) {
-
- String vHost = "/";
-
- String url = config.URL + "exchanges/" + HttpUtil.getURLEncoderString(vHost) + "/" + name;
-
- try {
- String content = HttpUtil.get(url);
- Exchange result = JSON.parseObject(content, Exchange.class);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- @Override
- public ResponseResult binding(Binding binding) {
- // 参数 :
- // 队列名称,交换机名称,路由键值,参数
-
- String queueName = binding.getDestination();
- String exchangeName = binding.getSource();
- String key = binding.getRouting_key();
-
- ResponseResult result = new ResponseResult();
- Queue queue = this.getQueue(queueName);
- Exchange exchange = this.getExchange(exchangeName);
- if (queue.getName() == null || exchange.getName() == null) {
- result.setMsg("not found queue or exchange");
- return result;
- }
-
- if (queueName == null || exchangeName == null || key == null) {
- throw new BusinessAccessException("关键参数不能为空");
- }
-
- try {
- AMQP.Queue.BindOk bindOk = connectionFactory.createConnection().createChannel(false)
- .queueBind(queueName, exchangeName, key, null);
- if (bindOk != null) {
- result.setCode("200");
- result.setMsg("success");
- }
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- @Override
- public ResponseResult unbinding(Binding binding) {
- // 参数 :
- // 队列名称,交换机名称,路由键值,参数
-
- String queueName = binding.getDestination();
-
- String exchangeName = binding.getSource();
-
- String key = binding.getRouting_key();
-
- ResponseResult result = new ResponseResult();
-
- Queue queue = this.getQueue(queueName);
-
- Exchange exchange = this.getExchange(exchangeName);
- if (queue.getName() == null || exchange.getName() == null) {
- result.setMsg("not found queue or exchange");
- return result;
- }
-
- if (queueName == null || exchangeName == null || key == null) {
- throw new BusinessAccessException("关键参数不能为空");
- }
-
- try {
- AMQP.Queue.UnbindOk unbindOk = connectionFactory.createConnection().createChannel(false)
- .queueUnbind(queueName, exchangeName, key, null);
- if (unbindOk != null) {
- result.setCode("200");
- result.setMsg("success");
- }
-
- } catch (AmqpException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- }
- public class HttpUtil {
-
- /**
- * HttpClient封装的Get请求
- *
- * @param url
- * @return
- */
- public static String get(String url) {
- // 创建HttpClientBuilder
- HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
-
- // HttpClient
- CloseableHttpClient httpClient = httpClientBuilder.build();
-
- HttpGet httpGet = new HttpGet(url);
-
- RabbitConfig config = new RabbitConfig();
-
- final String authInfo = config.getUsername() + ":" + config.getPassword();
-
- System.out.println("authInfo : " + authInfo);
-
- try {
- String encoding = DatatypeConverter.printBase64Binary("guest:guest".getBytes("utf-8"));
- httpGet.setHeader("Authorization", "Basic " + encoding);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
-
- System.out.println(httpGet.getRequestLine());
- try {
- // 执行get请求
- HttpResponse httpResponse = httpClient.execute(httpGet);
- // 获取响应消息实体
- HttpEntity entity = httpResponse.getEntity();
-
- String content = EntityUtils.toString(entity);
-
- // 响应状态
- System.out.println("status:" + httpResponse.getStatusLine());
- System.out.println("content:" + content);
-
- // 判断响应实体是否为空
- if (entity != null) {
- System.out.println("contentEncoding:" + entity.getContentEncoding());
- }
- return content;
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try { // 关闭流并释放资源
- httpClient.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- return null;
- }
-
- /**
- * URL 解码
- *
- */
- public static String getURLDecoderString(String str) {
- String result = "";
- if (null == str) {
- return "";
- }
- try {
- result = java.net.URLDecoder.decode(str, "utf-8");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- /**
- * URL 转码
- *
- */
- public static String getURLEncoderString(String str) {
- String result = "";
- if (null == str) {
- return "";
- }
- try {
- result = java.net.URLEncoder.encode(str, "utf-8");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- return result;
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。