赞
踩
目录
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.12.0</version>
- </dependency>
- public class Producer {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- String exchangerName = "ex_exchanger_name";
- // 交换机名称
- String queueName = "ex_queue_name";
- // 队列名称
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 创建连接工厂
- connectionFactory.setHost("ip地址");
- // RabbitMQ服务器地址(写自己服务器对应的ip地址)
- connectionFactory.setUsername("admin");
- // RabbitMQ用户名,这里是自定义用户名
- connectionFactory.setPassword("123456");
- // RabbitMQ密码,这里是自定义密码
- connectionFactory.setPort(5672);
- // RabbitMQ端口号
-
- Connection connection = connectionFactory.newConnection();
- //创建连接
- Channel channel = connection.createChannel();
- //创建信道
-
- /**
- * 创建交换机
- * 1、交换机名称
- * 2.交换机类型,direct,topic,fanout和header(这里选择direct)
- * 3.指定交换机是否需要持久化,如果设置为true,那么交换机的元数据要持久化
- * 4.指定交换机没有队列绑定时是否需要删除,设置为false表示不删除
- * 5.Map<String,Object>类型,用来指定我们交换机其它的一些结构化参数,我们在这里直接设置为null
- */
- channel.exchangeDeclare(exchangerName, BuiltinExchangeType.DIRECT,true,false,null);
-
- /**
- *生成一个队列
- * 1.队列名称
- * 2.队列是否需要持久化(只是队列名称持久化,而非队列中的消息)
- * 3.表示队列是否私有,只有创建他的应用程序才能消费消息
- * 4.队列在没有消费者订阅的情况下是否自动删除
- * 5.队列的一些结构化信息,比如声明死信队列,磁盘队列会用到
- */
- channel.queueDeclare(queueName,true,false,false,null);
-
- /**
- * 将我们的交换机和队列绑定
- * 1.队列名称
- * 2.交换机名称
- * 3.路由键,在我们直连模式下,可以为我们的队列名称
- */
-
- channel.queueBind(queueName,exchangerName,queueName);
-
- //发送消息
- String message = "hello rabbitmq";
-
- /**
- * 发送消息
- * 1.发送到哪个交换机
- * 2.队列名称
- * 3.其它参数信息
- * 4.发送消息的消息体
- */
- channel.basicPublish(exchangerName,queueName,null,message.getBytes());
-
- channel.close();//关闭信道
- connection.close();//关闭连接
- }
-
- }
- public class Consumer {
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 创建连接工厂
- connectionFactory.setHost("ip地址");
- // RabbitMQ服务器地址(写自己服务器对应的ip地址)
- connectionFactory.setUsername("admin");
- // RabbitMQ用户名,这里是自定义用户名
- connectionFactory.setPassword("123456");
- // RabbitMQ密码,这里是自定义密码
- connectionFactory.setPort(5672);
- // RabbitMQ端口号
-
-
- Connection connection = connectionFactory.newConnection();
- //创建连接
- Channel channel = connection.createChannel();
- //创建信道
-
- DeliverCallback deliverCallback = (consumerTage,message) -> {
-
- System.out.println("接收到消息"+new String(message.getBody()));
-
- };
-
- CancelCallback cancelCallback = consumerTage-> {
- System.out.println("消息消费中断");
- };
-
- /**
- * 消费消息
- * 1.消费哪个队列
- * 2.消费成功后,是否需要自动应答,如果为true,则是自动应答
- * 3.接收消息的一个回调函数
- * 4.取消消息的回调函数
- */
-
- channel.basicConsume("ex_queue_name",true,deliverCallback,cancelCallback);
-
- channel.close();//关闭信道
-
- connection.close();//关闭连接
-
- }
- }
为了显示效果,这里需要登录RabbitMQ对应的web登录管理界面:
如果不知如何启动RabbitMQ服务或登录该管理界面,参考之前文章Rabbitmq的安装与使用(Linux版)https://blog.csdn.net/Kristabo/article/details/131965339
启动Producer程序:
可以看到多了一个名称为:"ex_queue_name"的队列,同时多了一条未消费信息:
启动Consumer程序
运行后,可以接收到发送的消息内容:
同时在此检查队列情况:
可以发现名称为 "ex_queue_name"的队列中已没有未读消息
这里用到的是direct类型的交换机,如果还需要其他类型交换机相关代码参考,可关注公众号【蜗牛变涡流】,回复rabbitMQ获取完整代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。