当前位置:   article > 正文

java 使用RabbitMQ_java使用rabbitmq

java使用rabbitmq

简单模式:生产者发布Publish消息到队列,消费者从队列消费Consume消息

生产者代码:

  1. package com.example.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class SimpleProvider {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建链接工厂对象
  10. ConnectionFactory connectionFactory = new ConnectionFactory();
  11. //设置RabbitMQ服务主机地址,默认localhost
  12. connectionFactory.setHost("localhost");
  13. //设置端口,默认5672
  14. connectionFactory.setPort(5672);
  15. //设置虚拟主机名,默认/
  16. connectionFactory.setVirtualHost("/mytest");
  17. //设置用户名密码
  18. connectionFactory.setUsername("test");
  19. connectionFactory.setPassword("123456");
  20. //创建链接
  21. Connection connection = connectionFactory.newConnection();
  22. //创建通道
  23. Channel channel = connection.createChannel();
  24. //声明队列
  25. //参数1:消息队列名称
  26. //参数2,是否持久化
  27. //参数3,是否独占本次connection链接
  28. //参数4,是否自动删除消息
  29. //参数5,是否需要传递额外的队列数据Map
  30. channel.queueDeclare("simple_queue1",true,false,false,null);
  31. //创建消息
  32. String msg = "hello";
  33. //发布消息
  34. //参数1,使用指定的交换机,不设置则使用默认交换机
  35. //参数2,指定路由器key,如果是简单模式直接给队列名就行
  36. //参数3,发生消息时是否需要额外的消息数据
  37. //参数4,消息内容
  38. channel.basicPublish("","simple_queue1",null,msg.getBytes());
  39. //关闭资源
  40. channel.close();
  41. connection.close();
  42. }
  43. }

 消费者代码

  1. package com.example.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class SimpleConsumer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //创建链接工厂对象
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. //设置RabbitMQ服务主机地址,默认localhost
  10. connectionFactory.setHost("localhost");
  11. //设置端口,默认5672
  12. connectionFactory.setPort(5672);
  13. //设置虚拟主机名,默认/
  14. connectionFactory.setVirtualHost("/mytest");
  15. //设置用户名密码
  16. connectionFactory.setUsername("test");
  17. connectionFactory.setPassword("123456");
  18. //创建链接
  19. Connection connection = connectionFactory.newConnection();
  20. //创建频道
  21. Channel channel = connection.createChannel();
  22. //声明队列
  23. //参数1:消息队列名称
  24. //参数2,是否持久化
  25. //参数3,是否独占本次connection链接
  26. //参数4,是否自动删除消息
  27. //参数5,是否需要传递额外的队列数据Map
  28. channel.queueDeclare("simple_queue1",true,false,false,null);
  29. //创建消费者,并且设置消息处理
  30. //指定频道
  31. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  32. /**
  33. *
  34. * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
  35. * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
  36. * @param properties 另外的数据所封装的属性对象 null
  37. * @param body 消息本身
  38. * @throws IOException
  39. */
  40. @Override
  41. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. //实现消费的业务逻辑
  43. System.out.println("consumerTag"+consumerTag);
  44. System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
  45. System.out.println("消息本身:"+new String(body,"UTF-8"));
  46. }
  47. };
  48. //监听消息
  49. //参数1,指定要监听的队列的名称
  50. //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
  51. //参数3
  52. channel.basicConsume("simple_queue1",true,defaultConsumer);
  53. //关闭资源,建议不关闭一直监听
  54. }
  55. }


 工作模式:多个消费者消费多个不同的消息,创建多个消费者即可


订阅-广播模式:同一个消息被多个消费者消费。上面两个模式只有三个角色分别是生产者,队列,消费者。而在订阅模式中多了一个角色,那就是exchange交换机角色。

