赞
踩
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ优势:
首先进入rabbitmq官网(rabbitmq.com)
向下滑动滚轮找到Download+Installation,点击进入
点击Install Windows安装windows版本
向下滑动滚轮找到下图框住的两个下载链接
下载的软件位置,先安装otp.exe,鼠标右键以管理员方式运行。接着选取要安装的路径,然后一路傻瓜式安装 next 下一步,安装即可。不要安装在中文或带空格的文件路径下。
配置系统环境变量:右键此电脑 - 属性 - 高级系统设置 - 环境变量。
接着打开 - 此电脑(文件资源管理器) 找到刚刚我们安装的文件 bin 目录下,复制路径 ctrl+c 切换窗口到环境变量,找到系统变量 path - 编辑
新建 - ctrl + v 粘贴刚才我们复制的路径,然后三次确定,关闭环境变量窗口
安装 RabbitMQ:右键管理员运行,然后选择安装路径,接着一路 next 下一步,遇到弹窗点允许,没有弹窗则无视(不要安装在中文或带空格的文件路径下)。安装完成后也要配置系统环境变量:配置的方法和上面一致,路径是安装的文件下的sbin下。
安装完成后找到安装文件路径,找到 sbin 目录下,全选路径 输入 cmd,打开cmd命令窗口,运行下面命令(这个指令是为了启动web界面)。
rabbitmq-plugins enable rabbitmq_management
然后再通过管理员权限打开一个cmd,输入rabbitmq-service stop,再输入rabbitmq-service start
(如果这里运行这两个命令不成功可能是rabbitmq服务器安装未成功,需要在管理员权限下的cmd执行rabbitmq-service install)
打开浏览器,访问 127.0.0.1:15672,出现管理页面就安装成功了。账户:guest,密码:guest。
第一步:在https://github.com/alanxz链接下载rabbitmq-c静态库。
第二步:使用cmake编译生成适合自己编译环境的工程
(1)填写源代码路径
(2)建立后的路径,build的文件夹一般建立在源代码路径里,也可以放到其他位置
(3)点击Configure按钮
(4)点击Generate按钮
(如出现如下报错,可以去掉ENABLE_SSL_SUPPORT括号里的勾。)
然后再将工程放在vs中编译生成即可在工程的build/librabbitmq/Debug文件夹下找到生成的rabbitmq.4.lib和rabbimq.4.dll了,将dll和lib放在自己所需的工程下即可用AMQP库的接口函数了。
virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
channel:通道,amqp支持一个tcp连接上启用多个mq通信通道,每个通道都可以被作为通信流。
publisher:生产者,是消息产生的源头。
exchange:交换机,可以理解为具有路由表的路由规则。
queues:队列,装载消息的缓存容器。
consumer:消费者,连接到队列并取走消息的客户端。
核心思想:在RabbitMQ中,生产者从不直接将消息发送给队列。
事实上,有些生产者甚至不知道消息是否被送到某个队列中去了。生产者只负责将消息送给交换机,而交换机确切地知道什么消息应该送到哪。
bind:绑定,实际上可以理解为交换机的路由规则。每个消息都有一个称为路由键的属性(routing key),就是一个简单的字符串。一个绑定将【交换机,路由键,消息送达队列】三者绑定在一起,形成一条路由规则。
exchange type:交换机类型:
fanout:不处理路由键,转发到所有绑定的队列上
direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发
topic:路由键模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
注意:这里用户的创建是在服务器的Web界面上创建的!并且用户所拥有的权限还决定能接收的信息。
创建用户方法:
第一步:打开浏览器,访问 127.0.0.1:15672,输入账户:guest,密码:guest。
第二步:点击导航栏里的Admin
第三步:点击Add a user左边的小三角,在username后面输入创建用户的名字,在Password里输出密码,下一行的(confirm)是确认密码,然后在最后一个输入的右侧选择你要创建用户的角色,这个决定用户拥有的权限,在目前我的探究里,如果发送方通信的方式选的是topic且接收方权限是Admin,那么Routingkey的设置将毫无意义,不管设置什么接收方都会接收到这个交换机里的所有信息,设置其他的权限则可以通过Routingkey来选择想要接收到的消息。
第四步:点击Add user,即可成功创建一个用户。
第五步:点击你创建的用户
第六步:设置用户的虚拟主机权限以及TOPIC权限,直接用默认的就行,点击上下两个Set permission就ok了
在Fanout模式下,消息发送流程是这样的:
可以有多个消费者
每个消费者有自己的queue(队列)
每个队列都要绑定到Exchange(交换机)
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
在Direct模型下:
队列与交换机的绑定,不是任意绑定,而是要指定一个RoutingKey(路由key)。消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange会判断每一个绑定的队列,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
在Topic模式下:
Topic模式的Exchange与Direct相比,可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
//创建一个新的AMQP连接,他将被用于RabbitMQ服务器进行通信 amqp_new_connection(); //这行代码创建了一个新的TCP套接字,并将其与之前的AMQP关联起来 amqp_tcp_socket_new(conn); /** * @brief amqp_socket_open() 用来打开一个与AMQP服务器的网络套接字连接。 * @param socket 用于存储套接字连接信息的指针。 * @param host AMQP服务器的主机名或IP地址 * @param port AMQP服务器主机的端口号,默认端口号是5672 */ amqp_socket_open(socket, "127.0.0.1", 5672); /** * @brief amqp_login() 用来来进行与AMQP服务器的身份验证和登录操作。 * @param conn 与AMQP服务器建立的连接。 * @param vhost 虚拟主机的名称,用于指定要连接的AMQP虚拟主机,"/"表示连接到默认的虚拟主机。 * @param channel_max 通道的最大数量。在这里,设置为0表示使用服务器的默认设置。 * @param frame_max 每个AMQP帧的最大字节大小。AMQP_DEFAULT_FRAME_SIZE表示使用默认的帧大小。 * @param heartbeat 心跳超时时间,用于检测与服务器之间的连接状态。在这里,设置为0表示禁用心跳检测。 * @param sasl_method 身份验证机制。AMQP_SASL_METHOD_PLAIN表示使用PLAIN身份验证机制。 * @param username 登录用户名。使用"guest"表示使用默认的用户名。 * @param password 登录密码。使用"guest"表示使用默认的密码。 */ amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"); /** * amqp_bytes_t是AMQP库中的一种数据类型,用于表示字节数据。 * 具体而言,amqp_bytes_t是一个结构体类型,包含两个字段: * size_t len:表示字节数据的长度(大小)。 * void* bytes:指向字节数据的指针。 * 通过使用amqp_bytes_t类型,可以方便地处理和传递字节数据,如消息内容、队列名称等。在你的代码片段中,amqp_bytes_t用于作为参数传递给amqp_queue_declare()函数的queue参数。通过使用amqp_bytes_t类型的对象,可以指定要声明的队列名称。在你的例子中,使用了amqp_empty_bytes,表示不指定特定的队列名称,而是让服务器自动生成一个唯一的队列名称。总结起来,amqp_bytes_t是AMQP库中用于表示字节数据的数据类型。它具有表示字节数据长度和指向字节数据的指针的能力,方便在处理和传递字节数据时使用。 */ amqp_bytes_t t; //打开一个通道(channel),编号为1,所有的消息都是通过通道传输的。 const amqp_channel_t KChannel = 1; amqp_channel_open(conn, KChannel); //首先关闭通道,然后关闭连接,最后销毁连接。 amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn);
/** * @brief amqp_queue_declare() 向AMQP服务器发送一个队列声明请求,以创建一个新的消息队列或获取现有队列的信息。 * @param conn 与AMQP服务器建立的连接。 * @param channel 通道编号,用于指定操作的特定通道。 * @param queue 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。 * @param passive 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。 * @param durable 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。 * @param exclusive 表示队列是否为独占队列。如果设置为非零值(例如1),表示队列是独占的,只有声明队列的连接可以访问。如果设置为零值,则队列可以被多个连接访问。 * @param auto_delete 表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。 * @param arguments 可选参数,用于传递额外的队列参数。 */ amqp_queue_declare(m_pConn,m_iChannel_Receive,_queue,_passive,_durable,_exclusive,_auto_delete,amqp_empty_table); /** * @brief amqp_queue_bind() 用于将队列与交换机进行绑定。 * @param conn 与AMQP服务器建立的连接。 * @param channel 通道编号,用于指定操作的特定通道。 * @param queue 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。 * @param exchange 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。 * @param routing_key 绑定的路由键,用于指定消息从交换机路由到队列的条件。消息的路由键和绑定的路由键匹配时,消息会被路由到该队列。 * @param arguments 可选参数,用于传递额外的绑定参数。 */ amqp_queue_bind(m_pConn,m_iChannel_Receive,_queue,_exchange,_routekey,amqp_empty_table); /** * @brief amqp_basic_qos() 用来设置AMQP通道的QoS(Quality of Service)配置。 * @param m_pConn 与AMQP服务器建立的连接。 * @param m_iChannel 通道编号,用于指定操作的特定通道。 * @param prefetch_size 预取大小,用于限制服务器一次性发送给消费者的消息的总大小。在这里,设置为0表示不限制消息的总大小。 * @param prefetch_count 预取计数,用于限制服务器一次性发送给消费者的消息的数量。在这里,设置为1表示每次只获取一条消息。 * @param global 用于指定是否将预取限制应用到整个通道(global=true)还是仅应用于特定消费者(global=false)。在这里,设置为0表示仅应用于特定消费者。 */ amqp_basic_qos(conn, KChannel, 0, /*prefetch_count*/1, 0); /** * @brief amqp_basic_ack() 用来确认(acknowledge)已经成功处理的消息。 * @param m_pConn 与AMQP服务器建立的连接。 * @param m_iChannel 通道编号,用于指定操作的特定通道。 * @param delivery_tag 交付标识(delivery tag),它是一个标识特定消息的整数值。该标识用于唯一标识一条消息。 * @param multiple 用于指示是否同时确认多条消息。设置为false表示只确认当前的一条消息,true为确认多条消息 */ amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, true); /** * @brief 用于启动消费者并订阅消息队列中的消息。 * @param conn 与AMQP服务器建立的连接。 * @param channel 通道编号,用于指定操作的特定通道。 * @param queue 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。 * @param consumer_tag 消费者标识,用于标识特定的消费者。可以为消费者指定一个唯一的标识,也可以使用空字符 (amqp_empty_bytes) 自动分配一个标识。 * @param no_local 表示是否不接收自己发布的消息。如果设置为非零值(例如 1),表示不接收自己发布的消息。如果设置为零值,则接收自己发布的消息。 * @param no_ack 表示是否不需要发送确认消息。如果设置为非零值(例如 1),表示不需要发送确认消息,即消息不需要进行显式的确认。如果设置为零值,则需要发送确认消息,以确保消息的可靠传递和处理。 * @param exclusive 表示是否创建一个独占的消费者。如果设置为非零值(例如 1),表示创建一个独占的消费者,只有当前连接的消费者可以订阅指定队列。如果设置为零值,则可以有多个消费者订阅同一个队列。 * @param arguments 可选参数,用于传递额外的订阅参数。 */ amqp_basic_consume(m_pConn,m_iChannel_Receive,queuename,amqp_empty_bytes,false,false,false,amqp_empty_table); /** * @brief 用于从 AMQP 服务器接收订阅的消息,并将消息存储在 amqp_message_t 类型的结构体中,而不设置接收超时时间。 * @param conn 与 AMQP 服务器建立的连接。 * @param &envelope 指向 amqp_envelope_t 类型的指针,用于存储接收到的消息的包装信息(如交付标识、交换机名称等)。该结构体中的字段将被填充为接收到的消息的相关信息。 * @param timeval* 如果输入nullptr,表示没有设置接收超时时间,即函数将一直阻塞等待接收到消息。 * @param flags 0:表示没有设置特定的接收标志。 */ amqp_consume_message(m_pConn,&envelope,timeout,0);
/** * @brief amqp_exchange_declare() 用于向 AMQP 服务器发送一个交换机声明请求,以创建一个新的交换机或获取现有交换机的信息。 * @param conn 与AMQP服务器建立的连接。 * @param channel 通道编号,用于指定操作的特定通道。 * @param exchange 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。 * @param type 交换机的类型,可以是 "direct"、"fanout"、"topic"。 * @param passive 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。 * @param durable 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。 * @param auto_delete 表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。 * @param internal 表示交换机是否是内部的。如果设置为非零值(例如 1),表示交换机是内部的,只能通过其他交换机进行路由。如果设置为零值,则交换机可以直接接收消息。 * @param arguments 可选参数,用于传递额外的队列参数。 */ amqp_exchange_declare(m_pConn,m_iChannel_Send,_exchange,_type,_passive,_durable,0,0,amqp_empty_table); /** * @brief 用于将C风格字符串转换为AMQP字节序列的函数调用,函数的返回值是一个amqp_bytes_t类型的对象,其中包含转换后的字节序列的长度和指向数据的指针,这个的用途只是进行了一个转换,应该amqp库里的函数输入都不是char*以及string类型,都需要转成amqp_bytes_t类型 * @param cstr const char*类型字符串 */ amqp_cstring_bytes("direct"); /** * @brief 将一个消息发布到 RabbitMQ 服务器. * @param conn 已经打开的与 RabbitMQ 服务器的连接。 * @param replyChannel 消息应该发布到的通道,这通常与消息原始接收的通道相同。 * @param exchange 交换器名字。 * @param routekey 路由键。 * @param mandatory 如果设置为 true,当消息不能被路由到任何队列时,服务器会将消息返回给生产者。如果是false,表示如果消息不能被路由,服务器会丢弃它。 * @param immediate 如果设置为 true,当队列中没有消费者能立即接收到消息时,服务器会将消息返回给生产者。如果是false,表示服务器会将消息存储,直到有消费者能接收它。 * @param properties* 消息的属性。 * @param response_ 要发布的消息体。 */ amqp_basic_publish(m_pConn,m_iChannel_Send,_exchange,_routekey,0,0,NULL,message_bytes);
原文链接:https://blog.csdn.net/qq_43410878/article/details/123656765
原文链接:https://zhuanlan.zhihu.com/p/640374684
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。