赞
踩
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
排队指的是应用程序通过 队列来通信。
队列的使用除去了接收和发送应用程序同时执行的要求。
其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。
这是 RabbitMQ的官网是 http://www.rabbitmq.com
我们先来整体构造一个结构图,这样会方便们更好地去理解RabbitMQ的基本原理。
通过上面这张应用相结合的结构图既能够清晰的看清楚整体的send Message到Receive Message的一个大致的流程
Queue(队列)RabbitMQ的作用是存储消息,队列的特性是先进先出。上图可以清晰地看到Client A和Client B是生产者,生产者生产消息最终被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据。可以简化成表示为:
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。
这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,
就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,
因为前面已经提及到了Queue会平分这些消息给相应的消费者。
这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
注意:这里的prefetchCount=1是指每次从Queue中发送一条消息来。等消费者处理完这条消息后Queue会再发送一条消息给消费者。
从第一个结构图里面可以观察到:消费者Client A和消费者Client B是如何知道我发送的消息是给Queue1还是给Queue2。
首先明确一点就是生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,当然这里还会对不符合路由规则的消息进行丢弃掉,这里指的是后续要谈到的Exchange Type。
那么Exchange是怎样将消息准确的推送到对应的Queue的呢?那么这里的功劳最大的当属Binding,RabbitMQ是通过Binding将Exchange和Queue链接在一起,这样Exchange就知道如何将消息准确的推送到Queue中去。
简单示意图如下所示:
那么一般 在绑定(Binding)Exchange和Queue的同时,一般会指定一个Binding Key,生产者将消息发送给Exchange的时候,
一般会产生一个Routing Key,当Routing Key和Binding Key对应上的时候,消息就会发送到对应的Queue中去。
那么Exchange有四种类型,不同的类型有着不同的策略。
也就是表明不同的类型将决定绑定的Queue不同,换言之就是说生产者发送了一个消息,Routing Key的规则是A,那么生产者会将Routing Key=A的消息推送到Exchange中,
这时候Exchange中会有自己的规则,对应的规则去筛选生产者发来的消息,
如果能够对应上Exchange的内部规则就将消息推送到对应的Queue中去。
那么接下来就来详细讲解下Exchange里面类型。
** fanout**
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
那么对于direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,
会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,
就会将消息传送给这两个队列,
如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,
这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。
topic
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。
它的约定规则是:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),
如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
Exchange规则 | |
---|---|
类型名称 | 类型描述 |
fanout | 把所有发送到该Exchange的消息路由到所有与它绑定的Queue中 |
direct | Routing Key==Binding Key |
topic | 模糊匹配 |
headers | Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 |
ConnectionFactory、Connection、Channel ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。 Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。 ConnectionFactory为Connection的制造工厂。 Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的, 包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。 Connection就是建立一个TCP连接,生产者和消费者的都是通过TCP的连接到RabbitMQ Server中的, 这个后续会再程序中体现出来。 Channel虚拟连接,建立在上面TCP连接的基础上,数据流动都是通过Channel来进行的。 为什么不是直接建立在TCP的基础上进行数据流动呢?如果建立在TCP的基础上进行数据流动,建立和关闭TCP连接有代价。 频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。 但是,在TCP连接中建立Channel是没有上述代价的。
那么下面就进行一下实验者生产环境下的基本管理与操作
首先:安装成服务的就用这个来启动
[root@controller ~]# service rabbitmq-server restart
Redirecting to /bin/systemctl restart rabbitmq-server.service
[root@controller ~]#
Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。
先来畅快的阅读一下rabbitmq的help文档
[root@controller ~]# rabbitmqctl help Usage: rabbitmqctl [-n <node>] [-t <timeout>] [-q] <command> [<command options>] Options: -n node -q -t timeout Default node is "rabbit@server", where server is the local host. On a host named "server.example.com", the node name of the RabbitMQ Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME has been set to some non-default value at broker startup time). The output of hostname -s is usually the correct suffix to use after the "@" sign. See rabbitmq-server(1) for details of configuring the RabbitMQ broker. Quiet output mode is selected with the "-q" flag. Informational messages are suppressed when quiet mode is in effect. Operation timeout in seconds. Only applicable to "list" commands. Default is "infinity". Commands: stop [<pid_file>] stop_app start_app wait <pid_file> reset force_reset rotate_logs <suffix> hipe_compile <directory> join_cluster <clusternode> [--ram] cluster_status change_cluster_node_type disc | ram forget_cluster_node [--offline] rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...] update_cluster_nodes clusternode force_boot sync_queue [-p <vhost>] queue cancel_sync_queue [-p <vhost>] queue purge_queue [-p <vhost>] queue set_cluster_name name add_user <username> <password> delete_user <username> change_password <username> <newpassword> clear_password <username> authenticate_user <username> <password> set_user_tags <username> <tag> ... list_users add_vhost <vhost> delete_vhost <vhost> list_vhosts [<vhostinfoitem> ...] set_permissions [-p <vhost>] <user> <conf> <write> <read> clear_permissions [-p <vhost>] <username> list_permissions [-p <vhost>] list_user_permissions <username> set_parameter [-p <vhost>] <component_name> <name> <value> clear_parameter [-p <vhost>] <component_name> <key> list_parameters [-p <vhost>] set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> clear_policy [-p <vhost>] <name> list_policies [-p <vhost>] list_queues [-p <vhost>] [<queueinfoitem> ...] list_exchanges [-p <vhost>] [<exchangeinfoitem> ...] list_bindings [-p <vhost>] [<bindinginfoitem> ...] list_connections [<connectioninfoitem> ...] list_channels [<channelinfoitem> ...] list_consumers [-p <vhost>] status node_health_check environment report eval <expr> close_connection <connectionpid> <explanation> trace_on [-p <vhost>] trace_off [-p <vhost>] set_vm_memory_high_watermark <fraction> set_vm_memory_high_watermark absolute <memory_limit> set_disk_free_limit <disk_limit> set_disk_free_limit mem_relative <fraction> <vhostinfoitem> must be a member of the list [name, tracing]. The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is "/". <queueinfoitem> must be a member of the list [name, durable, auto_delete, arguments, policy, pid, owner_pid, exclusive, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages, messages_ready_ram, messages_unacknowledged_ram, messages_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, message_bytes_persistent, head_message_timestamp, disk_reads, disk_writes, consumers, consumer_utilisation, memory, slave_pids, synchronised_slave_pids, state]. <exchangeinfoitem> must be a member of the list [name, type, durable, auto_delete, internal, arguments, policy]. <bindinginfoitem> must be a member of the list [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]. <connectioninfoitem> must be a member of the list [pid, name, port, host, peer_port, peer_host, ssl, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, peer_cert_subject, peer_cert_issuer, peer_cert_validity, state, channels, protocol, auth_mechanism, user, vhost, timeout, frame_max, channel_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, connected_at]. <channelinfoitem> must be a member of the list [pid, connection, name, number, user, vhost, transactional, confirm, consumer_count, messages_unacknowledged, messages_uncommitted, acks_uncommitted, messages_unconfirmed, prefetch_count, global_prefetch_count]. [root@controller ~]#
下面我们来进行一些简单的介绍
1). 服务器启动与关闭
启动: rabbitmq-server –detached
关闭:rabbitmqctl stop
若单机有多个实例,则在rabbitmqctlh后加–n 指定名称
2). 插件管理
开启某个插件:rabbitmq-pluginsenable xxx
关闭某个插件:rabbitmq-pluginsdisablexxx
注意:重启服务器后生效。
3).virtual_host管理
新建virtual_host: rabbitmqctladd_vhost xxx
撤销virtual_host:rabbitmqctl delete_vhost xxx
4). 用户管理
新建用户:rabbitmqctl add_user xxxpwd
删除用户: rabbitmqctl delete_user xxx
改密码: rabbimqctlchange_password {username} {newpassword}
设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}
Tag可以为 administrator,monitoring, management
5). 权限管理
权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read} Vhostpath Vhost路径 user 用户名 Conf 一个正则表达式match哪些配置资源能够被该用户访问。 Write 一个正则表达式match哪些配置资源能够被该用户读。 Read 一个正则表达式match哪些配置资源能够被该用户访问。
6). 获取服务器状态信息
服务器状态:rabbitmqctl status 队列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...] Queueinfoitem可以为:name,durable,auto_delete,arguments,messages_ready, messages_unacknowledged,messages,consumers,memory Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...] Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments. Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...] Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments Connection信息:rabbitmqctllist_connections [connectioninfoitem ...] Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。 Channel信息:rabbitmqctl list_channels[channelinfoitem ...] Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked
查看所有队列信息
[root@controller ~]#rabbitmqctl list_queues
关闭应用
[root@controller ~]# rabbitmqctl stop_app
- 1
启动应用,和上述关闭命令配合使用,达到清空队列的目的
[root@controller ~]# rabbitmqctl start_app
- 1
清除所有队列
[root@controller ~]# rabbitmqctl reset
- 1
rabbitmq常用命令
rabbitmq-server start 或者 service rabbitmq-server start #启动rabbitmq rabbitmqctl list_queues #分别查看当前系统种存在的Exchange和Exchange上绑定的Queue信息。 rabbitmqctl status #查看运行信息 rabbitmqctl stop #停止运行rabbitmq rabbitmq-plugins enable rabbitmq_management 启动rabbitmq的图形管理界面,这个操作必须重启rabbitmq, 然后在web中 http://127.0.0.1:15672 用户名和密码都是guest guest。如果局域网无法访问设置防火墙过滤规则或关闭防火墙。 rabbitmqctl list_exchanges rabbitmqctl list_bindings
rabbitmq安装Management Plugin
[root@controller ~]# rabbitmq-plugins enable rabbitmq_management
然后,重启rabbitmq:
service rabbitmq-server stop
service rabbitmq-server start
查看其状态
[root@controller ~]# rabbitmqctl status Status of node rabbit@controller ... [{pid,19854}, {running_applications,[{rabbit,"RabbitMQ","3.6.5"}, {os_mon,"CPO CXC 138 46","2.4"}, {rabbit_common,[],"3.6.5"}, {xmerl,"XML parser","1.3.10"}, {mnesia,"MNESIA CXC 138 12","4.13.4"}, {ranch,"Socket acceptor pool for TCP protocols.", "1.2.1"}, {sasl,"SASL CXC 138 11","2.7"}, {stdlib,"ERTS CXC 138 10","2.8"}, {kernel,"ERTS CXC 138 10","4.2"}]}, {os,{unix,linux}}, {erlang_version,"Erlang/OTP 18 [erts-7.3.1.2] [source] [64-bit] [smp:3:3] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory,[{total,364037264}, {connection_readers,958744}, {connection_writers,328736}, {connection_channels,1092584}, {connection_other,3488368}, {queue_procs,1912704}, {queue_slave_procs,0}, {plugins,0}, {other_proc,20687048}, {mnesia,421112}, {mgmt_db,0}, {msg_index,106664}, {other_ets,1140648}, {binary,307468624}, {code,19689812}, {atom,719761}, {other_system,6022459}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,3281313792}, {disk_free_limit,50000000}, {disk_free,85445070848}, {file_descriptors,[{total_limit,924}, {total_used,119}, {sockets_limit,829}, {sockets_used,117}]}, {processes,[{limit,1048576},{used,1663}]}, {run_queue,0}, {uptime,669}, {kernel,{net_ticktime,60}}] [root@controller ~]#
参考文档:
https://blog.csdn.net/whycold/article/details/41119807
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。