赞
踩
目录
(1)在application.properties中添加mq配置
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- <!-- mq的依赖 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>1.3.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.47</version>
- </dependency>
- # MQ 连接信息:address 可以是单个 host:port,也可以是用逗号分隔的多个 host:port
- mq.rabbit.address=192.168.1.1:5672,192.168.1.2:5672
- mq.rabbit.virtualHost=/
- mq.rabbit.username=guest
- mq.rabbit.password=guest
- mq.rabbit.exchange.name=mq.direct
-
- #消费者数量
- mq.concurrent.consumers=4
-
- #每个消费者获取的最大的消息投递数量
- mq.prefetch.count=100
- @Configuration
- public class RabbitConfig {
-
- @Value("${mq.rabbit.address}")
- String address;
- @Value("${mq.rabbit.username}")
- String username;
- @Value("${mq.rabbit.password}")
- String password;
- @Value("${mq.rabbit.virtualHost}")
- String mqRabbitVirtualHost;
- @Value("${mq.rabbit.exchange.name}")
- String exchangeName;
-
- @Value("${mq.concurrent.consumers}")
- int concurrentConsumers;
- @Value("${mq.prefetch.count}")
- int prefetchCount;
-
- //创建mq连接
- @Bean(name = "connectionFactory")
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
-
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(mqRabbitVirtualHost);
-
- connectionFactory.setPublisherConfirms(true);
-
- //该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host
- connectionFactory.setAddresses(address);
- return connectionFactory;
- }
-
- @Bean("exchangeName")
- public Exchange exchangeName(){
- //Map<String,Object> args=new HashMap<>();
- //args.put("x-delayed-type","direct");
- //return new CustomExchange(exchangeName,"x-delayed-message",true,false,args);
- return new DirectExchange(exchangeName,true,false);
- }
-
- //监听处理类
- @Bean
- @Scope("prototype")
- public HandleService handleService() {
- return new HandleService();
- }
-
- //动态创建queue,命名为:hostName.queue1【192.168.1.1.queue1】,并返回数组queue名称
- @Bean
- public String[] mqMsgQueues() throws AmqpException, IOException {
- String[] queueNames = new String[queueSize];
- String hostName = OsUtil.getHostNameForLiunx();//获取hostName
- for (int i = 1; i <= 10; i++) {
- String queueName = String.format("%s.queue%d", hostName, i);
- connectionFactory().createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
- connectionFactory().createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
- queueNames[i - 1] = queueName;
- }
- return queueNames;
- }
-
- //创建监听器,监听队列
- @Bean
- public SimpleMessageListenerContainer mqMessageContainer(HandleService handleService) throws AmqpException, IOException {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueueNames(mqMsgQueues());
- container.setExposeListenerChannel(true);
- container.setPrefetchCount(prefetchCount);//设置每个消费者获取的最大的消息数量
- container.setConcurrentConsumers(concurrentConsumers);//消费者个数
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认
- container.setMessageListener(handleService);//监听处理类
- return container;
- }
-
- }
- @Service
- public class HandleService implements ChannelAwareMessageListener {
- private static final Logger logger = LoggerFactory.getLogger(HandleService.class);
-
- /**
- * @param
- * 1、处理成功,这种时候用basicAck确认消息;
- * 2、可重试的处理失败,这时候用basicNack将消息重新入列;
- * 3、不可重试的处理失败,这时候使用basicNack将消息丢弃。
- *
- * basicNack(long deliveryTag, boolean multiple, boolean requeue)
- * deliveryTag:该消息的index
- * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
- * requeue:被拒绝的是否重新入队列
- */
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- logger.info("接收到消息:" + new String(body));
- JSONObject jsonObject = null;
- try {
- jsonObject = JSONObject.parseObject(new String(body));
- if (消费成功) {
- logger.info("消息消费成功");
- channel.basicAck(message.getMessagePropertites().getDeliveryTag(),false);//确认消息消费成功
- }else if(可重试的失败处理){
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
- } else { //消费失败
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- } catch (JSONException e) {
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//消息丢弃
- logger.error("This message:" + jsonObject + " conversion JSON error ");
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。