赞
踩
public class Consumer { public static void main(String[] args) throws Exception{ //1 创建一个connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.159"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2通过连接工场创建连接 Connection connection = connectionFactory.newConnection(); //3通过connection创建channel Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String routingKey = "qos.#"; String queueName = "test_qos_queue"; channel.exchangeDeclare(exchangeName,"topic",true,false,null); channel.queueDeclare(queueName,true,false,false,null); channel.queueBind(queueName,exchangeName,routingKey); //第一个参数:限制消息的大小 第二个:限制一次接收的消息的个数 第三次:是否进行批量 channel.basicQos(0,1,false); //如果要使用限流方式 必须关闭自动签收下面第二行的false channel.basicConsume(queueName,false,new MyConsumer(channel)); } }
public class Producter { public static void main(String[] args) throws Exception{ //1 创建一个connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.159"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2通过连接工场创建连接 Connection connection = connectionFactory.newConnection(); //3通过connection创建channel Channel channel = connection.createChannel(); //开启消息的确认模式 channel.confirmSelect(); String exchangeName = "test_qos_exchange"; String routingKey = "qos.save"; //发送消息 String msg = "hello"; // channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes()); for(int i=0;i<5;i++){ channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes()); } } }
public class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel=channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("consumerTag: "+consumerTag); System.err.println("envelope "+envelope); System.err.println("properties "+properties); System.err.println("body "+new String(body)); //确认签收消息,并返回确认接收到消息,会返回broker一个应答 // channel.basicAck(envelope.getDeliveryTag(),false); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。