赞
踩
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
虚拟主机(virtual host)或(vhost)
交换机(exchange)
队列(queue)
绑定器(bind)
连接(Connection)
通道 (Channel)
# 1.拉取镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker pull rabbitmq:3.8-management
# 2.查看镜像
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker images
# 3.启动
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
a5ca877169d4eda31a9c404f94565c118f1cc64de2cbb398ff17a66e72c682cd
# 4.查看启动情况
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a5ca877169d4 rabbitmq:3.8-management "docker-entrypoint.s…" 4 seconds ago Up 3 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq
问题
# 进入rabbitmq
[root@izbp1j6sv2mtxgnuthuw6bz ~]# docker exec -it rabbitmq bash
# 启动web管理界面
root@a5ca877169d4:/# rabbitmq-plugins enable rabbitmq_management
# 创建账号 root@a5ca877169d4:/# rabbitmqctl add_user admin admin Adding user "admin" ... Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more. # 设置用户角色 root@a5ca877169d4:/# rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... # 设置用户权限 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 用户 admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 root@a5ca877169d4:/# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" Setting permissions for user "admin" in vhost "/" ... # 查看当前用户和角色 root@a5ca877169d4:/# rabbitmqctl list_users Listing users ... user tags admin [administrator] guest [administrator]
在本教程的这一部分中,我们将用 Java 编写两个程序;
发送单个消息的生产者和接收消息并将其打印出来的消费者
在下图中,“P”是我们的生产者,“C”是我们的消费者
中间的盒子是一个队列——RabbitMQ 代表消费者保留的消息缓冲区
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("IP"); factory.setUsername("admin"); factory.setPassword("admin"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 也就是是否用完就删除 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 - 延迟、死信等 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息,传递的消息携带的properties * 4.发送消息的消息体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); } }
import com.rabbitmq.client.*; public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("IP"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息........."); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; /** * 消费者消费消息 - 接受消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 * 4.消息被取消时的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("IP"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
import com.oddfar.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; // 这是一个工作线程,相当于之前的消费者 public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.basicQos(1); //消息接受 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("接收到消息:" + receivedMessage); }; //消息被取消 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }; System.out.println("C1 消费者启动等待消费.................. "); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完成:" + message);
}
}
}
server: 服务器
v-host:名称上来说,虚拟主机
exchange: 交换机(消息通常是发送到交换机)
queue: 队列(存放消息的)
bind: 将交换机和队列进行绑定
创建一个队列的时候,可以是非持久化的,也可以是持久化的
- 非持久化:rabbitmq如果重启,该队列就会被删除
- 持久化:重启不影响
- 消息持久化必须要消息队列持久化
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
消息持久化,可以一定程度上去预防消息丢失,需要设置
MessageProperties.PERSISTENT_TEXT_PLAIN
注意:配置了消息持久化,并不能完全保证消息不丢失,只能保证消息到了队列中不消失
消息丢失的方式:
- 发送方发送到队列的时候,丢了
- 交换机到队列中,丢了
- 队列到消费者,丢了,目前所学知识点,队列中还有消息,再次发送
后续解决方案,参考确认Confirm,Return
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
默认情况下,消费者接受到消息的时候(但是通常接收到消息就处理了),就会进行自动应答(ACK),如果一个消费者处理时间长,或者异常了,所以需要手动ACK
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//true表示自动:autoAck
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
//1.channel.basicConsume 将true改为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
//在finally中手动ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
工作模式中,默认轮询,会出现一些问题,导致消费不均匀,比如:十个消息,分别给两个消费者五个,但消费者能力强的先处理完这五个消息会出现空闲的情况,导致消费不均匀
所以我们可以设置一个抓取值,官网给的是一次抓取一个任务(每次抓取一个消息,当上一个消息没有ack的时候,不抓取新的消息)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:这里的抓取值给的是1,这个值太小(消费者能力比较强),但具体还是看服务性能和消息复杂度,通常情况下100-300
多出来的x就是交换机,交换机接收发送方的消息,将消息发送到消息队列中,交换机还有不同的类型
Exchange类型:
在上一个发布订阅中,实现了一对多,但是每一个消息,都会执行我们指定的两个操作,粒度更加细致
对于error级别的信息需要打印到控制台,并保存到本地,对于普通信息,只需要打印到控制台
发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。
这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节。
不仅仅是防止发送方到mq的消息丢失,还可以通过异步的方式来提高效率
- 为了消息不丢失,持久化,持久化是同步的,可以通过confirm的异步确认提高效率
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。