订阅模式大概过程是:生产者生产消息后,发送给交换机,然后由交换机转发给各个队列,在这之前队列需要和交换机进行绑定。     .

 生产者代码

  1. package com.example.fanout;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class FanoutProvider {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //创建链接工厂对象
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. //设置RabbitMQ服务主机地址,默认localhost
  13. connectionFactory.setHost("localhost");
  14. //设置端口,默认5672
  15. connectionFactory.setPort(5672);
  16. //设置虚拟主机名,默认/
  17. connectionFactory.setVirtualHost("/mytest");
  18. //设置用户名密码
  19. connectionFactory.setUsername("test");
  20. connectionFactory.setPassword("123456");
  21. //创建链接
  22. Connection connection = connectionFactory.newConnection();
  23. //创建通道
  24. Channel channel = connection.createChannel();
  25. //声明队列
  26. //参数1:消息队列名称
  27. //参数2,是否持久化
  28. //参数3,是否独占本次connection链接
  29. //参数4,是否自动删除消息
  30. //参数5,是否需要传递额外的队列数据Map
  31. channel.queueDeclare("fanout_queue1",true,false,false,null);
  32. //声明两个队列
  33. channel.queueDeclare("fanout_queue2",true,false,false,null);
  34. //声明交换机
  35. //参数1 交换机名称
  36. //参数2,指定交换机的类型 BuiltinExchangeType.FANOUT(广播)
  37. channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
  38. //将队列绑定到指定的交换机
  39. //参数1,指定要绑定的队列
  40. //参数2,指定绑定的交换机
  41. //参数3,指定routingkey,广播模式可以不写
  42. channel.queueBind("fanout_queue1","fanout_exchange","");
  43. channel.queueBind("fanout_queue2","fanout_exchange","");
  44. //创建消息
  45. String msg = "hello";
  46. //发布消息
  47. //参数1,使用指定的交换机,不设置则使用默认交换机
  48. //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
  49. //参数3,发生消息时是否需要额外的消息数据
  50. //参数4,消息内容
  51. channel.basicPublish("fanout_exchange","",null,msg.getBytes());
  52. //关闭资源
  53. channel.close();
  54. connection.close();
  55. }
  56. }

消费者1

  1. package com.example.fanout;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class FanoutConsumer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //创建链接工厂对象
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. //设置RabbitMQ服务主机地址,默认localhost
  10. connectionFactory.setHost("localhost");
  11. //设置端口,默认5672
  12. connectionFactory.setPort(5672);
  13. //设置虚拟主机名,默认/
  14. connectionFactory.setVirtualHost("/mytest");
  15. //设置用户名密码
  16. connectionFactory.setUsername("test");
  17. connectionFactory.setPassword("123456");
  18. //创建链接
  19. Connection connection = connectionFactory.newConnection();
  20. //创建频道
  21. Channel channel = connection.createChannel();
  22. //声明队列
  23. //参数1:消息队列名称
  24. //参数2,是否持久化
  25. //参数3,是否独占本次connection链接
  26. //参数4,是否自动删除消息
  27. //参数5,是否需要传递额外的队列数据Map
  28. channel.queueDeclare("fanout_queue1",true,false,false,null);
  29. //创建消费者,并且设置消息处理
  30. //指定频道
  31. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  32. /**
  33. *
  34. * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
  35. * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
  36. * @param properties 另外的数据所封装的属性对象 null
  37. * @param body 消息本身
  38. * @throws IOException
  39. */
  40. @Override
  41. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. //实现消费的业务逻辑
  43. System.out.println("consumerTag"+consumerTag);
  44. System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
  45. System.out.println("消息本身:"+new String(body,"UTF-8"));
  46. }
  47. };
  48. //监听消息
  49. //参数1,指定要监听的队列的名称
  50. //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
  51. //参数3
  52. channel.basicConsume("fanout_queue1",true,defaultConsumer);
  53. //关闭资源,建议不关闭一直监听
  54. }
  55. }

