赞
踩
1、应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2、实现过程
1)首先建立消息的发布者
public class producer { //队列名称 private static final String QUEUE = "helloworld"; public static void main(String[] args) { //通过连接工程创建新连接和mq连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //设置mq端口 factory.setPort(5672); //设置用户名 factory.setUsername("guest"); //设置密码 factory.setPassword("guest"); //设置虚拟机 一个mq服务可以设置多个虚拟机 一个虚拟机相当于一个mq服务 factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立连接 connection = factory.newConnection(); //设置创建会话通道 所有通信都在通道中完成 channel = connection.createChannel(); /** * 声明队列 如果没有就创建 * 队列参数 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * 1、queue 队列名称 * 2、durable 是否持久化 如果是 mq关闭后重启队列还在 * 3、exclusive 是否独占 只能在该链接中访问 链接关闭自动删除 设为true可以实现临时队列 * 4、autoDelete 自动删除 和exclusive一样 设为true可以实现临时队列 * 5、arguments 队列参数 可以设置扩展参数 比如可以设置存活时间 * */ channel.queueDeclare(QUEUE,true,false,false,null); /** * 发送消息 * 参数 String exchange, String routingKey, BasicProperties props, byte[] body * 1、exchange 交换机 如果不指定则使用默认交换机(设置为 "") * 2、routingKey 路由key 交换机使用路由kye将消息转发给指定消息队列 如果使用默认交换机 则路由key设置为交换机名称 * 3、props 消息属性 * */ String message = "测试生成发送消息--hello"; channel.basicPublish("",QUEUE,null,message.getBytes()); System.out.print(message); }catch (Exception e){ e.printStackTrace(); }finally { //先关闭通道 再关连接 try { channel.close(); }catch (Exception e){ e.printStackTrace(); } try { connection.close(); }catch (Exception e){ e.printStackTrace(); } } }
}
2)设置消息的消费者
public class Consumer { private static final String QUEUE = "helloWorld"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 消费者接收消息调用此方法 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签标签 可以不设 可以在监听队列时设置 * @param envelope 信封 * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 envelope.getExchange(); //消息id mq在通道中(channel)标识消息id 可用于消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 编码格式设置utf-8 String message = new String(body, "UTF-8"); System.out.print(message); } }; //监听队列 //声明队列 channel.queueDeclare(QUEUE,true,false,false,null); /** *参数 * 1、string queue * 2、boolean autoAck * 3、Consumer callback */ channel.basicConsume(QUEUE,true,defaultConsumer); } }
3、测试工作队列模式
程序测试:
1、启动多个消费者。
2、生产者发送多个消息。
总结:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。