RabbitMQ 基于Erlang 实现, 客户端可以用Python | Java | Ruby | PHP | C# | Javascript | Go等语言来实现。这里做个java语言的测试。
首先安装好RabbitMQ 服务端。
maven依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency>
java测试代码如下:
- //定义队列
- EndPoint.java
- public abstract class EndPoint{ protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException{ this.endPointName = endpointName; //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("192.168.163.33"); factory.setPort(5672); factory.setUsername("test"); factory.setPassword("test"); //creating a channel channel = connection.createChannel(); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * Close channel and connection. Not necessary as it happens implicitly any way. * @throws IOException */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } } //生产者 Producer.java public class Producer extends EndPoint{ public Producer(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } } //消费队列 QueueConsumer.java public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} } //测试用例 Main.java public class Main { public Main() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 100000; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception{ new Main(); } }
运行报以下错误
Exception in thread "main" com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:355) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) at com.tony.test.EndPoint.<init>(EndPoint.java:26) at com.tony.test.QueueConsumer.<init>(QueueConsumer.java:16) at com.tony.test.test.<init>(test.java:13) at com.tony.test.test.main(test.java:33) Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:347) ... 6 more Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:209) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)
网上Rabbit性能测试
性能测试
上图可以看到每秒百万级别的消息进出数量,以及2343条的消息在队列中等待。