赞
踩
如何使用MQ(java代码实现)
<!--指定jdk版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.7</version> </dependency> </dependencies>
生产者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /* 生产者,目标是发消息 */ public class Product { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //建立一个连接方式 ConnectionFactory factory = new ConnectionFactory(); //工厂IP 连接RabbitMQ的队列 factory.setHost("www.littlehei.fun"); //用户名 factory.setUsername("guest"); //密码 factory.setPassword("guest"); //创建链接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); //生成一个队列 /* 生成队列 参数1,队列名称 参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中 参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费 参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除 参数5,其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发消息 String message = "hello world";//初次使用 /* 发送一个消费 参数1,发送到哪个交换机 参数2.路由的key值是哪个? 本次是队列的名称 参数3,其他参数配置 参数4,发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送成功"); } }
消费者:用来接受生成者产生的消息。
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /* 消费者,用来消费生成者产生的代码 */ public class Consumer { //队列的名称: private static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { //创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("www.littlehei.fun"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明 DeliverCallback deliverCallback = (consumerTag,message)->{ // String message = new String("自己手动去创建一个消息,但是不推荐"); System.out.println(new String(message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; /* 消费者接收消息 参数1,消费哪个队列 参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是 参数3,消费者未成功消费的回调 参数4,消费者取录消费的回到 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
生成者 —大量发消息-- 队列 – 接到消息–工作线程1或者工作线程2.。。。
轮训处理消息,你一个,我下一个,他下下个。
不同工作线程之间的关系是竞争关系
创建链接工具类:
public class GetConnection {
//建立一个工具类,每次都直接使用,减少代码重复量
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip地址");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
轮训分发代码:
/* 一个工作线程,可以多个创建,可以多线程创建,具体看你自己如何定于 */ public class WorkThread1 { //首先还是创建一个队列名称 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); /* 消费者接收消息 参数1,消费哪个队列 参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是 参数3,消费者未成功消费的回调 参数4,消费者取录消费的回到 */ DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收到的消息为" + new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息被取消消费接口回调"); }; System.out.println("C1等待接收消息..."); //消息接收 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
生产者:
/* 生产者,可以发送大量消息 */ public class Product1 { //队列名称 public static final String QUEUE_NAEM="hello"; //发送大量消息 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); //队列声明 channel.queueDeclare(QUEUE_NAEM,false,false,false,null); //发送消息 //从控制台中输入 Scanner sc = new Scanner(System.in); //判断是否有下一个消息输入 while (sc.hasNext()){ String name = sc.next(); channel.basicPublish("",QUEUE_NAEM,null,name.getBytes()); System.out.println("发送完成: "+name); } } }
----------结果---------- nihap 发送完成nihap wp1 发送完成wp1 ci1 发送完成ci1 wooda 发送完成wooda 我喜欢你 发送完成我喜欢你 ------------------ C1等待接收消息... 接收到的消息为nihap 接收到的消息为ci1 接收到的消息为我喜欢你 -------------------- C2等待接收消息... 接收到的消息为wp1 接收到的消息为wooda
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。