当前位置:   article > 正文

RabbitMQ管理端代码(开箱即用)_createchannel(false)

createchannel(false)

骨架图:

以下代码开箱即用,直接复制即可(注意MQ配置读取的是个人配置中心的MQ配置信息)

 

依次介绍:

 一、MQ配置:RabbitConfig

通过@Bean注入了Spring  IOC容器中

  1. @Configuration
  2. public class RabbitConfig {
  3. @Value("${mqRabbitHost}")
  4. private String addresses;//设置连接地址
  5. @Value("${mqRabbitUserName}")
  6. public String username;//设置用户名称
  7. @Value("${mqRabbitPassword}")
  8. public String password;//设置用户密码
  9. //权限控制的基本单位,一个VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user或者团队使用
  10. @Value("${mqRabbitVirtualHost}")
  11. private String virtualHost;//设置VirtualHost地址
  12. @Value("${mqRabbitPublisher-confirms}")
  13. private boolean publisherConfirms;//发行者确认
  14. @Value("${mqRabbitApi-url}")
  15. public String URL;//
  16. @Bean
  17. public ConnectionFactory connectionFactory() {
  18. //创建连接工厂
  19. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  20. connectionFactory.setAddresses(addresses);
  21. connectionFactory.setUsername(username);
  22. connectionFactory.setPassword(password);
  23. connectionFactory.setVirtualHost(virtualHost);
  24. /** 如果要进行消息回调,则这里必须要设置为true */
  25. connectionFactory.setPublisherConfirms(publisherConfirms);
  26. return connectionFactory;
  27. }
  28. /**
  29. * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
  30. *
  31. * @return
  32. */
  33. @Bean
  34. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  35. public RabbitTemplate rabbitTemplatenew() {
  36. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  37. return template;
  38. }
  39. public String getUsername() {
  40. return username;
  41. }
  42. public String getPassword() {
  43. return password;
  44. }
  45. }

二、entity

(1)Binding:绑定

  1. public class Binding {
  2. /**
  3. * 绑定源
  4. */
  5. private String source;
  6. /**
  7. * 虚拟主机
  8. */
  9. private String vhost;
  10. /**
  11. * 绑定目标
  12. */
  13. private String destination;
  14. /**
  15. * 绑定的目标的类型
  16. */
  17. private String destination_type;
  18. /**
  19. * 路由键
  20. */
  21. private String routing_key;
  22. /**
  23. * 属性键
  24. */
  25. private String properties_key;
  26. get and set...
  27. }

(2)Exchange:交换机

  1. public class Exchange {
  2. /**
  3. * 是否持久化
  4. */
  5. private Boolean durable = true;
  6. /**
  7. * 交换机名称
  8. */
  9. private String name;
  10. /**
  11. * 原来交换机名称
  12. */
  13. private String oldName;
  14. /**
  15. * 虚拟主机
  16. */
  17. private String vhost;
  18. /**
  19. * 交换机类型
  20. */
  21. private String type = "fanout";
  22. /**
  23. * 是否只能交换机内部使用
  24. */
  25. private String internal;
  26. /**
  27. * 是否自动删除
  28. */
  29. private Boolean auto_delete = false;
  30. get and set...
  31. }

(3)Queue:队列

  1. public class Queue {
  2. /**
  3. * 队列运行状态
  4. */
  5. private String state;
  6. /**
  7. * 队列名称
  8. */
  9. private String name;
  10. /**
  11. * 原来队列名称
  12. */
  13. private String oldName;
  14. /**
  15. * 虚拟主机
  16. */
  17. private String vhost;
  18. /**
  19. * 是否持久化
  20. */
  21. private Boolean durable = true;
  22. /**
  23. * 是否自动删除
  24. */
  25. private Boolean auto_delete = false;
  26. /**
  27. * 是否排外
  28. */
  29. private Boolean exclusive = false;
  30. /**
  31. * 对应的节点
  32. */
  33. private String node;
  34. get and set..
  35. }

三、RabbitMqManageService

  1. @Transactional()
  2. public interface RabbitMqManageService {
  3. /**
  4. * 新增MQ队列
  5. */
  6. ResponseResult queueDeclare(Queue queue);
  7. /**
  8. * 修改MQ队列名称
  9. */
  10. ResponseResult queueModify(Queue queue);
  11. /**
  12. * 删除MQ队列
  13. */
  14. ResponseResult queueDelete(Queue queue);
  15. /**
  16. * 新增MQ交换机
  17. */
  18. ResponseResult exchangeDeclare(Exchange exchange);
  19. /**
  20. * 修改MQ交换机名称
  21. */
  22. ResponseResult exchangeModify(Exchange exchange);
  23. /**
  24. * 删除MQ交换机
  25. */
  26. ResponseResult exchangeDelete(Exchange exchange);
  27. /**
  28. * MQ交换机绑定队列
  29. */
  30. ResponseResult binding(Binding binding);
  31. /**
  32. * 交换机解绑队列
  33. */
  34. ResponseResult unbinding(Binding binding);
  35. }

