赞
踩
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
在ApplicationTests测试类中添加测试方法,进行测试。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put(“desc”, “信息描述…”);
messageProperties.getHeaders().put(“type”, “自定义消息类型…”);
//消息体,与参数
Message message = new Message(“Hello RabbitMQ”.getBytes(), messageProperties);
//转换并发送
//MessagePostProcessor 在消息发送完毕后再做一次转换进行再加工,匿名接口,需要重写方法
rabbitTemplate.convertAndSend(“topic001”, “spring.amqp”, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println(“------添加额外的设置---------”);
message.getMessageProperties().getHeaders().put(“desc”, “额外修改的信息描述”);
message.getMessageProperties().getHeaders().put(“attr”, “额外新加的属性”);
return message;
}
});
}
运行前,可以看到queue001
中是没有消息的。
运行testSendMessage()方法。并获取消息。
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(“text/plain”);
Message message = new Message(“mq 消息1234”.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.abc”, message);
rabbitTemplate.convertAndSend(“topic001”, “spring.amqp”, “hello object message send!”);
rabbitTemplate.convertAndSend(“topic002”, “rabbit.abc”, “hello object message send!”);
}
我们往topic001中发送了两条消息,topic002中发送了一条消息。运行testSendMessage2() 接下来再查看下管控台
。
可以看到topic001中已经有了三条消息,刚才发送的消息也还在。GetMessage并不是消费消息,而只是获取消息。
5. SimpleMessageListenerContainer
简单消息监听容器
这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
设置消费者数量、最小最大数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的监听器、消息转换器等等。
注意:
SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
很多机遇RabbitMQ的自制定话后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大
思考
SimpleMessageListenerContainer为什么可以动态感知配置变更?
配置中添加如下代码:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//添加多个队列进行监听
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//当前消费者数量
container.setConcurrentConsumers(1);
//最大消费者数量
container.setMaxConcurrentConsumers(5);
//设置重回队列,一般设置false
container.setDefaultRequeueRejected(false);
//设置自动签收机制
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置listener外露
container.setExposeListenerChannel(true);
//消费端标签生成策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
//每个消费端都有自己独立的标签
return queue + “_” + UUID.randomUUID().toString();
}
});
//消息监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});
return container;
}
运行之前写的testSendMessage2()方法,查看管控台中的相关信息以及控制台打印信息
MessageListenerAdapter 即消息监听适配器
我们把之前的消息监听代码注释,可以不用直接加消息监听,而是采用MessageListenerAdapter的方式,通过适配器方式1,我们来学习下如何使用默认的handleMessage,自定义方法名,自定义转换器。
使用默认handleMessage
//消息监听
/*container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});*/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
MessageListenerAdapter 适配器类,熟悉适配器模式的朋友肯定了解适配器模式的话,可以通过适配器,适配自己的实现,这里我们适配自定义的MessageDelegate
类。我们就可以不采用监听的方式,采用适配的方式。
自定义MessageDelegate
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.err.println(“默认方法, 消息内容:” + new String(messageBody));
}
}
MessageDelegate类中,方法名与参数handleMessage(byte[] messageBody)
是固定的。为什么呢?
MessageListenerAdapter源码分析
我们来看下MessageListenerAdapter底层代码
MessageListenerAdapter类中
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = “handleMessage”;
默认方法名就是叫handleMessage。当然也可以自己去指定设置。通过messageListenerAdapter的代码我们可以看出如下核心属性
defaultListenerMethod默认监听方法名称:用于设置监听方法名称
Delegate 委托对象:实际真实的委托对象,用于处理消息
queueOrTagToMethodName 队列标识与方法名称组成集合
可以一一进行队列与方法名称的匹配
队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
测试一下默认使用的handleMessage方法。启动ApplicationTests类,运行testSendMessage()测试方法。
自定义方法名
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod(“consumeMessage”);
container.setMessageListener(adapter);
修改MessageDelegate()类
public class MessageDelegate {
public void consumeMessage(byte[] messageBody) {
System.err.println(“字节数组方法, 消息内容:” + new String(messageBody));
}
}
自定义TextMessageConverter转换器
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains(“text”)) {
return new String(message.getBody());
}
return message.getBody();
}
}
修改RabbitMQConfig类
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod(“consumeMessage”);
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
修改MessageDelegate类
public class MessageDelegate {
public void consumeMessage(String messageBody) {
System.err.println(“字符串方法, 消息内容:” + messageBody);
}
}
运行testSendMessage4Text()测试方法
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(“text/plain”);
Message message = new Message(“mq 消息1234”.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.abc”, message);
rabbitTemplate.convertAndSend(“topic001”, “spring.amqp”, “hello object message send!”);
rabbitTemplate.convertAndSend(“topic002”, “rabbit.abc”, “hello object message send!”);
}
注意:在发消息的时候,必须符合自己的转换器。
打印结果
自定义队列名称和方法名称。
/**
2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(“queue001”, “method1”);
queueOrTagToMethodName.put(“queue002”, “method2”);
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
public class MessageDelegate {
public void method1(String messageBody) {
System.err.println(“method1 收到消息内容:” + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println(“method2 收到消息内容:” + new String(messageBody));
}
}
运行 测试方法
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(“text/plain”);
Message message = new Message(“mq 消息1234”.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.abc”, message);
rabbitTemplate.send(“topic002”, “rabbit.abc”, message);
}
运行结果:
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
重写下面两个方法:
toMessage:java对象转换为Message
fromMessage:Message对象转换为java对象
Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能
DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
自定义二进制转换器:比如图片类型、PDF、PPT、流媒体
其实我们在介绍MessageListenerAdapter的时候,中间就介绍到了TextMessageConverter转换器,将二进制数据转换成字符串数据。
修改RabbitMQConfig类
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod(“consumeMessage”);
//重点,加入json格式的转换器 json对应Map对象
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate {
//json对应Map对象
public void consumeMessage(Map messageBody) {
System.err.println(“map方法, 消息内容:” + messageBody);
}
}
定义一个Order对象
public class Order {
private String id;
private String name;
private String content;
…省略get/set等方法
}
定义测试方法
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId(“001”);
order.setName(“消息订单”);
order.setContent(“描述信息”);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType(“application/json”);
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.order”, message);
}
打印结果:
修改RabbitMQConfig类
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod(“consumeMessage”);
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//需要将javaTypeMapper放入到Jackson2JsonMessageConverter对象中
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate {
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
}
定义测试方法
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order();
order.setId(“001”);
order.setName(“订单消息”);
order.setContent(“订单描述信息”);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType(“application/json”);
//添加typeid 与类的全路径
messageProperties.getHeaders().put(“TypeId”, “com.cp.spring.entity.Order”);
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.order”, message);
}
打印结果:
修改RabbitMQConfig类
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod(“consumeMessage”);
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//key表示标签 对应一个类的具体全路径。类和标签绑定之后,标签是order,意思就是转换成order类
Map<String, Class<?>> idClassMapping = new HashMap
idClassMapping.put(“order”, com.cp.spring.entity.Order.class);
idClassMapping.put(“packaged”, com.cp.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
//一层套一层
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate {
//json对应Map对象
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
}
定义一个Packaged对象
public class Packaged {
private String id;
private String name;
private String description;
…省略get/set等方法
}
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
Java面试核心知识点笔记
其中囊括了JVM、锁、并发、Java反射、Spring原理、微服务、Zookeeper、数据库、数据结构等大量知识点。
Java中高级面试高频考点整理
最后分享Java进阶学习及面试必备的视频教学
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
/set等方法
}
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
[外链图片转存中…(img-j0iRMPRL-1712744002205)]
[外链图片转存中…(img-Z1pDGDvg-1712744002205)]
[外链图片转存中…(img-XMwEKdEA-1712744002205)]
[外链图片转存中…(img-VL9Jv3Mz-1712744002206)]
[外链图片转存中…(img-qONFliLv-1712744002206)]
[外链图片转存中…(img-0DtkhT3f-1712744002206)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
[外链图片转存中…(img-uy7cj6Jb-1712744002207)]
Java面试核心知识点笔记
其中囊括了JVM、锁、并发、Java反射、Spring原理、微服务、Zookeeper、数据库、数据结构等大量知识点。
[外链图片转存中…(img-AY5Cmrnl-1712744002207)]
Java中高级面试高频考点整理
[外链图片转存中…(img-Q5AZpkLU-1712744002207)]
[外链图片转存中…(img-vmE8jw9K-1712744002208)]
最后分享Java进阶学习及面试必备的视频教学
[外链图片转存中…(img-NEdmDc3w-1712744002208)]
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
[外链图片转存中…(img-LPnjGbUl-1712744002209)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。