赞
踩
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.orchids</groupId> <artifactId>Rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version> </dependency> </dependencies> </project>
package com.orchids.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ Author qwh * @ Date 2024/6/5 16:05 */ public class ConnectionUtil { public static final String HOST_ADDRESS = "192.168.70.145"; public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setHost(HOST_ADDRESS); // 端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection con = ConnectionUtil.getConnection(); // amqp://guest@192.168.200.100:5672/ System.out.println(con); con.close(); } }
package com.orchids.rabbitmq.demo; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2. 设置参数 factory.setHost("192.168.70.145"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); // 3. 创建连接 Connection Connection connection = factory.newConnection(); // 4. 创建Channel Channel channel = connection.createChannel(); // 5. 创建队列 // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 // 参数1. queue:队列名称 // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在 // 参数3. exclusive:是否独占。 // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 // 参数5. arguments:其它参数。 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 回调方法,当收到消息后,会自动执行该方法 // 参数1. consumerTag:标识 // 参数2. envelope:获取一些信息,交换机,路由key... // 参数3. properties:配置信息 // 参数4. body:数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; // 参数1. queue:队列名称 // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 // 参数3. callback:回调对象 // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer); } }
package com.orchids.rabbitmq.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置主机地址 connectionFactory.setHost("192.168.70.145"); // 设置连接端口号:默认为 5672 connectionFactory.setPort(5672); // 虚拟主机名称:默认为 / connectionFactory.setVirtualHost("/"); // 设置连接用户名;默认为guest connectionFactory.setUsername("guest"); // 设置连接密码;默认为guest connectionFactory.setPassword("123456"); // 创建连接 Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 // queue 参数1:队列名称 // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在 // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除 // arguments 参数5:队列其它参数 channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "你好;小兔子!"; // 参数1:交换机名称,如果没有指定则使用默认Default Exchange // 参数2:路由key,简单模式可以传递队列名称 // 参数3:配置信息 // 参数4:消息内容 channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close(); } }
常见的三种交换机
package com.orchids.rabbitmq.fanout; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
package com.orchids.rabbitmq.fanout; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
package com.orchids.rabbitmq.fanout; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtil.getConnection(); // 2、创建频道 Channel channel = connection.createChannel(); // 参数1. exchange:交换机名称 // 参数2. type:交换机类型 // DIRECT("direct"):定向 // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 // TOPIC("topic"):通配符的方式 // HEADERS("headers"):参数匹配 // 参数3. durable:是否持久化 // 参数4. autoDelete:自动删除 // 参数5. internal:内部使用。一般false // 参数6. arguments:其它参数 String exchangeName = "test_fanout"; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout,routingKey设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }
package com.orchids.rabbitmq.dircet; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
package com.orchids.rabbitmq.dircet; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
package com.orchids.rabbitmq.dircet; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct"; // 创建交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); // 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 声明(创建)队列 channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name,exchangeName,"error"); // 队列2绑定info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String message = "日志信息:张三调用了delete方法.错误了,日志级别warning"; // 发送消息 channel.basicPublish(exchangeName,"warning",null,message.getBytes()); System.out.println(message); // 释放资源 channel.close(); connection.close(); } }
package com.orchids.rabbitmq.topics; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
package com.orchids.rabbitmq.topics; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
package com.orchids.rabbitmq.topics; import com.orchids.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为"" // routing key 常用格式:系统的名称.日志的级别。 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); // 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]"; channel.basicPublish(exchangeName,"goods.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]"; channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); channel.close(); connection.close(); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <groupId>com.orchids</groupId> <artifactId>springrabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <packaging>pom</packaging> <modules> <module>producer</module> <module>consumer</module> <module>comfirm-producer</module> <module>comfire-consumer</module> </modules> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.orchids</groupId> <artifactId>springrabbitmq</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>producer</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
spring:
rabbitmq:
host: 192.168.70.145
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
package com.orchids; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * @ Author qwh * @ Date 2024/6/5 19:17 */ @SpringBootTest public class ProducerTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; @Autowired private RabbitTemplate rabbitTemplate; @Test public void SendMessage(){ rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello world RabbitMQ"); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.orchids</groupId> <artifactId>springrabbitmq</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>consumer</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
package com.orchids.rabbit.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @ Author qwh * @ Date 2024/6/5 19:05 */ @Component @Slf4j public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_NAME,durable = "true"), exchange = @Exchange(value = EXCHANGE_DIRECT,type = "direct"), key = {ROUTING_KEY} )) public void processMessage(String dateString, Message message, Channel channel){ log.info("消息信息"+dateString); } }
package com.orchids;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @ Author qwh
* @ Date 2024/6/5 19:02
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。