消费者2

  1. package com.example.fanout;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class FanoutConsumer2 {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //创建链接工厂对象
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. //设置RabbitMQ服务主机地址,默认localhost
  10. connectionFactory.setHost("localhost");
  11. //设置端口,默认5672
  12. connectionFactory.setPort(5672);
  13. //设置虚拟主机名,默认/
  14. connectionFactory.setVirtualHost("/mytest");
  15. //设置用户名密码
  16. connectionFactory.setUsername("test");
  17. connectionFactory.setPassword("123456");
  18. //创建链接
  19. Connection connection = connectionFactory.newConnection();
  20. //创建频道
  21. Channel channel = connection.createChannel();
  22. //声明队列
  23. //参数1:消息队列名称
  24. //参数2,是否持久化
  25. //参数3,是否独占本次connection链接
  26. //参数4,是否自动删除消息
  27. //参数5,是否需要传递额外的队列数据Map
  28. channel.queueDeclare("fanout_queue2",true,false,false,null);
  29. //创建消费者,并且设置消息处理
  30. //指定频道
  31. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  32. /**
  33. *
  34. * @param consumerTag 消费者名称,如果之前没有指定,则会自动生成一个
  35. * @param envelope 消息一些额外的参数 channel.basicPublish("","simple_queue1",null,msg.getBytes());
  36. * @param properties 另外的数据所封装的属性对象 null
  37. * @param body 消息本身
  38. * @throws IOException
  39. */
  40. @Override
  41. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. //实现消费的业务逻辑
  43. System.out.println("consumerTag"+consumerTag);
  44. System.out.println("交换机:"+envelope.getExchange()+"----RoutingKey:"+envelope.getRoutingKey()+"----DeliveryTag:"+envelope.getDeliveryTag());
  45. System.out.println("消息本身:"+new String(body,"UTF-8"));
  46. }
  47. };
  48. //监听消息
  49. //参数1,指定要监听的队列的名称
  50. //参数2,设置消息的应答模式,true自动应答(性能好,消息有可能丢失),false手动应答
  51. //参数3
  52. channel.basicConsume("fanout_queue2",true,defaultConsumer);
  53. //关闭资源,建议不关闭一直监听
  54. }
  55. }

订阅-Routing路由模式

生产者代码

  1. package com.example.routingkey;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.concurrent.TimeoutException;
  9. public class RoutingKeyProvider {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //创建链接工厂对象
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. //设置RabbitMQ服务主机地址,默认localhost
  14. connectionFactory.setHost("localhost");
  15. //设置端口,默认5672
  16. connectionFactory.setPort(5672);
  17. //设置虚拟主机名,默认/
  18. connectionFactory.setVirtualHost("/mytest");
  19. //设置用户名密码
  20. connectionFactory.setUsername("test");
  21. connectionFactory.setPassword("123456");
  22. //创建链接
  23. Connection connection = connectionFactory.newConnection();
  24. //创建通道
  25. Channel channel = connection.createChannel();
  26. //声明队列
  27. //参数1:消息队列名称
  28. //参数2,是否持久化
  29. //参数3,是否独占本次connection链接
  30. //参数4,是否自动删除消息
  31. //参数5,是否需要传递额外的队列数据Map
  32. channel.queueDeclare("routingkey_queue1",true,false,false,null);
  33. //声明两个队列
  34. channel.queueDeclare("routingkey_queue2",true,false,false,null);
  35. //声明交换机
  36. //参数1 交换机名称
  37. //参数2,指定交换机的类型 BuiltinExchangeType.FANOUT(广播) BuiltinExchangeType.DIRECT(路由模式)
  38. channel.exchangeDeclare("routingkey_exchange", BuiltinExchangeType.DIRECT);
  39. //将队列绑定到指定的交换机
  40. //参数1,指定要绑定的队列
  41. //参数2,指定绑定的交换机
  42. //参数3,指定routingkey,广播模式可以不写, routingkey模式要写
  43. channel.queueBind("routingkey_queue1","routingkey_exchange","item.insert");
  44. channel.queueBind("routingkey_queue2","routingkey_exchange","item.delete");
  45. //创建消息
  46. String msg = "增加操作";
  47. //发布消息
  48. //参数1,使用指定的交换机,不设置则使用默认交换机
  49. //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
  50. //参数3,发生消息时是否需要额外的消息数据
  51. //参数4,消息内容
  52. channel.basicPublish("routingkey_exchange","item.insert",null,msg.getBytes(StandardCharsets.UTF_8));
  53. String msg2 = "删除操作";
  54. //发布消息
  55. //参数1,使用指定的交换机,不设置则使用默认交换机
  56. //参数2,指定路由器key,如果是简单模式直接给队列名就行,广播模式指定为空
  57. //参数3,发生消息时是否需要额外的消息数据
  58. //参数4,消息内容
  59. channel.basicPublish("routingkey_exchange","item.delete",null,msg2.getBytes(StandardCharsets.UTF_8));
  60. //关闭资源
  61. channel.close();
  62. connection.close();
  63. }
  64. }

消费者代码还跟上面一样,改一下对应的队列名就行


订阅-Topics通配符模式(主题模式)

RoutingKey一般是由一个或者多个单词组成,多个单词之间以.分割。例如:item.insert

通配符规则:

#:匹配一个或者多个词

*:匹配不多不少一个词

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/384400
推荐阅读
相关标签
  

闽ICP备14008679号