赞
踩
推荐阅读官方example,讲得非常详细,并且已经按同步、异步、订阅等类别分好,阅读体验也很好。
官方example
官方example同样给得非常详细,不过其中有些并不常用;官网上的图则绝对是学习与理解的利器。
官方example
package net.xdclass.direct;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
/**
 * Consumer for the direct-exchange example.
 *
 * <p>Declares the {@code exchange_direct} exchange, binds one queue to it under
 * three routing keys, and prints every message delivered to that queue.
 */
public class Recv {

    private static final String EXCHANGE_NAME = "exchange_direct";

    /** Routing keys the queue is bound with, in binding order. */
    private static final String[] ROUTING_KEYS = {
        "errorRoutingKey",
        "infoRoutingKey",
        "debugRoutingKey"
    };

    /**
     * Connects to the broker, binds the queue and starts consuming.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or consuming fails
     */
    public static void main(String[] argv) throws Exception {
        // Consumers usually do not use auto-close (try-with-resources);
        // the connection and channel are left open here.
        Connection connection = newConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange as a DIRECT exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Obtain a queue (exclusive queue) and remember its name.
        String boundQueue = channel.queueDeclare().getQueue();

        // Bind the queue to the exchange once per routing key.
        for (String routingKey : ROUTING_KEYS) {
            channel.queueBind(boundQueue, EXCHANGE_NAME, routingKey);
        }

        DeliverCallback onDelivery = (tag, delivery) -> {
            byte[] payload = delivery.getBody();
            String text = new String(payload, StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + text + "'");
        };

        // Second argument true: messages are acknowledged automatically.
        channel.basicConsume(boundQueue, true, onDelivery, tag -> { });
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
package net.xdclass.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
 * Producer for the direct-exchange example.
 *
 * <p>Declares the {@code exchange_direct} exchange and publishes one log
 * message for each of three routing keys.
 */
public class Send {

    private static final String EXCHANGE_NAME = "exchange_direct";

    /** Pairs of {routing key, message text}, in publishing order. */
    private static final String[][] LOGS_BY_ROUTING_KEY = {
        {"errorRoutingKey", "我是订单服务的error日志"},
        {"infoRoutingKey", "我是订单服务的info日志"},
        {"debugRoutingKey", "我是订单服务的debug日志"}
    };

    /**
     * Connects to the broker, publishes the three messages and closes the
     * channel and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory settings = newConnectionFactory();

        // The producer needs little setup: it only has to declare the exchange.
        // try-with-resources closes the channel and the connection on exit.
        try (Connection connection = settings.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            for (String[] entry : LOGS_BY_ROUTING_KEY) {
                String routingKey = entry[0];
                byte[] payload = entry[1].getBytes(StandardCharsets.UTF_8);
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
            }

            System.out.println("发送成功");
        }
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
package net.xdclass.pub;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Consumer for the fanout (broadcast) exchange example.
 *
 * <p>Declares the {@code exchange_fanout} exchange, binds one queue to it with
 * an empty routing key, and prints every message delivered to that queue.
 */
public class Recv {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    /**
     * Connects to the broker, binds the queue and starts consuming.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or consuming fails
     */
    public static void main(String[] argv) throws Exception {
        // Consumers usually do not use auto-close (try-with-resources);
        // the connection and channel are left open here.
        Connection connection = newConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange as FANOUT, i.e. the broadcast type.
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // Obtain a queue (exclusive queue) and remember its name.
        String boundQueue = channel.queueDeclare().getQueue();

        // A fanout exchange needs no routing key, so an empty one is passed.
        channel.queueBind(boundQueue, EXCHANGE_NAME, "");

        DeliverCallback onDelivery = (tag, delivery) -> {
            byte[] payload = delivery.getBody();
            String text = new String(payload, StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + text + "'");
        };

        // Second argument true: messages are acknowledged automatically.
        channel.basicConsume(boundQueue, true, onDelivery, tag -> { });
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
package net.xdclass.pub;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
 * Producer for the fanout (broadcast) exchange example.
 *
 * <p>Declares the {@code exchange_fanout} exchange and publishes a single
 * message to it with an empty routing key.
 */
public class Send {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    /**
     * Connects to the broker, publishes one message and closes the channel
     * and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory settings = newConnectionFactory();

        // The producer needs little setup: it only has to declare the exchange.
        // try-with-resources closes the channel and the connection on exit.
        try (Connection connection = settings.newConnection();
             Channel channel = connection.createChannel()) {

            // Declare the exchange as FANOUT, i.e. the broadcast type.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "Hello World pub !";
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(EXCHANGE_NAME, "", null, payload);

            System.out.println(" [x] Sent '" + message + "'");
        }
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
package net.xdclass.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Consumer for the topic-exchange example.
 *
 * <p>Declares the {@code exchange_topic} exchange, binds one queue to it with
 * the pattern {@code *.log.*}, prints each delivered message body and
 * acknowledges every delivery manually.
 */
public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    /**
     * Connects to the broker, binds the queue and starts consuming with
     * automatic acknowledgement turned off.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or consuming fails
     */
    public static void main(String[] argv) throws Exception {
        // Consumers usually do not use auto-close (try-with-resources);
        // the connection and channel are left open here.
        Connection connection = newConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String boundQueue = channel.queueDeclare().getQueue();
        channel.queueBind(boundQueue, EXCHANGE_NAME, "*.log.*");

        Consumer printingConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String tag,
                                       Envelope envelope,
                                       AMQP.BasicProperties props,
                                       byte[] payload) throws IOException {
                String text = new String(payload, StandardCharsets.UTF_8);
                System.out.println("body=" + text);

                // Manually acknowledge this one delivery; the false flag means
                // it is not a multiple acknowledgement.
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        };

        // Start consuming with automatic acknowledgement disabled.
        channel.basicConsume(boundQueue, false, printingConsumer);
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
package net.xdclass.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
 * Producer for the topic-exchange example.
 *
 * <p>Declares the {@code exchange_topic} exchange and publishes three log
 * messages, each under its own dot-separated routing key.
 */
public class Send {

    private static final String EXCHANGE_NAME = "exchange_topic";

    /** Pairs of {routing key, message text}, in publishing order. */
    private static final String[][] LOGS_BY_ROUTING_KEY = {
        {"order.log.error", "我是订单服务的error日志"},
        {"order.log.info", "我是订单服务的info日志"},
        {"product.log.debug", "我是订单服务的debug日志"}
    };

    /**
     * Connects to the broker, publishes the three messages and closes the
     * channel and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory settings = newConnectionFactory();

        // The producer needs little setup: it only has to declare the exchange.
        // try-with-resources closes the channel and the connection on exit.
        try (Connection connection = settings.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            for (String[] entry : LOGS_BY_ROUTING_KEY) {
                String routingKey = entry[0];
                byte[] payload = entry[1].getBytes(StandardCharsets.UTF_8);
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
            }

            System.out.println("topic发送成功");
        }
    }

    /** Builds the connection factory with the example's broker settings. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("");
        settings.setPort(5672);
        settings.setVirtualHost("/dev");
        settings.setUsername("admin");
        settings.setPassword("123456");
        return settings;
    }
}
交换机通过通配符将消息转发到对应的队列:* 代表恰好一个词,# 代表零个或多个词
一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order,
而 *.order ,只会匹配 info.order, 之间是使用. 点进行分割多个词的;
如果是 *.*, 则 info.order、error.order 都会匹配
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。