赞
踩
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang写成。
MQ是一个互联网架构中常见的解耦利器。
什么时候不使用MQ?
上游实时关注执行结果
什么时候使用MQ?
1)数据驱动的任务依赖,多个任务需要轮流执行,轮流订阅上一个任务。
2)上游不关心多下游执行结果,上游执行完发送到MQ,多下游订阅MQ。
3)异步返回执行时间长
1、死信队列多了一个过期的机制,到期会自动去尝试消费,通过死信交换机做匹配,选择想要的消费失败的消息。
消息中间件是在消息传输过程中保存消息的容器。队列的主要目的是提供路由并保证消息的传递。
特点:
1、异步处理模式
2、多个应用程序调用关系为松耦合关系
传递模型:
1、点多点模型PTP
每个消息只用一个消费者
发送者和接收者没有时间依赖
接受者确认消息接受和处理成功
2、发布-订阅模型Pub/Sub
一对多关系,通过订阅主题,发布者建立一个订阅,订阅者保持持续的活动状态以接收消息。
每个消息可以有多个订阅者
客户端只有订阅后才能接收到消息,有时间依赖。
持久订阅 订阅关系建立后,消息不会消失,不管订阅者是否都在线
非持久订阅 订阅者为了接受消息,必须一直在线
生产者发送消息的流程:
1、生产者连接RabbitMQ,建立TCP连接( Connection),开启通道(Channel)
2、生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
3、生产者声明一个Queue(队列),并设置相关属性,如是否排他、是否持久化、是否自动删除等
4、生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
5、生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
6、相应的交换器根据接收到的 routingKey 查找相匹配的队列。
7、如果找到,则将从生产者发送过来的消息存入相应的队列中。
8、如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
9、关闭通道
10、关闭连接
消费者接收消息的过程:
1、消费者连接到RabbitMQ,建立一个连接(Connection) ,开启一个通道(Channel)
2、消费者向RabbitMQ请求消费相应队列中的消息
3、等待RabbitMQ回应并投递相应队列中的消息,消费者接收消息。
4、消费者确认(ack) 接收到的消息。
5、RabbitMQ 从队列中删除相应己经被确认的消息。
6、关闭通道
7、关闭连接
下载docker离线安装包。下载地址如下:
https://download.docker.com/linux/static/stable/x86_64/
将安装包上传到服务器,解压到指定目录 然后复制到/etc/bin下
cp ./docker/* /usr/bin
cd /etc/systemd/system/
touch docker.service
vim docker.service
复制以下内容
将–insecure-registry=192.168.100.185改成自己的IP
[Unit]
Description=Docker Application Container Engine
Documentation=https://docs.docker.com
After=network-online.target firewalld.service
Wants=network-online.target
[Service]
Type=notify
# the default is not to use systemd for cgroups because the delegate issues still
# exists and systemd currently does not support the cgroup feature set required
# for containers run by docker
ExecStart=/usr/bin/dockerd --selinux-enabled=false --insecure-registry=192.168.100.185
ExecReload=/bin/kill -s HUP $MAINPID
# Having non-zero Limit*s causes performance problems due to accounting overhead
# in the kernel. We recommend using cgroups to do container-local accounting.
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
# Uncomment TasksMax if your systemd version supports it.
# Only systemd 226 and above support this version.
#TasksMax=infinity
TimeoutStartSec=0
# set delegate yes so that systemd does not reset the cgroups of docker containers
Delegate=yes
# kill only the docker process, not all processes in the cgroup
KillMode=process
# restart the docker process if it exits prematurely
Restart=on-failure
StartLimitBurst=3
StartLimitInterval=60s
[Install]
WantedBy=multi-user.target
chmod +x docker.service
systemctl daemon-reload
systemctl start docker
systemctl status docker
# 启动docker
systemctl start docker
# 停止docker
systemctl stop docker
# 重启docker
systemctl restart docker
# 查看docker状态
systemctl status docker
# 开机启动
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help
docker search rabbitmq
Error response from daemon: Get "https://index.docker.io/v1/search?q=rabbitmq&n=25": dial tcp: lookup index.docker.io on 111.111.111.111:53: read udp 192.168.0.185:46483->111.111.111.111:53: i/o timeout
直接修改dns 为8.8.8.8然后重新加载网卡
docker seach rabbitmq
docker pull rabbitmq
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest --hostname myRabbit --name rabbitmq rabbitmq
参数说明:
-d:表示在后台运行容器;
-p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认的用户密码;
–hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
–name rabbitmq:设置容器名称;
rabbitmq:容器使用的镜像名称;
设置docker容器启动时 rabbitMQ自动启动:docker update rabbitmq --restart=always在通过 docker exec -it 容器id /bin/bash 进入容器内部在运行:rabbitmq-plugins enable rabbitmq_management方法二:docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
<!-- maven坐标 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducerUtil {
/**
* rabbitmq相关端口
* 5672: RabbitMQ的通讯端口
* 25672: RabbitMQ的节点间的CLI通讯端口是
* 15672: RabbitMQ HTTP_API的端口,管理员用户才能访问,用于管理RabbitMQ,需要启动Management插件。
* 1883,8883: MQTT插件启动时的端口。
* 61613、61614: STOMP客户端插件启用的时候的端口。
* 15674、15675: 基于webscoket的STOMP端口和MOTT端口
*/
public static void main(String[] args) {
// 获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机名 hostname
factory.setHost("");
factory.setVirtualHost("/");
// 用户名
factory.setUsername("");
// 密码
factory.setPassword("");
// amqp的端口号
factory.setPort(5672);
try(
// 建立TCP连接
Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
) {
// 声明交换器
// 交换器的名称,交换器的类型,是否是持久化,是否是自动删除,属性map集合
channel.exchangeDeclare("ex.demo", BuiltinExchangeType.DIRECT, false, false, null);
// 声明消息队列
// 消息队列名称,是否是持久化,是否是排他的,是否是自动删除,属性map集合
channel.queueDeclare("queue.test1", false, false, true, null);
// 将交换器和消息队列绑定
channel.queueBind("queue.test1", "ex.demo", "hello.demo");
for (int i = 0; i < 10; i++) {
// 发送消息
// 交换器的名字,消息的路由键,消息的属性,消息的字节数组
channel.basicPublish("ex.demo", "hello.demo", null, ("hello rabbitmq!this is " + i).getBytes());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
发送成功可管理界面查看:
未被消费数量
手动向队列推送数据
可能遇到的问题
1、创建账户
2、授权
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
public class RabbitMQConsumerUtil {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
ConnectionFactory factory = new ConnectionFactory();
//指定协议://用户名:密码@IP地址:端口号/虚拟主机
//虚拟主机为/,这里要写它的转义字符:%2f
factory.setUri("amqp://用户名:密码@ip:5672/%2f");
try(
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
) {
// 确保MQ中有该队列,如果没有则创建
channel.queueDeclare("queue.test1", false, false, true, null);
// 拉消息模式
// 指定从哪个消费者消费消息,指定是否自动确认消息
// final GetResponse getResponse = channel.basicGet("queue.test1", true);
// 获取消息体并打印
// final byte[] body = getResponse.getBody();
// System.out.println(new String(body));
// 监听消息,一旦有消息推送过来,就调用处理的回调函数
channel.basicConsume("queue.test1", (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
}, (consumerTag) -> {});
}catch (Exception e){
e.printStackTrace();
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。