赞
踩
因为RabbitMQ是使用Erlang开发的,所以要使用RabbitMQ-Server首先就需要安装Erlang的运行环境,可以在官网下载安装,安装后需要配置环境变量。
配置示例:
ERLANG_HOME:D:\software\erl10.1
Path末尾拼接:%ERLANG_HOME%\bin;
接下来安装RabbitMQ-Server:同样可在RabbitMQ官网自行下载安装,安装后需要配置环境变量。
配置示例:
RABBITMQ_SERVER:D:\software\RabbitMQ Server\rabbitmq_server-3.7.9
Path末尾拼接:%RABBITMQ_SERVER%\sbin;
配置完成后便可启动RabbitMQ-Server,初始用户名与密码是guest:guest。
其他具体的配置及命令,不详述,请自行查找。
<!-- RabbitMQ Java client library. No <version> is declared here, so it must be supplied by dependency management (e.g. a parent POM or an imported BOM). -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; /** * rabbitmq服务提供类 * @author dr */ @Component public class RabbitMQProvider { private Logger logger = LoggerFactory.getLogger(this.getClass()); /**是否启用RabbitMQ*/ @Value("${rabbitmq.setting.enable}") private boolean enable; /**交换器名*/ @Value("${rabbitmq.setting.exchange.name}") private String exchangeName; /**下行队列名*/ @Value("down.${push.setting.school}") private String downQueueName; @Value("down.${push.setting.school}") private String downRoutingKey; @Value("up.${push.setting.school}") private String upQueueName; @Value("up.${push.setting.school}") private String upRoutingKey; @Value("${rabbitmq.setting.username}") private String username; @Value("${rabbitmq.setting.password}") private String password; @Value("${rabbitmq.setting.host}") private String host; @Value("${rabbitmq.setting.port}") private Integer port; // 是否自动应答 private boolean autoAck = true; private Channel channelDown; private Channel channelUp; private Connection upConnection; private Connection downConnection; @Value("${rabbitmq.setting.message-life-time}") private Integer messageLifeTime; /**处理接收到的消息的处理实例,即我们的业务代码*/ @Autowired private MQDealer mqDealer; // 初始化 @PostConstruct private void init() { if (!enable) return; try { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); 
factory.setPassword(password); factory.setHost(host); factory.setPort(port); // 队列参数 Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", messageLifeTime*1000);// 消息过期时间 // 创建上行连接 upConnection = factory.newConnection(); // 创建上行通道 channelUp = upConnection.createChannel(); // 声明创建配置上行队列 channelUp.queueDeclare(upQueueName, true, false, false, args); // 将队列与交换器绑定,并设置路由码 channelUp.queueBind(upQueueName, exchangeName, upRoutingKey); downConnection = factory.newConnection(); channelDown = downConnection.createChannel(); channelDown.queueDeclare(downQueueName, true, false, false, args); channelDown.queueBind(downQueueName, exchangeName, downRoutingKey); receiveMessage(); } catch (Exception e) { logger.error("启动MQ下行通道时出现异常!", e); } } /** * 持续监听队列以接收数据 * @throws IOException * @throws TimeoutException */ private void receiveMessage() throws IOException, TimeoutException { // 每次缓存5个消息在本地 channelDown.basicQos(5); channelDown.basicConsume(downQueueName, autoAck, "myConsumerTag", new DefaultConsumer(channelDown) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); logger.debug( downQueueName + " Received '" + message + "'" + ", routingKey: " + envelope.getRoutingKey()); // 处理接收到的消息 mqDealer.deal(message); // 持续监听 channelDown.basicConsume(downQueueName, autoAck, "myConsumerTag", this); channelDown.basicAck(envelope.getDeliveryTag(), true); } }); } /** * 向上行消息队列发送一条消息 * @param message * @throws IOException * @throws TimeoutException */ public void sendMessage(String message) throws IOException, TimeoutException { channelUp.basicPublish(exchangeName, upRoutingKey, true, MessageProperties.TEXT_PLAIN, message.getBytes()); logger.debug("send message to " + upQueueName + ": " + message); } }
解析:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。