四:RabbitMqManageServiceImpl

  1. @Service
  2. public class RabbitMqManageServiceImpl implements RabbitMqManageService {
  3. @Autowired
  4. private ConnectionFactory connectionFactory;
  5. @Autowired
  6. private RabbitConfig config;
  7. @Override
  8. public ResponseResult queueDeclare(Queue queue) {
  9. // 参数 :
  10. // 队列名称,是否持久化,是否排外,是否自动删除,参数
  11. // 队列名以 mia.服务名称.custom 命名
  12. String name = queue.getName();
  13. // boolean begin = name.startsWith("mia.");
  14. // boolean end = name.endsWith(".custom");
  15. ResponseResult result = new ResponseResult();
  16. // if (!begin || !end) {
  17. // throw new BusinessAccessException("队列命名方式不对");
  18. // }
  19. try {
  20. AMQP.Queue.DeclareOk queueResult =
  21. connectionFactory.createConnection().createChannel(false).queueDeclare(name,
  22. queue.getDurable(), queue.getExclusive(), queue.getAuto_delete(), null);
  23. String resultName = queueResult.getQueue();
  24. if (resultName != null && resultName.equals(name)) {
  25. result.setCode("200");
  26. result.setMsg("success");
  27. }
  28. } catch (AmqpException e) {
  29. e.printStackTrace();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. return result;
  34. }
  35. @Override
  36. public ResponseResult queueModify(Queue queue) {
  37. String newName = queue.getName();
  38. String oldName = queue.getOldName();
  39. if (newName == null || oldName == null) {
  40. throw new BusinessAccessException("请输入正确的参数");
  41. }
  42. queue.setName(oldName);
  43. ResponseResult result = this.queueDelete(queue);
  44. if ("200".equals(result.getCode())) {
  45. queue.setName(newName);
  46. result = this.queueDeclare(queue);
  47. if ("200".equals(result.getCode())) {
  48. return result;
  49. } else {
  50. throw new BusinessAccessException("修改队列失败,请直接新建队列");
  51. }
  52. }
  53. return result;
  54. }
  55. @Override
  56. public ResponseResult queueDelete(Queue queue) {
  57. // 参数 :
  58. // 队列名称,是否持久化,是否排外,是否自动删除,参数
  59. // 队列名以 mia.服务名称.custom 命名
  60. String name = queue.getName();
  61. // boolean begin = name.startsWith("mia.");
  62. // boolean end = name.endsWith(".custom");
  63. ResponseResult result = new ResponseResult();
  64. Queue queryResult = this.getQueue(name);
  65. if (queryResult.getName() == null) {
  66. result.setMsg("not found queue " + name);
  67. return result;
  68. }
  69. // if (!begin || !end) {
  70. // throw new BusinessAccessException("只能删除自定义创建队列");
  71. // }
  72. try {
  73. AMQP.Queue.DeleteOk queueResult =
  74. connectionFactory.createConnection().createChannel(false).queueDelete(name);
  75. if (queueResult.getMessageCount() >= 0) {
  76. result.setCode("200");
  77. result.setMsg("success");
  78. }
  79. } catch (AmqpException e) {
  80. e.printStackTrace();
  81. } catch (IOException e) {
  82. e.printStackTrace();
  83. }
  84. return result;
  85. }
  86. /**
  87. * 根据名称查询某个队列
  88. *
  89. * @param name
  90. * @return
  91. */
  92. public Queue getQueue(String name) {
  93. String vHost = "/";
  94. String url = config.URL + "queues/" + HttpUtil.getURLEncoderString(vHost) + "/" + name;
  95. try {
  96. String content = HttpUtil.get(url);
  97. Queue result = JSON.parseObject(content, Queue.class);
  98. return result;
  99. } catch (Exception e) {
  100. e.printStackTrace();
  101. }
  102. return null;
  103. }
  104. /**
  105. * 新增MQ交换机
  106. *
  107. * @param exchange
  108. */
  109. @Override
  110. public ResponseResult exchangeDeclare(Exchange exchange) {
  111. // 参数 :
  112. // 交换机名称,类型,是否持久化,是否自动删除,参数
  113. // 交换机名以 mia.业务名称.custom 命名
  114. String name = exchange.getName();
  115. // boolean begin = name.startsWith("mia.");
  116. // boolean end = name.endsWith(".custom");
  117. ResponseResult result = new ResponseResult();
  118. // if (!begin || !end) {
  119. // throw new BusinessAccessException("交换机命名方式不对");
  120. // }
  121. String type = exchange.getType();
  122. if (!"direct".equals(type) && !"fanout".equals(type) && !"topic".equals(type)
  123. && !"headers".equals(type)) {
  124. throw new BusinessAccessException("交换机的类型不对");
  125. }
  126. try {
  127. AMQP.Exchange.DeclareOk exchangeResult =
  128. connectionFactory.createConnection().createChannel(false).exchangeDeclare(name, type,
  129. exchange.getDurable(), exchange.getAuto_delete(), null);
  130. if (exchangeResult != null) {
  131. result.setCode("200");
  132. result.setMsg("success");
  133. }
  134. } catch (AmqpException e) {
  135. e.printStackTrace();
  136. } catch (IOException e) {
  137. e.printStackTrace();
  138. }
  139. return result;
  140. }
  141. @Override
  142. public ResponseResult exchangeModify(Exchange exchange) {
  143. String newName = exchange.getName();
  144. String oldName = exchange.getOldName();
  145. if (newName == null || oldName == null) {
  146. throw new BusinessAccessException("请输入正确的参数");
  147. }
  148. exchange.setName(oldName);
  149. ResponseResult result = this.exchangeDelete(exchange);
  150. if ("200".equals(result.getCode())) {
  151. exchange.setName(newName);
  152. result = this.exchangeDeclare(exchange);
  153. if ("200".equals(result.getCode())) {
  154. return result;
  155. } else {
  156. throw new BusinessAccessException("修改交换机失败,请直接新建交换机");
  157. }
  158. }
  159. return result;
  160. }
  161. @Override
  162. public ResponseResult exchangeDelete(Exchange exchange) {
  163. // 参数 :
  164. // 交换机名称,类型,是否持久化,是否自动删除,参数
  165. // 交换机名以 mia.业务名称.custom 命名
  166. String name = exchange.getName();
  167. // boolean begin = name.startsWith("mia.");
  168. // boolean end = name.endsWith(".custom");
  169. ResponseResult result = new ResponseResult();
  170. Exchange queryResult = this.getExchange(name);
  171. if (queryResult.getName() == null) {
  172. result.setMsg("not found exchange " + name);
  173. return result;
  174. }
  175. if (queryResult.getName() == null) {
  176. result.setMsg("not found queue " + name);
  177. return result;
  178. }
  179. // if (!begin || !end) {
  180. // throw new BusinessAccessException("只能删除自定义创建交换机");
  181. // }
  182. try {
  183. AMQP.Exchange.DeleteOk exchangeResult =
  184. connectionFactory.createConnection().createChannel(false).exchangeDelete(name);
  185. if (exchangeResult != null) {
  186. result.setCode("200");
  187. result.setMsg("success");
  188. }
  189. } catch (AmqpException e) {
  190. e.printStackTrace();
  191. } catch (IOException e) {
  192. e.printStackTrace();
  193. }
  194. return result;
  195. }
  196. /**
  197. * 根据名称查询某个交换机
  198. *
  199. * @param name
  200. * @return
  201. */
  202. public Exchange getExchange(String name) {
  203. String vHost = "/";
  204. String url = config.URL + "exchanges/" + HttpUtil.getURLEncoderString(vHost) + "/" + name;
  205. try {
  206. String content = HttpUtil.get(url);
  207. Exchange result = JSON.parseObject(content, Exchange.class);
  208. return result;
  209. } catch (Exception e) {
  210. e.printStackTrace();
  211. }
  212. return null;
  213. }
  214. @Override
  215. public ResponseResult binding(Binding binding) {
  216. // 参数 :
  217. // 队列名称,交换机名称,路由键值,参数
  218. String queueName = binding.getDestination();
  219. String exchangeName = binding.getSource();
  220. String key = binding.getRouting_key();
  221. ResponseResult result = new ResponseResult();
  222. Queue queue = this.getQueue(queueName);
  223. Exchange exchange = this.getExchange(exchangeName);
  224. if (queue.getName() == null || exchange.getName() == null) {
  225. result.setMsg("not found queue or exchange");
  226. return result;
  227. }
  228. if (queueName == null || exchangeName == null || key == null) {
  229. throw new BusinessAccessException("关键参数不能为空");
  230. }
  231. try {
  232. AMQP.Queue.BindOk bindOk = connectionFactory.createConnection().createChannel(false)
  233. .queueBind(queueName, exchangeName, key, null);
  234. if (bindOk != null) {
  235. result.setCode("200");
  236. result.setMsg("success");
  237. }
  238. } catch (AmqpException e) {
  239. e.printStackTrace();
  240. } catch (IOException e) {
  241. e.printStackTrace();
  242. }
  243. return result;
  244. }
  245. @Override
  246. public ResponseResult unbinding(Binding binding) {
  247. // 参数 :
  248. // 队列名称,交换机名称,路由键值,参数
  249. String queueName = binding.getDestination();
  250. String exchangeName = binding.getSource();
  251. String key = binding.getRouting_key();
  252. ResponseResult result = new ResponseResult();
  253. Queue queue = this.getQueue(queueName);
  254. Exchange exchange = this.getExchange(exchangeName);
  255. if (queue.getName() == null || exchange.getName() == null) {
  256. result.setMsg("not found queue or exchange");
  257. return result;
  258. }
  259. if (queueName == null || exchangeName == null || key == null) {
  260. throw new BusinessAccessException("关键参数不能为空");
  261. }
  262. try {
  263. AMQP.Queue.UnbindOk unbindOk = connectionFactory.createConnection().createChannel(false)
  264. .queueUnbind(queueName, exchangeName, key, null);
  265. if (unbindOk != null) {
  266. result.setCode("200");
  267. result.setMsg("success");
  268. }
  269. } catch (AmqpException e) {
  270. e.printStackTrace();
  271. } catch (IOException e) {
  272. e.printStackTrace();
  273. }
  274. return result;
  275. }
  276. }

五、HttpClient封装的Get请求,根据名称查队列、根据名称查交换机

  1. public class HttpUtil {
  2. /**
  3. * HttpClient封装的Get请求
  4. *
  5. * @param url
  6. * @return
  7. */
  8. public static String get(String url) {
  9. // 创建HttpClientBuilder
  10. HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
  11. // HttpClient
  12. CloseableHttpClient httpClient = httpClientBuilder.build();
  13. HttpGet httpGet = new HttpGet(url);
  14. RabbitConfig config = new RabbitConfig();
  15. final String authInfo = config.getUsername() + ":" + config.getPassword();
  16. System.out.println("authInfo : " + authInfo);
  17. try {
  18. String encoding = DatatypeConverter.printBase64Binary("guest:guest".getBytes("utf-8"));
  19. httpGet.setHeader("Authorization", "Basic " + encoding);
  20. } catch (UnsupportedEncodingException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println(httpGet.getRequestLine());
  24. try {
  25. // 执行get请求
  26. HttpResponse httpResponse = httpClient.execute(httpGet);
  27. // 获取响应消息实体
  28. HttpEntity entity = httpResponse.getEntity();
  29. String content = EntityUtils.toString(entity);
  30. // 响应状态
  31. System.out.println("status:" + httpResponse.getStatusLine());
  32. System.out.println("content:" + content);
  33. // 判断响应实体是否为空
  34. if (entity != null) {
  35. System.out.println("contentEncoding:" + entity.getContentEncoding());
  36. }
  37. return content;
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. } finally {
  41. try { // 关闭流并释放资源
  42. httpClient.close();
  43. } catch (IOException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. return null;
  48. }
  49. /**
  50. * URL 解码
  51. *
  52. */
  53. public static String getURLDecoderString(String str) {
  54. String result = "";
  55. if (null == str) {
  56. return "";
  57. }
  58. try {
  59. result = java.net.URLDecoder.decode(str, "utf-8");
  60. } catch (UnsupportedEncodingException e) {
  61. e.printStackTrace();
  62. }
  63. return result;
  64. }
  65. /**
  66. * URL 转码
  67. *
  68. */
  69. public static String getURLEncoderString(String str) {
  70. String result = "";
  71. if (null == str) {
  72. return "";
  73. }
  74. try {
  75. result = java.net.URLEncoder.encode(str, "utf-8");
  76. } catch (UnsupportedEncodingException e) {
  77. e.printStackTrace();
  78. }
  79. return result;
  80. }
  81. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/976501
推荐阅读
相关标签
  

闽ICP备14008679号