当前位置:   article > 正文

Java代码连接RabbitMQ服务器_java 对接rabbitmq

java 对接rabbitmq

目录

1.添加依赖

2.生产者代码

3.消费者代码

4.效果

1.发送消息

2.消费消息

5.注意


1.添加依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.12.0</version>
  5. </dependency>

2.生产者代码

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. String exchangerName = "ex_exchanger_name";
  4. // 交换机名称
  5. String queueName = "ex_queue_name";
  6. // 队列名称
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. // 创建连接工厂
  9. connectionFactory.setHost("ip地址");
  10. // RabbitMQ服务器地址(写自己服务器对应的ip地址)
  11. connectionFactory.setUsername("admin");
  12. // RabbitMQ用户名,这里是自定义用户名
  13. connectionFactory.setPassword("123456");
  14. // RabbitMQ密码,这里是自定义密码
  15. connectionFactory.setPort(5672);
  16. // RabbitMQ端口号
  17. Connection connection = connectionFactory.newConnection();
  18. //创建连接
  19. Channel channel = connection.createChannel();
  20. //创建信道
  21. /**
  22. * 创建交换机
  23. * 1、交换机名称
  24. * 2.交换机类型,direct,topic,fanout和header(这里选择direct)
  25. * 3.指定交换机是否需要持久化,如果设置为true,那么交换机的元数据要持久化
  26. * 4.指定交换机没有队列绑定时是否需要删除,设置为false表示不删除
  27. * 5.Map<String,Object>类型,用来指定我们交换机其它的一些结构化参数,我们在这里直接设置为null
  28. */
  29. channel.exchangeDeclare(exchangerName, BuiltinExchangeType.DIRECT,true,false,null);
  30. /**
  31. *生成一个队列
  32. * 1.队列名称
  33. * 2.队列是否需要持久化(只是队列名称持久化,而非队列中的消息)
  34. * 3.表示队列是否私有,只有创建他的应用程序才能消费消息
  35. * 4.队列在没有消费者订阅的情况下是否自动删除
  36. * 5.队列的一些结构化信息,比如声明死信队列,磁盘队列会用到
  37. */
  38. channel.queueDeclare(queueName,true,false,false,null);
  39. /**
  40. * 将我们的交换机和队列绑定
  41. * 1.队列名称
  42. * 2.交换机名称
  43. * 3.路由键,在我们直连模式下,可以为我们的队列名称
  44. */
  45. channel.queueBind(queueName,exchangerName,queueName);
  46. //发送消息
  47. String message = "hello rabbitmq";
  48. /**
  49. * 发送消息
  50. * 1.发送到哪个交换机
  51. * 2.队列名称
  52. * 3.其它参数信息
  53. * 4.发送消息的消息体
  54. */
  55. channel.basicPublish(exchangerName,queueName,null,message.getBytes());
  56. channel.close();//关闭信道
  57. connection.close();//关闭连接
  58. }
  59. }

3.消费者代码

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. // 创建连接工厂
  5. connectionFactory.setHost("ip地址");
  6. // RabbitMQ服务器地址(写自己服务器对应的ip地址)
  7. connectionFactory.setUsername("admin");
  8. // RabbitMQ用户名,这里是自定义用户名
  9. connectionFactory.setPassword("123456");
  10. // RabbitMQ密码,这里是自定义密码
  11. connectionFactory.setPort(5672);
  12. // RabbitMQ端口号
  13. Connection connection = connectionFactory.newConnection();
  14. //创建连接
  15. Channel channel = connection.createChannel();
  16. //创建信道
  17. DeliverCallback deliverCallback = (consumerTage,message) -> {
  18. System.out.println("接收到消息"+new String(message.getBody()));
  19. };
  20. CancelCallback cancelCallback = consumerTage-> {
  21. System.out.println("消息消费中断");
  22. };
  23. /**
  24. * 消费消息
  25. * 1.消费哪个队列
  26. * 2.消费成功后,是否需要自动应答,如果为true,则是自动应答
  27. * 3.接收消息的一个回调函数
  28. * 4.取消消息的回调函数
  29. */
  30. channel.basicConsume("ex_queue_name",true,deliverCallback,cancelCallback);
  31. channel.close();//关闭信道
  32. connection.close();//关闭连接
  33. }
  34. }

4.效果

为了显示效果,这里需要登录RabbitMQ对应的web登录管理界面:

如果不知如何启动RabbitMQ服务或登录该管理界面,参考之前文章Rabbitmq的安装与使用(Linux版)icon-default.png?t=N6B9https://blog.csdn.net/Kristabo/article/details/131965339

1.发送消息

 启动Producer程序:

可以看到多了一个名称为:"ex_queue_name"的队列,同时多了一条未消费信息:

2.消费消息

启动Consumer程序

 运行后,可以接收到发送的消息内容:

同时在此检查队列情况:

可以发现名称为 "ex_queue_name"的队列中已没有未读消息

5.注意

这里用到的是direct类型的交换机,如果还需要其他类型交换机相关代码参考,可关注公众号【蜗牛变涡流】,回复rabbitMQ获取完整代码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/384260?site
推荐阅读
相关标签
  

闽ICP备14008679号