赞
踩
pom.xml文件导入包
<!-- spring-message 消息模块 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<!-- rabbitmq 消息模块 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
yml文件添加配置
spring.rabbitmq.addresses=192.168.99.100
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
三种交换器消息发送测试,代码如下:
@Autowired RabbitTemplate rabbitTemplate; @Test public void rabbitTest() { Map<String, Object> map = new HashMap<>(); // direct:单播 map.put("msg", "direct.pc.news"); rabbitTemplate.convertAndSend("exchange.direct","pc.news",map); // fanout,交换器内所有队列接收到消息 map.put("msg", "fanout.all"); rabbitTemplate.convertAndSend("exchange.fanout","",map); // topic,*.news接收到1号雇员信息 Employee empById = employeeMapper.getEmpById(1); rabbitTemplate.convertAndSend("exchange.topic","sl.news",empById); }
结果在页面上查看:
自定义Configuration,指定序列化格式为JSON,而非上面图片中默认的JDK序列化结果:
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
效果如下:
到这里,rabbitmq的重点还是在页面上exchange和queue的配置上…
启用rabbit监听,需要在Application中使用@EnableRabbit:
@SpringBootApplication
@EnableRabbit
public class MyfisrtspringbootApplication {
public static void main(String[] args) {
SpringApplication.run(MyfisrtspringbootApplication.class, args);
}
}
在Service中,通过在RabbitListener指定队列名读取队列中的数据
@Service
public class EmployeeService {
@RabbitListener(queues = "pc.news")
public void receive(Employee emp) {
System.out.println("接收员工消息:" + emp.toString());
}
@RabbitListener(queues = "pc.emps")
public void receive02(Message message) {
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}
启动Application,即可在控制台中看到输出,内容为上面RabbitTemplate中传入的数据
使用AmqpAdmin,对rabbitmq的组件进行操作
@Autowired AmqpAdmin amqpAdmin; @Test public void rabbitOperation() { // amqp声明队列 Queue queue = new Queue("amqp.queue"); amqpAdmin.declareQueue(queue); // amqp声明交换器 Exchange exchange = new DirectExchange("amqp.direct.exchaneg"); amqpAdmin.declareExchange(exchange); // amqp绑定 Binding binding = new Binding("amqp.queue", Binding.DestinationType.QUEUE, "amqp.direct.exchaneg", "amqp.routingKey", null); amqpAdmin.declareBinding(binding); // amqp删除绑定 amqpAdmin.removeBinding(binding); // amqp删除交换器 amqpAdmin.deleteQueue("amqp.queue"); // amqp删除队列 amqpAdmin.deleteExchange("amqp.direct.exchaneg"); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。