赞
踩
1.MQ基础
2.常见消息模型
3.消息的可靠性
4.消息持久化
5.消费者确认机制ACK
6.死信交换机
7.延时交换机
8.消息堆积
9.MQ集群
同步通讯:
异步通信
systemctl start docker # 启动docker服务
docker load -i mq.tar # 上传mq,并且安装
运行MQ
docker run
-e RABBITMQ_DEFAULT_USER=xiaowang
-e RABBITMQ_DEFAULT_PASS=123321
-v mq-plugins:/plugins
–name mq
–hostname mq
-p 15672:15672
-p 5672:5672
-d
rabbitmq:3.8-management
通过ip地址 + 端口访问 http://ip:15672/
消息发送者将消息发送交换机,交换机将消息路由交给队列,队列存储消息,消费者从队列获取消息,处理消息
发送者将信息发出,接收者创建队列查看是否存在当前队列,定义回调函数,将函数与队列做绑定,一般队列有消息,就会调用回调函数
发送者
public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); }
接收者
public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); }
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置AMQP地址
spring:
rabbitmq:
host: # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: xiaowang # 用户名
password: 123321 # 密码
发送者使用RabbitTemplate 发送队列消息
只能发送在已存在的队列
public class SpringAmpqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName = "simple.queue";
String message = "Hello,Spring amqp !!!!!";
rabbitTemplate.convertAndSend(queueName,message);
}
}
监听队列的消息
一旦接收到队列,该队列就会被删除
@RabbitListener(queues = "simple.queue")
public void ListenterSimpleQuery(String message){
System.out.println("消费者接收到发送者的消息:"+message);
}
两个消费者同时处理请求,
spring:
rabbitmq:
host: 192.168.13.133 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: xiaowang # 用户名
password: 123321 # 密码
listener:
simple:
prefetch: 1 # 每次只能获取到一条消息,处理完才能获取第二条消息
发布者发布消息到队列中,消费者获取到这条队列,队列种的数据就会删除.如果不想让队列删除,增加交换机,绑定队列,发布者发送至交换机,由交换机发布到每一个队列中
@Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
@RabbitListener(queues = "fanout.queue1")
public void ListenterSimpleQuery1(String message) throws InterruptedException {
Thread.sleep(20);
System.out.println("消费者接收到发送者的消息1:"+message+"------"+LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void ListenterSimpleQuerytwo2(String message) throws InterruptedException {
Thread.sleep(200);
System.out.println("消费者接收到发送者的消息2:"+message+"-------"+LocalTime.now());
}
// 发送到交换机
@Test
public void testSend(){
String switchName = "itcast.fanout" ;
String message = "hello.every one !!!!" ;
rabbitTemplate.convertAndSend(switchName , "",message);
}
发布者指定交换机,以及发送的key,对应的消费者key,就可以接收到
消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1") , //队列 exchange = @Exchange(name = "itcast.direct"), key = {"red","blue"} )) public void ListeDirectQueue1(String msg){ System.out.println("消费者1发送的消息是"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2") , //队列 exchange = @Exchange(name = "itcast.direct"),//交换机 key = {"red","yellow"} //指定key 访问哪个key就可以发送到哪一个队列 )) public void ListeDirectQueue2(String msg){ System.out.println("消费者2发送的消息是"+msg); }
发布者
// 发送到交换机
@Test
public void testSend2(){
String switchName = "itcast.direct" ;
String message = "hello.every red !!!!" ;
rabbitTemplate.convertAndSend(switchName , "red",message);
}
生产者发送的消息,消费者通过通配符的形式查看是否符合
消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1") , //队列 exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),//交换机 key = {"china.#"} //指定key 访问哪个key就可以发送到哪一个队列 )) public void ListeTopQueue1(String msg){ System.out.println("消费者china1发送的消息是"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2") , //队列 exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),//交换机 key = {"#.news"} //指定key 访问哪个key就可以发送到哪一个队列 )) public void ListeTopQueue2(String msg){ System.out.println("消费者china2发送的消息是"+msg); }
生产者
// 发送到交换机
@Test
public void testTop1(){
String switchName = "itcast.topic" ;
String message = "hello.every China !!!!" ;
rabbitTemplate.convertAndSend(switchName , "china.news",message);
}
@Test
public void testTop2(){
String switchName = "itcast.topic" ;
String message = "hello.天气 GOOd" ;
rabbitTemplate.convertAndSend(switchName , "china.weather",message);
}
引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
声明转化json的bean
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
发送定义转化json
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();//转json
}
可以显示中文
@Test
public void testTop3(){
Map<String,Object> map =new HashMap<>();
map.put("你好",123);
map.put("hello123",123);
rabbitTemplate.convertAndSend( "object.queue",map);
}
消费者监听消息转化为中文
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
消费者监听消息
@RabbitListener(queues = "object.queue")
public void Test(Map<String,Object> map) {
System.out.println("接收到object消息"+map);
}
消息丢失
1.生产者设置文件
2.设置交换机/队列/并绑定
3.判断消息是否发送到交换机
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration // 生产者确认机制,确认是否发送到交换机 public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 从IOC容器中,获取template对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 执行发送者的回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData // 自定义的数据 消息UUID * @param ack // 确认是否发送到交换机中 true是发送到交换机中 * @param cause // 原因 , 没有发送到交换机中的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("发送确认回调触发,当前消息的id{}",correlationData.getId()); if (ack){ log.info("消息已成功发入交换机中----------------"); }else { log.error("消息没有发送到交换机中!!!!!!!!! 原因未{}" , cause); // 这边进行业务重新发送 } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 这个方法触发,代表消息没能正确的路由到队列,被mq返回回来了 * @param message 返回的消息对象 * @param replyCode // 回复的状态编码 * @param replyTest // 回复的内容 * @param exchange // 交换机 * @param routingKey // 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyTest, String exchange, String routingKey) { log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode,replyTest,exchange,routingKey,message.toString()); // 这边进行业务重新发送 } }); } /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public DirectExchange simpleExchange(){ return new DirectExchange("simple.direct",false ,false); } /** * 第1个队列 */ @Bean public Queue simpleQueue(){ return new Queue("fanout.queue" ,false); } /** * 绑定队列和交换机 */ @Bean public Binding binding(){ return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple"); } }
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class CommonConfig { // 设置交换机并绑定队列 @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } // 失败策略 @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
处理消息堆积的方法
特点
实战
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。