赞
踩
先更新brew在,然后安装rabbitmq
- #更新brew
- brew update
- #安装rabbitmq
- brew install rabbitmq
打开配置文件
vi /usr/local/etc/rabbitmq/rabbitmq-env.conf
修改所以程序都可访问 = 后面为空格
NODE_IP_ADDRESS=
- #方法一:直接brew启动
- # 启动
- brew services start rabbitmq
- # 当前窗口启动
- rabbitmq-server
-
- #方法二:进入程序目录启动
- #程序目录
- cd /usr/local/sbin/
- #启动
- sudo ./rabbitmq-server
启动浏览器控制台之前需要先开启插件(执行一次以后不用再次执行)
待RabbitMQ 的启动完毕之后,另起终端进入
- cd /usr/local/Cellar/rabbitmq/3.7.7_1/sbin
- sudo ./rabbitmq-plugins enable rabbitmq_management
进入控制台: http://localhost:15672/
用户名和密码:guest,guest
浏览器访问端口:15672
程序访问端口:5672
启动成功
1. 添加账号
首先是得启动mq
- # 进入目录
- cd /usr/local/Cellar/rabbitmq/3.7.7_1/sbin
-
- ## 添加账号
- ./rabbitmqctl add_user admin admin
- ## 添加访问权限
- ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- ## 设置超级权限
- ./rabbitmqctl set_user_tags admin administrator
2. 编码实测
pom引入依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- </dependency>
开始写代码
- public class RabbitMqTest {
-
- //消息队列名称
- private final static String QUEUE_NAME = "hello";
-
- @Test
- public void send() throws java.io.IOException, TimeoutException {
-
- //创建连接工程
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
- //创建连接
- Connection connection = factory.newConnection();
-
- //创建消息通道
- Channel channel = connection.createChannel();
-
- //生成一个消息队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
-
-
- for (int i = 0; i < 10; i++) {
- String message = "Hello World RabbitMQ count: " + i;
-
- //发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
- System.out.println(" [x] Sent '" + message + "'");
- }
-
-
- //关闭消息通道和连接
- channel.close();
- connection.close();
-
- }
-
-
- @Test
- public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
-
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
-
- //创建连接
- Connection connection = factory.newConnection();
-
- //创建消息信道
- Channel channel = connection.createChannel();
-
- //消息队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- System.out.println("[*] Waiting for message. To exist press CTRL+C");
-
- AtomicInteger count = new AtomicInteger(0);
-
- //消费者用于获取消息信道绑定的消息队列中的信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
-
- try {
- System.out.println(" [x] Received '" + message);
- } finally {
- System.out.println(" [x] Done");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- channel.basicConsume(QUEUE_NAME, false, consumer);
-
- Thread.sleep(1000 * 60);
- }
- }

需要注意的一点是:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
3. 输出说明
首先执行塞入数据,执行完毕之后,可以到控制台进行查看:
可以看到多出了一个Queue,对列名为hello,总共有10条数据
接下来就是消费数据了,执行consumer方法,输出日志
- [*] Waiting for message. To exist press CTRL+C
- [x] Received 'Hello World RabbitMQ count: 0
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 1
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 2
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 3
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 4
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 5
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 6
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 7
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 8
- [x] Done
- [x] Received 'Hello World RabbitMQ count: 9
- [x] Done

回头去查看queue,发现总得数据量为0了
4. ACK问题
对于ack的问题,如果在消费数据的时候,出现异常,而我不希望数据丢失,这个时候就需要考虑手动ack的机制来保证了
首先需要设置手动ack
- // 设置autoAck为false
- channel.basicConsume(QUEUE_NAME, false, consumer);
其次在消费数据完毕之后,主动ack/nack
- if (success) {
- channel.basicAck(envelope.getDeliveryTag(), false);
- } else {
- channel.basicNack(envelope.getDeliveryTag(), false, false);
- }
转载:https://www.cnblogs.com/yihuihui/p/9095130.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。