赞
踩
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。
1.基本消息模型:生产者–>队列–>一个消费者
2.work消息模型:生产者–>队列–>多个消费者共同消费
3.订阅模型-Fanout:广播,将消息交给所有绑定到交换机的队列,每个消费者都可以收到同一条消息
4.订阅模型-Direct:定向,把消息交给符合指定 rotingKey 的队列(路由模式)
5.订阅模型-Topic:通配符,把消息交给符合routing pattern(主题模式)的队列
(3、4、5这三种都属于订阅模型,只不过进行路由的方式不同)
下述均为引入rabbitmq的自有依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.2.0</version>
- </dependency>
第一种:简单队列模式
单个发送者(生产者)将消息发送到队列(每个队列都有一个唯一名字)单个接受者(消费者)获取消息
生产者示例代码:
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.*;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 简单队列模式(生产者)
- */
- public class SimpleModeProducer {
-
- private static String QUEUE_NAME = "SIMPLE_MODE";
-
- public static void main(String[] args) throws Exception {
- Map map =new HashMap();
- map.put("text", "测试内容");
-
- //1.连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
- //2.创建一个连接
- Connection connection = factory.newConnection();
- //3.创建一个管道
- Channel channel = connection.createChannel();
- //4.定义创建一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //5.发送消息(注意发送和接受段相同字符集否则出现乱码)
- channel.basicPublish("", QUEUE_NAME, null, ser(map));
- System.out.println("生产成功!!");
- channel.close();
- connection.close();
- }
-
- /**
- * 将对象序列化为字节数组
- */
- private static byte[] ser(Object obj) throws IOException{
- ByteArrayOutputStream baos =new ByteArrayOutputStream();
- ObjectOutputStream oos =new ObjectOutputStream(baos);
- oos.writeObject(obj);
- return baos.toByteArray();
- }
- }
消费者示例代码:
- import com.rabbitmq.client.*;
- import java.io.ByteArrayInputStream;
- import java.io.ObjectInputStream;
- import java.util.Map;
-
- /**
- * 简单队列模式(消费者)
- */
- public class SimpleModeConsumer {
-
- private static String QUEUE_NAME = "SIMPLE_MODE";
-
- public static void main(String[] args)throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- //MQ的服务ip
- factory.setHost("127.0.0.1");
- //端口
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
- //创建一个连接
- Connection connection = factory.newConnection();
- //创建一个管道
- Channel channel = connection.createChannel();
- //创建一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
- try {
- Map map = (Map) dser(body);
- System.out.println(map.get("text").toString());
- System.out.println("接收成功");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
-
- /**
- * 反序列化
- */
- private static Object dser(byte[] src) throws Exception {
- // 从字节数组读取数据
- ByteArrayInputStream bis = new ByteArrayInputStream(src);
- // 把字节数组反序列化成对象
- ObjectInputStream ois = new ObjectInputStream(bis);
- return ois.readObject();
- }
- }
先启动消费者,然后再启动生产者,其输出结果如下:
第二种:工作队列模式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。