赞
踩
直接安装在Linux上
启动服务
systemctl start rabbitmq-server
查看服务状态
systemctl status rabbitmq-server
停止服务
systemctl stop rabbitmq-server
开机启动服务
systemctl enable rabbitmq-server
参考地址:https://www.erlang-solutions.com/downloads/
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpmrpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v
安装socat
yum install -y socat
(1)yum 包更新到最新 yum update (2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的 yum install -y yum-utils device-mapper-persistent-data lvm2 (3)设置yum源为阿里云 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo (4)安装docker yum install docker-ce -y (5)安装后查看docker版本 docker -v (6) 安装加速镜像 sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker
docker相关命令
# 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
启动安装在docker中的RabbitMQ
docker ps -a
查看所有的docker容器
docker start CONTAINER ID
CONTAINER ID是在docker ps -a
中显示出的容器的CONTAINER ID
docker ps
查看已启动的容器
docker ps
#进入容器
docker exec -it d2dd40da7056 /bin/bash
添加/配置用户test 设置密码为test
rabbitmqctl add_user test test
如果不是通过docker安装rabbitmq,则需要在rabbitmq的sbin目录下执行./rabbitmqctl add_user test test
设置用户权限
#设置admin为administrator级别
rabbitmqctl set_user_tags test administrator
参考文档:http://www.rabbitmq.com/getstarted.html
简单模式就是生产者将消息发到队列,消费者从队列中取消息,一条消息对应一个消费者
一个队列只有一个消费者
Work模式就是一条消息可以被多个消费者尝试接收,但是最终只能有一个消费者能获取
一条消息可以被多个消费者同时获取
生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机
当发送消息后所有注册了队列的消费者都可以收到消息
生产者将消息发送到了type为direct模式的交换机
消费者的队列在将自己绑定到路由的时候会给自己绑定一个key
只有生产者发送对应key格式的消息时,相应队列才会收到消息
进入交换机ex1
在模式为fanout的交换机ex1上绑定两个队列queue1,queue2
RabbitMQ消息队列模式
创建Maven 项目
添加RabbitMQ连接需要的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.10.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
创建日志配置文件 log4j.properties
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
log4j.logger.org.mybatis = DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
创建MQ连接帮助类
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password) factory.setHost("47.113.192.192"); factory.setPort(5672); factory.setVirtualHost("host1"); factory.setUsername("admin"); factory.setPassword("admin"); //3.通过工厂对象获取与MQ的连接 Connection connection = factory.newConnection(); return connection; } }
消息生产者发送消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { String msg = "Hello RabbitMQ"; Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC的statement //定义队列(在Java代码中新建一个MQ队列) //参数1:定义的队列名 //参数2:队列中的数据是否选择持久化 //参数3:是否排外(当前队列是否为当前连接所私有) //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)) //参数5:设置当前队列的参数 //channel.queueDeclare("queue7",false,false,false,null); //参数1:交换机名称(此处为简单模式,没有交换机) //参数2:目标队列名称 //参数3:设置当前这条消息的属性(如设置过期时间) //参数4:消息的内容 channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("发送" + msg); channel.close(); connection.close(); } }
消息消费者接收消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("接收的数据是:" + msg); } }; channel.basicConsume("queue1",true,consumer); } }
消息生产者发送消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC的statement //定义队列(在Java代码中新建一个MQ队列) //参数1:定义的队列名 //参数2:队列中的数据是否选择持久化 //参数3:是否排外(当前队列是否为当前连接所私有) //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)) //参数5:设置当前队列的参数 //channel.queueDeclare("queue7",false,false,false,null); //参数1:交换机名称(此处为工作模式,没有交换机) //参数2:目标队列名称 //参数3:设置当前这条消息的属性(如设置过期时间) //参数4:消息的内容 channel.basicPublish("","queue2",null,msg.getBytes()); System.out.println("发送" + msg); channel.close(); connection.close(); } } }
消息消费者接收消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer1接收的数据是:" + msg); } }; channel.basicConsume("queue2",true,consumer); } }
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer2接收的数据是:" + msg); } }; channel.basicConsume("queue2",true,consumer); } }
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC的statement //参数1:交换机名称 //参数2:目标队列名称 //参数3:设置当前这条消息的属性(如设置过期时间) //参数4:消息的内容 channel.basicPublish("ex1","",null,msg.getBytes()); System.out.println("发送" + msg); channel.close(); connection.close(); } } }
消息消费者接收消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer1接收的数据是:" + msg); } }; channel.basicConsume("queue3",true,consumer); } }
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer2接收的数据是:" + msg); } }; channel.basicConsume("queue4",true,consumer); } }
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //相当于JDBC中的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC的statement //参数1:交换机名称 //参数2:目标队列名称,但由于这里有交换机,因此该参数为Key //参数3:设置当前这条消息的属性(如设置过期时间) //参数4:消息的内容 if(msg.startsWith("a")){ channel.basicPublish("ex2","a" ,null,msg.getBytes()); }else if(msg.startsWith("b")){ channel.basicPublish("ex2","b" ,null,msg.getBytes()); } System.out.println("发送" + msg); channel.close(); connection.close(); } } }
消息消费者接收消息
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer1接收的数据是:" + msg); } }; channel.basicConsume("queue5",true,consumer); } }
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("consumer2接收的数据是:" + msg); } }; channel.basicConsume("queue6",true,consumer); } }
SpringBoot可以完成自动配置和依赖注入,通过Spring直接获取RabbitMQ的连接对象
相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
配置applicaiton.yml
server:
port: 9001
spring:
application:
name: producer
rabbitmq:
host: 47.113.192.192
port: 5672
virtual-host: host1
username: admin
password: admin
生产者发送消息
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void senMsg(String msg){ //1.发送消息到队列 amqpTemplate.convertAndSend("queue1",msg); //2.发送消息到交换机(订阅交换机) amqpTemplate.convertAndSend("ex1","",msg); //3.发送消息到交换机(路由交换机) amqpTemplate.convertAndSend("ex2","a",msg); amqpTemplate.convertAndSend("ex2","b",msg); } }
添加依赖
配置yml
server:
port: 9002
spring:
application:
name: producer
rabbitmq:
host: 47.113.192.192
port: 5672
virtual-host: host1
username: admin
password: admin
消费者接收消息
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
@RabbitListener(queues = {"queue1"})
public class ReceiveMsgService {
@RabbitHandler
public void receiveMsg(String msg){
System.out.println("接收msg:" + msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。