赞
踩
rabbitmq 学习-1-AMQP介绍
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,
为面向消息的中间件设计。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP在消息提供者和客户端的行为进行了强制规定,使得不同卖商之间真正实现了互操作能力。
JMS是早期消息中间件进行标准化的一个尝试,它仅仅是在API级进行了规范,离创建互操作能力还差很
远。
与JMS不同,AMQP是一个Wire级的协议,它描述了在网络上传输的数据的格式,以字节为流。因此任何
遵守此数据格式的工具,其创建和解释消息,都能与其他兼容工具进行互操作。
AMQP规范的版本:
0-8 是2006年6月发布
0-9 于2006年12月发布
0-9-1 于2008年11月发布
0-10 于2009年下半年发布
1.0 draft (文档还是草案)
AMQP的实现有:
1)OpenAMQ
AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS。
2)Apache Qpid
Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET。
3)Redhat Enterprise MRG
实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有
Web控制台,还有许多企业级特征,客户端支持C++、Ruby、Java、JMS、Python和.NET。
4)RabbitMQ
一 个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、
Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在
Ubuntu、FreeBSD平台。
5)AMQP Infrastructure
Linux下,包括Broker、管理工具、Agent和客户端。
6)?MQ
一个高性能的消息平台,在分布式消息网络可作为兼容AMQP的Broker节点,绑定了多种语言,包括
Python、C、C++、Lisp、Ruby等。
7)Zyre
是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力。
RabbitMQ 是一个由 Erlang 写成的 Advanced Message Queuing Protocol (AMQP) 实现,AMQP 的出现
其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR
的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实
现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、
Redhat、iMatix 等联合制定了 AMQP 的公开标准。
Introduction:
RabbitMQ is a complete and highly reliable enterprise messaging system based on the emerging AMQP
standard. It is licensed under the open source Mozilla Public License and has a platform-neutral
distribution, plus platform-specific packages and bundles for easy installation.
Category:消息服务器 Tags: mq, AMQP,
下载与文档(Download Links & Docs):
● RabbitMQ 下载 : http://www.rabbitmq.com/download.html
rabbitmq java api guide:http://www.rabbitmq.com/api-guide.html
rabbitmq 学习-2-安装
Windows
1,下载
下载erlang:erlang.org
下载rabbitmq:http://www.rabbitmq.com/download.html
2,安装
先安装erlang,C:\Program Files\erl5.7.5
设置环境变量:ERLANG_HOME=C:\Program Files\erl5.7.5
解压rabbitmq,进入dos下,
>cd C:\rabbitmq_server-1.7.2\sbin
>rabbitmq-service install
3,启动
>cd C:\rabbitmq_server-1.7.2\sbin
>rabbitmq-service start
或都直接双击C:\rabbitmq_server-1.7.2\sbin\rabbitmq-service.bat
Linux
1,下载erlang for linux版本,安装
tar -xzf otp_src_R12B-5.tar.gz
cd otp_src_R12B-5
./configure --prefix=/usr
make && make install 或者先make再make install
2,下载rabbitmq for linux,最好直接下载:rabbitmq-server-generic-unix-1.7.2.tar.gz,如果下载其它
的,还要装simplejson(装simplejson得装python,还要能使用yum命令,比较复杂)
若下载的是rabbitmq-server-generic-unix-1.7.2.tar.gz,可以直接解压,像windows上一样启动,然后使用
即可
rabbitmq 学习-3-初试1
本例是一个简单的异步发送消息实例
1,发送端
@Test(groups = { "sunjun" })
public class RabbitmqTest {
private static Connection connection;
static {
ConnectionParameters params = new ConnectionParameters();
ConnectionFactory factory = new ConnectionFactory(params);
try {
connection = factory.newConnection("192.168.18.21",
AMQP.PROTOCOL.PORT);
} catch (IOException e) {
e.printStackTrace();
}
public void testSend() {
try {
Channel channel = connection.createChannel();
System.out.println(channel.toString());
Assert.assertNotNull(channel);
byte[] messageBodyBytes = "hello world".getBytes();
channel.basicPublish("exchangeName", "routingKey",
MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
} catch (Exception e) {
e.printStackTrace();
}
}
2,接收端
@Test(groups = { "sunjun" })
public class RabbitmqTestB {
private static Connection connection;
static {
ConnectionParameters params = new ConnectionParameters();
ConnectionFactory factory = new ConnectionFactory(params);
try {
connection = factory.newConnection("localhost", AMQP.PROTOCOL.PORT);
} catch (IOException e) {
e.printStackTrace();
}
public void testReceive() {
try {
Channel channel = connection.createChannel();
System.out.println(channel.toString());
Assert.assertNotNull(channel);
channel.exchangeDeclare("exchangeName", "direct");
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
boolean noAck = false;
GetResponse response = channel.basicGet("queueName", true);
if (response == null) {
System.out.println("No message retrieved.");
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
System.out.println(new String(body));
}
} catch (IOException e) {
e.printStackTrace();
}
}
先执行下发送端,再执行下接收端,输出:hello world
ok
rabbitmq 学习-4-初试2
RpcClient,RpcServer同步发送接收消息
Channel.basicPublish,Channel.basicGet异步发送接收消息
本例是一个简单的同步发送消息实例
1,发送端
public class Publish {
private static Connection connection;
static {
ConnectionParameters params = new ConnectionParameters();
ConnectionFactory factory = new ConnectionFactory(params);
try {
connection = factory.newConnection("localhost", AMQP.PROTOCOL.PORT);
} catch (IOException e) {
e.printStackTrace();
}
public static void main(String[] args) {
try {
Channel channel = connection.createChannel();
RpcClient rpc = new RpcClient(channel, "exchangeName", "routingKey");
byte[] primitiveCall = rpc.primitiveCall("hello world".getBytes());
System.out.println(new String(primitiveCall));
primitiveCall = rpc.primitiveCall("hello world2".getBytes());
System.out.println(new String(primitiveCall));
rpc = new RpcClient(channel, "exchangeName", "routingKey2");
primitiveCall = rpc.primitiveCall("hello world2".getBytes());
System.out.println(new String(primitiveCall));
System.out.println("publish success.");
} catch (Exception e) {
e.printStackTrace();
}
}
2,接收端
public class Receive {
private static Connection connection;
static {
ConnectionParameters params = new ConnectionParameters();
ConnectionFactory factory = new ConnectionFactory(params);
try {
connection = factory.newConnection("localhost", AMQP.PROTOCOL.PORT);
} catch (IOException e) {
e.printStackTrace();
}
public static void main(String[] args) {
try {
Channel channel = connection.createChannel();
System.out.println(channel.toString());
channel.exchangeDeclare("exchangeName", "topic");
channel.exchangeDeclare("exchangeName2", "topic");
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
channel.queueBind("queueName", "exchangeName", "routingKey2");
channel.queueBind("queueName", "exchangeName2", "routingKey2");
channel.queueBind("queueName", "exchangeName2", "routingKey");
//queue 与 exchange 是多对多的,可以把同一queue和exchange以多个不同的routing进行bind,这样就
会有多个routing,而不是一个,虽然说这些rout 是绑定相同的 exchange, queue
final RpcServer rpcServer = new RpcServer(channel, "queueName") {
@Override
public byte[] handleCall(byte[] requestBody, AMQP.BasicProperties replyProperties) {
System.out.println("receive msg: " + new String(requestBody));
return "return message".getBytes();
}
};
Runnable main = new Runnable() {
@Override
public void run() {
try {
throw rpcServer.mainloop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
new Thread(main).start();
System.out.println("receive success.");
} catch (IOException e) {
e.printStackTrace();
}
}
rabbitmq 学习-5-server管理
RabbitMQ Server Administrator's Guide
http://www.rabbitmq.com/admin-guide.html#installation
1, Rabbitmq DB
第一次启动,会检查是否有数据库,没有则创建 一个db,存放在
C:\Documents and Settings\Administrator\Application Data\RabbitMQ,DB会存储关于user,virtual
host,持久化消息等信息
此目录下有两个文件夹:db,log
并会使用到这些资源:
virtual host:/
user:guest/guest
guest会分配所有的权限到virtual host / 上
2, 命令行管理工具rabbitmqctl
Rabbitmqctl是rabbitmq的一个命令行管理工具,它用来对某个机器上(host)的节点(node)进行管理,本机
默认的node名称是”rabbit”,hostname可以使用hostname –s查看
执行相关命令时,可明确指定一个节点,例如:
rabbitmqctl –n node_name@host_name add_user username password
这个命令将在指定的机器(host_name)的节点(node_name)上创建一个用户
在启动rabbitmq的时命令行中,可以查看到node名称
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
RabbitMQ 1.6.0 (AMQP 8-0)
Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and R
abbit Technologies Ltd.
Licensed under the MPL. See http://www.rabbitmq.com/
node : rabbit@WWW-65592D80C4A //这里就是node_name@host_name
log : C:/Documents and Settings/Administrator/Application Data/RabbitMQ/
log/rabbit.log //日志目录
sasl log : C:/Documents and Settings/Administrator/Application Data/RabbitMQ/
log/rabbit-sasl.log //日志目录
database dir: c:/Documents and Settings/Administrator/Application Data/RabbitMQ/
db/rabbit-mnesia //db目录
starting database ...done
starting core processes ...done
starting recovery ...done
starting persister ...done
starting guid generator ...done
starting builtin applications ...done
starting TCP listeners ...done
broker running
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3, 查看node状态
rabbitmqctl -q status
输出:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[{running_applications,[{rabbit,"RabbitMQ","1.6.0"},
{mnesia,"MNESIA CXC 138 12","4.4.10"},
{os_mon,"CPO CXC 138 46","2.2.2"},
{sasl,"SASL CXC 138 11","2.1.6"},
{stdlib,"ERTS CXC 138 10","1.16.2"},
{kernel,"ERTS CXC 138 10","2.13.2"}]},
{nodes,['rabbit@WWW-65592D80C4A']},
{running_nodes,['rabbit@WWW-65592D80C4A']}]
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4, 关闭rabbitmq
rabbitmqctl stop
直接关闭rabbitmq,关闭rabbitmq节点(elr进程也关掉了),需要通过rabbitmq-server 才能重新启动
rabbitmqctl stop_app
关闭rabbitmq应用程序,但是erl进行还在,可以通过rabbitmqctl start_app恢复
rabbitmqctl start_app
启动rabbitmq应用程序
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
E:\rabbitmq_server-1.6.0\sbin>rabbitmqctl stop_app
Stopping node 'rabbit@WWW-65592D80C4A' ...
...done.
E:\rabbitmq_server-1.6.0\sbin>rabbitmqctl stop
Stopping and halting node 'rabbit@WWW-65592D80C4A' ...
...done.
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
5, 查看rabbitmq状态
rabbitmqctl status
rabbitmqctl –q status
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Status of node 'rabbit@WWW-65592D80C4A' ...
[{running_applications,[{rabbit,"RabbitMQ","1.6.0"},
{mnesia,"MNESIA CXC 138 12","4.4.10"},
{os_mon,"CPO CXC 138 46","2.2.2"},
{sasl,"SASL CXC 138 11","2.1.6"},
{stdlib,"ERTS CXC 138 10","1.16.2"},
{kernel,"ERTS CXC 138 10","2.13.2"}]},
{nodes,['rabbit@WWW-65592D80C4A']},
{running_nodes,['rabbit@WWW-65592D80C4A']}]
...done.
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6, 重置rabbitmq
rabbitmqctl reset
rabbitmqctl force_reset
使用force_reset与reset不同之外在于:它会无条件的重置rabbitmq,不管rabbitmq database state,和
rabbitmq已经在集群环境中配置,这个命令应该在迫不得已的情况去使用
重置rabbitmq,会有以下影响:
1,从集群节点中删除
2,删除db数据,如user, vhost信息,持久化消息
重置成功后,必须stop rabbitmq,例如使用:rabbitmqctl stop_app
7, 修改日志文件后缀名
rabbitmqctl rotate_logs suffix_name
修改日志文件后缀名,执行以后,可以马上看到log目录下创建了指定后缀的日志文件:
rabbit.log.rabbit.log
rabbit-sasl.log.rabbit.log
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
E:\rabbitmq_server-1.6.0\sbin>rabbitmqctl rotate_logs .rabbit.log
Rotating logs to files with suffix ".rabbit.log" ...
...done.
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
8, 集群管理
rabbitmqctl cluster clusternode ...
9, rabbitmqctl 所有命令:
Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...]
Available commands:
stop - stops the RabbitMQ application and halts the node
stop_app - stops the RabbitMQ application, leaving the node running
start_app - starts the RabbitMQ application on an already-running node
reset - resets node to default configuration, deleting all data
force_reset
cluster <ClusterNode> ...
status
rotate_logs [Suffix]
close_connection <ConnectionPid> <ExplanationString>
add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
list_channels [<ChannelInfoItem> ...]
list_consumers [-p <VHostPath>]
Quiet output mode is selected with the "-q" flag. Informational
messages are suppressed when quiet mode is in effect.
<node> should be the name of the master node of the RabbitMQ
cluster. It defaults to the node named "rabbit" on the local
host. On a host named "server.example.com", the master node will
usually be rabbit@server (unless RABBITMQ_NODENAME has been set to
some non-default value at broker startup time). The output of hostname
-s is usually the correct suffix to use after the "@" sign.
The list_queues, list_exchanges and list_bindings commands accept an
optional virtual host parameter for which to display results. The
default value is "/".
<QueueInfoItem> must be a member of the list [name, durable,
auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid,
exclusive_consumer_tag, messages_ready, messages_unacknowledged,
messages_uncommitted, messages, acks_uncommitted, consumers,
transactions, memory]. The default is to display name and (number of)
messages.
<ExchangeInfoItem> must be a member of the list [name, type, durable,
auto_delete, arguments]. The default is to display name and type.
The output format for "list_bindings" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [pid, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout,
frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend]. The default is to display user, peer_address, peer_port
and state.
<ChannelInfoItem> must be a member of the list [pid, connection,
number, user, vhost, transactional, consumer_count,
messages_unacknowledged, acks_uncommitted, prefetch_count]. The
default is to display pid, user, transactional, consumer_count,
messages_unacknowledged.
The output format for "list_consumers" is a list of rows containing,
in order, the queue name, channel process id, consumer tag, and a
boolean indicating whether acknowledgements are expected from the
consumer.
rabbitmq 学习-6-rabbitmq基础
rabbitmq的中文资料真少,和同事lucas经过两周的学习,讨论,测试,终于搞清了部分rabbitmq的知
识,先总结一下
1,Connection
连 接,与rabbitmq server建立的一个连接,由ConnectionFactory创建,虽然创建时指定 了多个server
address,但每个connection只与一个物理的server进行连接,此连接是基于Socket进行连接的,
这个可以相似的理解为像一个DB Connection。
ConnectionParameters params = new ConnectionParameters();
params.setUsername(userName);
params.setPassword(password);
params.setVirtualHost(virtualHost);
params.setRequestedHeartbeat(0);
ConnectionFactory factory = new ConnectionFactory(params);
Connection conn = factory.newConnection(hostName,
AMQP.PROTOCOL.PORT);
2,Channel
建立在connection基础上的一个通道,相对于connection来说,它是轻量级的。可以这样理解,它就像是
hibernate里面的session一样,相对于DB Connection来说,session就是一个轻量级的东西。
Channel channel = conn.createChannel();
注:尽量避免在多线程中使用一个channel,Channel javadoc有如下说明:
While a Channel can be used by multiple threads, it's important to ensure
that only one thread executes a command at once. Concurrent execution of
commands will likely cause an UnexpectedFrameError to be thrown.
另官方Java Client API Guide里面也同样提到
Channel thread-safety
In general, Channel instances should not be used by more than one thread simultaneously: application
code should maintain a clear notion of thread ownership for Channel instances. If more than one
thread needs to access a particular Channel instance, the application should enforce mutual
exclusion itself, for example by synchronising on the Channel.
Symptoms of incorrect serialisation of Channel operations include, but are not limited to,
de>IllegalStateExceptionde>s with the message "cannot execute more than one synchronous
AMQP command at a time", and de>UnexpectedFrameErrorde>s.
3,Exchange,Queue,RoutingKey
先看下面一张图
蓝色-- Client(相对于Rabbitmq Server来说)
绿色--Exchange
红色—Queue
- 交换器(Exchange),它是发送消息的实体。
- 队列(Queue),这是接收消息的实体。
- 绑定器(Bind),将交换器和队列连接起来,并且封装消息的路由信息。
Exchange指向Queue的黑色线—RoutingKey,可以将它简单的理解为一条连接Exchange和Queue的路线
Exchange和Queue都需要通过channel来进行定义,而RoutingKey则只需要在binding时取个名字就行
了。
这一块的理解是不正确的,具体参见 rabbitmq 学习-8- Exchange Queue RoutingKey关系说明
左边的Client向右边的Client发送消息,流程:
1, 获取Conection
2, 获取Channel
3, 定义Exchange,Queue
4, 使用一个RoutingKey将Queue Binding到一个Exchange上
5, 通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
6, 接收方在接收时也是获取connection,接着获取channel,然后指定一个Queue直接到它关心的
Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消
息就OK了
一个Client发送消息,哪些Client可以收到消息,其核心就在于Exchange,RoutingKey,Queue的关系
上。
Exchange RoutingKey Queue
1 E1 R1 Q1
2 R2 Q2
3 E2 R3 Q1
4 R4 Q2
5 E1 R5 Q1
6 E2 R6 Q1
我们可以这样理解,RoutingKey就像是个中间表,将两个表的数据进行多对多关联,只不过对于相同的
Exchange和Queue,可以使用不同的RoutingKey重复关联多次。
注:本人现在使用的是rabbitmq server 1.7.2,它使用的AMQP 0.8协议,最新的1.0里面有些东西有变
化,比如Exchange type,1.0里面还有个system
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
Exchange Type 说明
fanout 所有bind到此exchange的queue都可以接收消息
direct 通过routingKey和exchange决定的那个唯一的queue可以接收消
息
topic 所有符合routingKey(此时可以是一个表达式)的routingKey所
bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于
使用fanout
rabbitmq 学习-7-rabbitmq 支持场景
What messaging scenarios are supported by AMQP and RabbitMQ?
AMQP is a very general system that can be configured to cover a great variety of messaging
middleware use-cases. For example:
● Point-to-point communication
● One of the simplest and most common scenarios is for a message producer to transmit a
message addressed to a particular message consumer. AMQP covers this scenario by allowing
queues to be named and to be bound to a "direct" exchange, which routes messages to queues
by name.
● One-to-many broadcasting (including multicast)
● In this scenario, the broadcasters publish messages to an AMQP "fanout" exchange, and
subscribers create and subscribe to their own private AMQP queues, which forward published
messages on to them, with one copy per queue.
● Multicast is addressed at the broker implementation level. AMQP clients need not be made
aware of transport-level optimisations such as multicast: broker clusters are free to use whatever
such low-level optimisations are available from configuration to configuration.
● Multiple optimisations are possible, since AMQP separates routing logic (exchanges and
bindings) from message queueing (queues). Multicast relates only to routing from message
publishers to message queues, and as a routing optimisation can be completely physically
decoupled from AMQP's logical semantics. Further optimisations include physical separation of
exchange from queue or even colocation of queue with a consumer application.
● Transactional publication and acknowledgement
● AMQP supports transactional publication, where an AMQP channel is opened, transactional
mode is selected, messages are published and acknowledged, and the transaction is committed.
The system guarantees atomicity and durability properties for transactional message activity.
● High-speed transient message flows
● Messages are individually flagged as transient or persistent in AMQP at the time of publication.
By sending messages outside the transactional part of the protocol, in non-persistent mode, an
application can achieve very high throughput and low latency.
● Reliable persistent message delivery
● Messages that are published in persistent mode are logged to disk for durability. If the server is
restarted, the system ensures that received persistent messages are not lost. The transactional
part of the protocol provides the final piece of the puzzle, by allowing the server to communicate
its definite receipt of a set of published messages.
● Store-and-forward
● Store-and-forward is implemented by delivering messages marked as "persistent" to AMQP's
durable queues. Published, persistent messages delivered to durable queues are stored on disk
until a consumer retrieves and deletes them.
● Wide area messaging
● Because routing logic is decoupled from message delivery, RabbitMQ is able to support
extended broker clustering across WANs. Some of the approaches include AJAX-style access
to AMQP resources, and spanning-tree pseudo-multicast implemented internally to a RabbitMQ
cluster.
● File streaming
● The AMQP protocol, version 0-8, supports file streaming by way of the de>filede> content class.
Very large files are transferred to a temporary area on the broker before being routed to queues
for download by consumers.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。