赞
踩
1.rabbitmq的介绍与安装
介绍:
rabbitmq是一种消息中间件的实现,abbitmq 使用的是一种叫做 AMQP 的协议来通信,基于 erlang 语言开发。
消息交互过程:Rabbit 拿到消息之后,会先交给 交换机 (Exchange), 然后交换机再根据预先设定的不同绑定( Bindings )策略,来确定要发给哪个队列。
RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。
fanout 模式:fanout 模式就是广播模式,消息来了,会发给所有的队列。
Direct 模式:Direct 模式就是指定队列模式, 消息来了,只发给指定的 Queue, 其他Queue 都收不到。
Topic 模式:主题模式,注意这里的主题模式,和 ActivityMQ 里的不一样。 ActivityMQ 里的主题,更像是广播模式。那么这里的主题模式是什么意思呢? 如图所示消息来源有: 美国新闻,美国天气,欧洲新闻,欧洲天气。
如果你想看 美国主题: 那么就会收到 美国新闻,美国天气。
如果你想看 新闻主题: 那么就会收到 美国新闻,欧洲新闻。
如果你想看 天气主题: 那么就会收到 美国天气,欧洲天气。
如果你想看 欧洲主题: 那么就会收到 欧洲新闻,欧洲天气。
安装:
下载erlang,https://www.erlang.org/downloads
选择适合的操作系统下载如windows,下载完后进行安装,配置环境变量与其它语言一样,将下载好的sbin设置为自己path路径即可
cmd输入erl后如下即安装成功
安装rabbitmq,从官网下载对应操作系统的rabbitmq安装https://www.rabbitmq.com/download.html
进入到bin文件夹下,rabbitmq-plugins.bat enable rabbitmq_management即可配置成功
切换成管理员权限或者任务管理器中服务重启rabbitmq即成功启动
输入账号名/密码:guest/guest即可登录成功
2.RABBITMQ - FANOUT 模式代码(代码来源how2j学习网)
pom.xml 提供rabbitmq和hutoo的jar
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.how2j</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq</name> <description>rabbitmq</description> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.3.1</version> </dependency> </dependencies> </project>
RabbitMQUtil 判断服务启动
package cn.how2j; import javax.swing.JOptionPane; import cn.hutool.core.util.NetUtil; public class RabbitMQUtil { public static void main(String[] args) { checkServer(); } public static void checkServer() { if(NetUtil.isUsableLocalPort(15672)) { JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 "); System.exit(1); } } }
TestProducer 模拟发送消息
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消息生成者 */ public class TestProducer { public final static String EXCHANGE_NAME="fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtil.checkServer(); //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for (int i = 0; i < 100; i++) { String message = "direct 消息 " +i; //发送消息到队列中 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message); } //关闭通道和连接 channel.close(); connection.close(); } }
TestDriectCustomer模拟接受消息
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.hutool.core.util.RandomUtil; public class TestCustomer { public final static String EXCHANGE_NAME="fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //为当前消费者取随机名 String name = "consumer-"+ RandomUtil.randomString(5); //判断服务器是否启动 RabbitMQUtil.checkServer(); // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //交换机声明(参数为:交换机名称;交换机类型) channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println(name +" 等待接受消息"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(name + " 接收到消息 '" + message + "'"); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(queueName, true, consumer); } }
先启动接收消息的TestDriectCustomer,再启动发送消息,观察rabbitmq的变化
3.RABBITMQ - DIRECT 模式代码(更改如下两处即可)
发送消息TestDriectProducer
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消息生成者 */ public class TestDriectProducer { public final static String QUEUE_NAME="direct_queue"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtil.checkServer(); //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); for (int i = 0; i < 100; i++) { String message = "direct 消息 " +i; //发送消息到队列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("发送消息: " + message); } //关闭通道和连接 channel.close(); connection.close(); } }
接受消息TestDriectCustomer
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.hutool.core.util.RandomUtil; public class TestDriectCustomer { private final static String QUEUE_NAME = "direct_queue"; public static void main(String[] args) throws IOException, TimeoutException { //为当前消费者取随机名 String name = "consumer-"+ RandomUtil.randomString(5); //判断服务器是否启动 RabbitMQUtil.checkServer(); // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, true, null); System.out.println(name +" 等待接受消息"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(name + " 接收到消息 '" + message + "'"); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); } }
分别运行两个接受的客户,然后发送消息
4.RABBITMQ - TOPIC 模式代码
TestProducer(分别在 四个路由:“usa.news”, “usa.weather”, “europe.news”, “europe.weather” 上发布 “美国新闻”, “美国天气”, “欧洲新闻”, “欧洲天气”.)
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消息生成者 */ public class TestProducer { public final static String EXCHANGE_NAME="topics_exchange"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMQUtil.checkServer(); //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String[] routing_keys = new String[] { "usa.news", "usa.weather", "europe.news", "europe.weather" }; String[] messages = new String[] { "美国新闻", "美国天气", "欧洲新闻", "欧洲天气" }; for (int i = 0; i < routing_keys.length; i++) { String routingKey = routing_keys[i]; String message = messages[i]; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message .getBytes()); System.out.printf("发送消息到路由:%s, 内容是: %s%n ", routingKey,message); } //关闭通道和连接 channel.close(); connection.close(); } }
TestCustomer4USA(专门用于接受 usa.* 消息)
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.hutool.core.util.RandomUtil; public class TestCustomer4USA { public final static String EXCHANGE_NAME="topics_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //为当前消费者取名称 String name = "consumer-usa"; //判断服务器是否启动 RabbitMQUtil.checkServer(); // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //交换机声明(参数为:交换机名称;交换机类型) channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); //接受 USA 信息 channel.queueBind(queueName, EXCHANGE_NAME, "usa.*"); System.out.println(name +" 等待接受消息"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(name + " 接收到消息 '" + message + "'"); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(queueName, true, consumer); } }
TestCustomer4News(专门用于接受 *.news 消息)
package cn.how2j; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.hutool.core.util.RandomUtil; public class TestCustomer4News { public final static String EXCHANGE_NAME="topics_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //为当前消费者取名称 String name = "consumer-news"; //判断服务器是否启动 RabbitMQUtil.checkServer(); // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("localhost"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //交换机声明(参数为:交换机名称;交换机类型) channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); //接受 USA 信息 channel.queueBind(queueName, EXCHANGE_NAME, "*.news"); System.out.println(name +" 等待接受消息"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(name + " 接收到消息 '" + message + "'"); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(queueName, true, consumer); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。