赞
踩
一个生产者对应一个消费者。
1、生产者
// Simple-queue producer: publishes a single message to QUERY_NAME and then disconnects.
// Uses the same broker address (127.0.0.1) as the matching consumer snippet.
Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123");
try {
    Channel channel = connection.createChannel();
    // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare(QUERY_NAME, false, false, false, null);
    String message = "hello word 22222";
    // basicPublish(exchange, routingKey, props, body):
    // "" selects the default exchange, which routes by queue name.
    channel.basicPublish("", QUERY_NAME, null, message.getBytes());
    channel.close();
} finally {
    // Release the connection even when declaring or publishing fails.
    connection.close();
}
2、消费者
// Simple-queue consumer: prints every message that arrives on QUERY_NAME, forever.
Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123");
Channel channel = connection.createChannel();
channel.queueDeclare(QUERY_NAME, false, false, false, null);

QueueingConsumer consumer = new QueueingConsumer(channel);
// Second argument (autoAck) is true, so no explicit basicAck is sent below.
channel.basicConsume(QUERY_NAME, true, consumer);

for (;;) {
    // Blocks until the next message is delivered.
    byte[] payload = consumer.nextDelivery().getBody();
    System.out.print("获取到的消息----------------" + new String(payload));
}
一个生产者对应多个消费者
1、生产者
package rabbmitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Produce { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException { // 连接 Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672 , "/", "admin_p", "123"); // 通道 Channel cha =connection.createChannel(); //队列 cha.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i=0;i<10;i++) { String message ="work"+i; cha.basicPublish("", QUEUE_NAME, null,message.getBytes()); System.out.println(message); Thread.sleep(10); } cha.close(); connection.close(); } }
2、消费者
消费者1
package rabbmitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Consumer { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException { Connection con= ConnectionUtil.getConnection("127.0.0.1",5672,"/", "admin_p", "123"); Channel channel =con.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //同一时刻只会给消费者发送一条消息 channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery(); String mess=new String(delivery.getBody()); System.out.println(mess); Thread.sleep(10); // 返回确认状太 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2
package rabbmitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class ConsumerTwo { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, InterruptedException { Connection con = ConnectionUtil.getConnection("127.0.0.1",5672,"/", "admin_p", "123"); Channel channel = con.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); // 能者多劳模式 channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery =queueingConsumer.nextDelivery(); System.out.println("------"+new String(delivery.getBody())); Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
交换器分为四种,分别是:direct、fanout、topic和 headers。
3.1、direct模式
如果路由键完全匹配的话,消息才会被投放到相应的队列。
>生产者
package rabbmitmq.root;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Direct-exchange producer: declares {@code ex_direct} and publishes one
 * message with the routing key {@code update}.
 */
public class rProduce {
    public static final String EX_CHANGENAME = "ex_direct";

    /**
     * Publishes a single message and disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123");
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EX_CHANGENAME, "direct");

        byte[] payload = "hell0 word direct".getBytes();
        // basicPublish(exchange, routingKey, mandatory, props, body)
        ch.basicPublish(EX_CHANGENAME, "update", false, null, payload);

        ch.close();
        conn.close();
    }
}
消费者1
package rabbmitmq.root; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Consomer1 { public static final String Queen_NAME="direct1"; public static final String EX_CHANGENAME="ex_direct"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123"); Channel channel =connection.createChannel(); channel.queueDeclare(Queen_NAME, false, false ,false , null); channel.queueBind(Queen_NAME, EX_CHANGENAME,"select"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(Queen_NAME, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery =queueingConsumer.nextDelivery(); String message =new String(delivery.getBody()); System.out.println("1========="+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2
package rabbmitmq.root; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Consumer2 { public static final String Queen_NAME="direct2"; public static final String EX_CHANGENAME="ex_direct"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123"); Channel channel = connection.createChannel(); channel.queueDeclare(Queen_NAME, false, false, false, null); channel.queueBind(Queen_NAME, EX_CHANGENAME, "update"); QueueingConsumer queueingConsumer= new QueueingConsumer(channel); channel.basicConsume(Queen_NAME, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery =queueingConsumer.nextDelivery(); String message =new String(delivery.getBody()); System.out.println("2========="+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
3.2、fanout模式
当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
生产者
package rabbmitmq.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Fanout-exchange producer: declares {@code fanout_exchange} and publishes
 * one message to it with an empty routing key.
 */
public class xProduce {
    private final static String ex_changename = "fanout_exchange";

    /**
     * Publishes a single message and disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection conn = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123");
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(ex_changename, "fanout");

        byte[] payload = "hello word fanout oooo".getBytes();
        // basicPublish(exchange, routingKey, props, body)
        ch.basicPublish(ex_changename, "", null, payload);

        ch.close();
        conn.close();
    }
}
消费者1
package rabbmitmq.exchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class xConsumer { private final static String QUEUE_NAME = "fanout_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123"); Channel channel =connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, queueingConsumer); while(true) { QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery(); System.out.println("1"+new String(delivery.getBody())); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2
package rabbmitmq.exchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class xConsumer2 { private final static String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection("127.0.0.0", 5672, "/", "admin_p", "123"); Channel channel =connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, queueingConsumer); while(true) { QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery(); System.out.println("2"+new String(delivery.getBody())); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
3.3、topic模式
设置模糊的绑定方式:路由键由“.”分隔成若干个单词,“*”操作符匹配恰好一个单词;“#”操作符匹配零个或多个单词(可以跨越多个“.”)。例如,绑定键“update.*”可以匹配“update.user”,但不能匹配“update.user.name”;而“update.#”两者都能匹配。通配符只在绑定键中生效,发送消息时应使用具体的路由键。
生产者
package rabbmitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Topic-exchange producer: declares {@code ex_topic} and publishes one message
 * with a concrete routing key that matches bindings such as {@code update.*}.
 */
public class tProduce {
    private static final String Exchangename = "ex_topic";

    /**
     * Publishes a single message and disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123");
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(Exchangename, "topic");

        String message = "hello topic";
        // Wildcards ("*", "#") are only interpreted in binding keys. A published
        // routing key is a literal list of dot-separated words, so send a concrete
        // key instead of the pattern "update.*".
        channel.basicPublish(Exchangename, "update.user", null, message.getBytes());

        channel.close();
        connection.close();
    }
}
消费者1
package rabbmitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Consumer1 { private static final String Exchangename="ex_topic"; private static final String QueenName="topic_1"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p", "123"); Channel channel =connection.createChannel(); channel.queueDeclare(QueenName, false, false, false, null); channel.queueBind(QueenName, Exchangename,"update.*"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QueenName, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery(); String outMessage=new String(delivery.getBody()); System.out.println("1================="+outMessage); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2
package rabbmitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import rabbmitmq.Untity.ConnectionUtil; import java.io.IOException; public class Consumer2 { private static final String Exchangename="ex_topic"; private static final String QueenName="topic_2"; public static void main(String[] args) throws IOException, InterruptedException { Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "admin_p","123"); Channel channel = connection.createChannel(); channel.queueDeclare(QueenName, false, false, false, null); channel.queueBind(QueenName, Exchangename, "select.*"); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QueenName, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery(); String outMessage =new String(delivery.getBody()); System.out.println("2============"+outMessage); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。