赞
踩
1 2 3 4 5 6 7 8 9 | brew install rabbitmq ## 进入安装目录 cd /usr/local/Cellar/rabbitmq/3.7.5 # 启动 brew services start rabbitmq # 当前窗口启动 rabbitmq-server |
启动控制台之前需要先开启插件
1 | ./rabbitmq-plugins enable rabbitmq_management |
进入控制台: http://localhost:15672/
用户名和密码:guest,guest
首先是得启动mq
1 2 3 4 5 6 | ## 添加账号 ./rabbitmqctl add_user admin admin ## 添加访问权限 ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" ## 设置超级权限 ./rabbitmqctl set_user_tags admin administrator |
pom引入依赖
1 2 3 4 | <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency> |
开始写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | public class RabbitMqTest { //消息队列名称 private final static String QUEUE_NAME = "hello"; @Test public void send() throws java.io.IOException, TimeoutException { //创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); //生成一个消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { String message = "Hello World RabbitMQ count: " + i; //发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //关闭消息通道和连接 channel.close(); connection.close(); } @Test public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息信道 Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("[*] Waiting for message. To exist press CTRL+C"); AtomicInteger count = new AtomicInteger(0); //消费者用于获取消息信道绑定的消息队列中的信息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(1000 * 60); } } |
需要注意的一点是:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
首先执行塞入数据,执行完毕之后,可以到控制台进行查看:
可以看到多出了一个Queue,对列名为hello,总共有10条数据
接下来就是消费数据了,执行consumer方法,输出日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | [*] Waiting for message. To exist press CTRL+C [x] Received 'Hello World RabbitMQ count: 0 [x] Done [x] Received 'Hello World RabbitMQ count: 1 [x] Done [x] Received 'Hello World RabbitMQ count: 2 [x] Done [x] Received 'Hello World RabbitMQ count: 3 [x] Done [x] Received 'Hello World RabbitMQ count: 4 [x] Done [x] Received 'Hello World RabbitMQ count: 5 [x] Done [x] Received 'Hello World RabbitMQ count: 6 [x] Done [x] Received 'Hello World RabbitMQ count: 7 [x] Done [x] Received 'Hello World RabbitMQ count: 8 [x] Done [x] Received 'Hello World RabbitMQ count: 9 [x] Done |
回头去查看queue,发现总得数据量为0了
对于ack的问题,如果在消费数据的时候,出现异常,而我不希望数据丢失,这个时候就需要考虑手动ack的机制来保证了
首先需要设置手动ack
1 2 | // 设置autoAck为false channel.basicConsume(QUEUE_NAME, false, consumer); |
其次在消费数据完毕之后,主动ack/nack
1 2 3 4 5 | if (success) { channel.basicAck(envelope.getDeliveryTag(), false); } else { channel.basicNack(envelope.getDeliveryTag(), false, false); } |
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。