赞
踩
RabbitMQ提供了自己的管理界面,可以通过管理界面来完成VirtualHost、Exchange、queue的创建。
创建交换机的时候需要指定虚拟主机以及交换机的类型(direct(路由模式)、fanout(广播)、headers、topic)
direct:Exchange通过消息携带的路由键来将消息分发到对应的队列中
fanout:Exchange将消息分发到所有绑定到交换机的队列中
headers:Exchange通过判断消息头的值是否与绑定的值相匹配来分发消息
x-match为any时,消息头的任意一个值匹配就可以满足条件
x-match为all时,消息头的所有值匹配才能满足条件
topic:Exchange将满足路由规则的消息分发到对应的队列
创建队列时往往需要绑定到交换机上。
这是我们的虚拟主机以及交换机、队列就创建好了。我们可以使用java来操作。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
public static Connection getConnection() throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();//MQ采用工厂模式来完成连接的创建
//2.在工厂对象中设置连接信息(ip,port,virtualhost,username,password)
factory.setHost("xxxxxx");//设置MQ安装的服务器ip地址
factory.setPort(5672);//设置端口号
factory.setVirtualHost("host1");//设置虚拟主机名称
//MQ通过用户来管理
factory.setUsername("xxxxxx");//设置用户名称
factory.setPassword("123456");//设置用户密码
//3.通过工厂对象获取连接
Connection connection = factory.newConnection();
return connection;
}
//获取连接
Connection connection = MQUtil.getConnection();
//mq提供Channel来将处理消息
//创建Channel
Channel channel = connection.createChannel();
String msg = "hello world";
//basicPublish将消息发送到指定的交换机
channel.basicPublish("ex3", "a", null, msg.getBytes());
//关闭连接
channel.close();
connection.close();
//获取与MQ的连接 Connection connection = MQUtil.getConnection(); //创建Channel Channel channel = connection.createChannel(); //通过basicConsumer方法从指定队列中获取消息,消息生产者会通过ex2交换机中的key值将消息发送到queue6中,因为在创建queue6时绑定到交换机ex3中,指定的路由键为a。 //consumer参数是消息接收之后的处理方法 channel.basicConsume("queue6",true,consumer); //获取Consumer Consumer consumer = new DefaultConsumer(channel){ //重写handleDelivery方法(这个方法是消息的处理过程) //body参数就是接受到的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //将消息转换成String类型然后打印 String msg = new String(body); System.out.println(msg); } };
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
server:
port: 8001
spring:
application:
name: producer
rabbitmq:
host: xxxxxx #mq服务器的ip地址
port: 5672
virtual-host: host3
username: xxxxxx
password: xxxxxx
SpringBoot提供了AmqpTemplate来操作RabbitMQ
//注入AmqpTemplate
private AmqpTemplate amqpTemplate;
//通过converAndSend方法发送消息
String msg = "hello world";
amqpTemplate.converAndSend("ex3","b",msg);
编写配置文件
server:
port: 8002
spring:
application:
name: consumer
rabbitmq:
host: xxxxxx #mq服务器的ip地址
port: 5672
virtual-host: host3
username: xxxxxx
password: xxxxxx
SpringBoot中提供监听器来监听队列
@RabbitListener注解监听队列
@RabbitHandle注解作用于方法,用来处理消息
@RabbitListener(queues = "queue7")
public class MsgService {
@RabbitHandler
public void getMsg(String msg){
System.out.println(msg);
}
}
以上就是java操作RabbitMQ的基本内容。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。