赞
踩
1、配置pom
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.2.0.RELEASE</version>
- </dependency>
2、yml配置
spring:
#rabbit mq
rabbitmq:host: 192.168.1.146
port: 5672
username: guest
password: guest
3、连接、交换器、队列等设置
- @Configuration
- public class RabbitMqConfig {
-
- /**
- * 组件发布EXCHANGE
- */
- public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";
- public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";
-
- public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";
-
-
-
- /**
- * 组件发布消息队列
- */
- public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";
-
- public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";
-
- public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";
-
-
- public static final String ROUTING_KEY_="";
-
-
- @Value("${spring.rabbitmq.host}")
- private String host;
- @Value("${spring.rabbitmq.port}")
- private int port;
-
- @Value("${spring.rabbitmq.username}")
- private String username;
- @Value("${spring.rabbitmq.password}")
- private String password;
-
-
-
-
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true);
- return connectionFactory;
- }
-
- @Bean
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setMessageConverter(new Jackson2JsonMessageConverter());
- return factory;
- }
-
-
- /**
- *
- * @return
- */
- @Bean
- public FanoutExchange exchangeComponentPublished()
- {
- return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);
-
- }
-
- @Bean
- public Queue queueComponentPublished()
- {
- return new Queue(QUEUE_COMPONENT_PUBLISHED,false);
- }
-
- @Bean
- public FanoutExchange exchangeComponentSynced()
- {
- return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);
-
- }
-
- @Bean
- public FanoutExchange exchangeExtractionTask()
- {
- return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);
-
- }
-
- @Bean
- public Queue queueComponentSynced()
- {
- return new Queue(QUEUE_COMPONENT_SYNCED,false);
- }
-
-
- @Bean
- public Queue queueExtractionTask()
- {
- return new Queue(QUEUE_EXTRACTION_TASK,false);
- }
-
-
- @Bean
- public Binding componentSyncedBinding(){
- return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());
- }
-
- @Bean
- public Binding extractionTaskBinding(){
- return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());
- }
-
-
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
-
- RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
-
- return rabbitTemplate;
- }
-
-
- }
需要定义交换器,队列,绑定,会自动注册。
4、生产者
-
- @Component
- public class ExtractionTaskMessageSender {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void send(ExtractionTaskMessage msg)
- {
-
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
-
-
- ObjectMapper mapper = new ObjectMapper();
-
- Message m = null;
- try {
-
- m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);
-
-
- }
- }
使用RabbitTemplate发送消息。
5、消费者
-
- @Component
- @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK)
- public class ExtractionTaskMessageReceiver {
-
- private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);
-
- @Autowired
- private DataExtractorFactory dataExtractorFactory;
-
- @Autowired
- private ExtractionTaskService extractionTaskService;
-
- @Autowired
- private ExtractionDataService extractionDataService;
-
-
- @RabbitHandler
- public void process(@Payload ExtractionTaskMessage msg) {
- try {
-
-
- } catch (DataExtractionException e)
- {
- logger.error( "",e.getMessage());
- }
- catch (Exception e) {
- //异常处理
- logger.error(e.getMessage());
- }
- }
-
- }
使用 @RabbitListener 进行监听,@RabbitHandler定义消息处理方法。
在进行监听时,会查询所有@RabbitHandler注解的消息处理方法,如果没有参数类型匹配的方法,则异常。
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
注意:@RabbitListener注解在类上或方法上,行为不一样。在类上,消费方法参数类型不可以设置为Message。在方法上,方法类型可以为Message
Json格式
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setMessageConverter(new Jackson2JsonMessageConverter());
- return factory;
- }
自定义
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setMessageConverter(new MessageConverter() {
- @Override
- public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
- return null;
- }
-
- @Override
- public Object fromMessage(Message message) throws MessageConversionException {
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){
- return (User)ois.readObject();
- }catch (Exception e){
- e.printStackTrace();
- return null;
- }
- }
- });
-
- return factory;
- }
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
- value = @Queue(value = "consumer_queue",durable = "true"),
- key = "key.#"
- ))
- public void processMessage1(Message message) {
- System.out.println(message);
- }
- @Component
- @RabbitListener(queues = "consumer_queue")
- public class Receiver {
-
- @RabbitHandler
- public void processMessage1(String message) {
- System.out.println(message);
- }
-
- @RabbitHandler
- public void processMessage2(byte[] message) {
- System.out.println(new String(message));
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。