赞
踩
Rabbitmq基础(一)Rabbitmq简介
Rabbitmq基础(二)安装与配置
生产者:
/** * 发送消息 */ public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.1.1");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/demo");//虚拟机 默认值/ factory.setUsername("yy");//用户名 默认 guest factory.setPassword("abc123");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ String body = "hello rabbitmq~~~"; //6. 发送消息 channel.basicPublish("","hello_world",null,body.getBytes()); //7.释放资源 channel.close(); connection.close(); } }
消费者:
省略多余的代码
//连接参数同上 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello_world",true,false,false,null); // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume("hello_world",true,consumer); //关闭资源?不要 因为消费者是监听功能,需要一直开启
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
代码实现:
生产者:
//连接参数同上 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String exchangeName = "test_fanout"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close();
消费者:
需要多个消费者,这里只写一个
//连接参数同上 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; // 接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("properties:"+properties);*/ System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer);
//连接参数同上 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_direct"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //6. 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ //队列1绑定 error channel.queueBind(queue1Name,exchangeName,"error"); //队列2绑定 info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error..."; //8. 发送消息 channel.basicPublish(exchangeName,"warning",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close();
消费者:
//连接参数同上 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer);
生产者:
//设置参数同上 Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_topic"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //6. 创建队列 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ // routing key 系统的名称.日志的级别。 //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close();
消费者:
//连接参数同上 //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; // 接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息存入数据库......."); } }; channel.basicConsume(queue1Name,true,consumer);
如果喜欢请点赞,收藏
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。