赞
踩
传统HTTP协议调用接口的缺陷?
答:传统的HTTP协议采用同步的请求方式,如果服务器响应延迟可能会导致客户端一直转圈等待,这时会影响到客户端的体验。
解决:可以采用多线程优化提高服务器端响应速度,缺点:对CPU的性能不是很好,因为频繁的创建线程;就算使用线程池,在高并发的情况下,如果超出了线程池核心数还是会等待。开启了多线程默认情况下是没有返回结果。
为什么使用消息中间件?
答:流量消峰(解决高并发)、异步通讯、解耦、解决分布式问题、自动重试和补偿
生产者:向消息中间件投递消息
消费者:向消息中间件获取消息
服务端:broker消息中间件的server,主要负责存储和转发消息
message:发送消息的内容,发送的参数信息
messageid:全局的消息唯一id,解决消息的重复消费问题解决
历史悠久的开源项目,是Apache下的一个子项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,不够轻巧(源代码比RocketMQ多),支持持久化到数据库,对队列数较多的情况支持不好。
结合erlang语言本身的并发优势,支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。
阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。
Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,具有以下特性:高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;高堆积:支持topic下消费者较长时间离线,消息堆积量大;
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。
RabitMQ官方网站:https://www.rabbitmq.com/
常见概念:
rabbitmq安装:我这里属于学习,所以使用docker安装,其他方式的安装下载对于的安装程序即可。
安装命令:
第一步、执行run命令
docker run -d \
-v /opt/rabbitmq/data:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \
--hostname myRabbit rabbitmq
第二步、执行启动命令
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第三步、解决页面UI问题
#进入rabbitmq容器
docker exec -it rabbitmq /bin/bash
#进入容器后,cd到以下路径
cd /etc/rabbitmq/conf.d/
#修改 management_agent.disable_metrics_collector = false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
#退出容器
exit
#重启rabbitmq容器
docker restart rabbitmq
安装成功后:请求路径:http://www.kaicostudy.com:15672/
Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
答:RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672: rabbitmq消息中间内部通讯的端口
默认的端口号25672 rabbitmq集群的端口号
需要给用户设置权限,设置Virtual host的权限,还有对应的角色。
登录失败问题:没有给账号设置 Tags。
(1) 超级管理员(administrator)
超级管理员,可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作,因为是超级管理员,可以这样理解,它可以为所欲为,什么操作都能干,删除用户、修改用户密码、重置用户角色、策略制定等等。
(2) 监控者(monitoring)
监控者,可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。与administrator的对比,administrator 能看到这些内容
(4) 普通管理者(management)
普通管理者,仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
(5) 模拟者(Impersonator)
无法登陆管理控制台,通常就是普通的生产者和消费者。
(6) None
其他用户,无法登陆管理控制台,通常就是普通的生产者和消费者。
用户拥有的权限不同,对应的所能操作的范围也就不同。用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。
例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。
队列类型:
需要给对应的用户授权该队列权限。
1、连接工具类
需要配置连接地址,ip,账号密码,队列名称,注意:账号需要有对应的VirtualHost权限、队列名称权限。
public class RabbitMQConnection { //获取rabbitmq连接 public static Connection getConnection() throws IOException, TimeoutException { // 1.创建连接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2.设置连接地址 connectionFactory.setHost("www.kaicostudy.com"); // 3.设置端口号: connectionFactory.setPort(5672); // 4.设置账号和密码 connectionFactory.setUsername("kaico"); connectionFactory.setPassword("kaico"); // 5.设置VirtualHost connectionFactory.setVirtualHost("/kaicoStudy"); return connectionFactory.newConnection(); } }
2、生产者
public class Product { private static final String QUEUE_NAME = "kaico"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接 Connection connection = RabbitMQConnection.getConnection(); // 2.设置通道 Channel channel = connection.createChannel(); // 3.设置消息 String msg = "kaico开始学习RabbitMQ了111"; System.out.println("msg:" + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.close(); connection.close(); } }
3、消费者
public class Consumer { private static final String QUEUE_NAME = "kaico"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接 Connection connection = RabbitMQConnection.getConnection(); // 2.设置通道 Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msg); } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //可移除关闭的代码 channel.close(); connection.close(); } }
该模式多个消费者的话,默认是轮询算法进行消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。