当前位置:   article > 正文

RabbitMQ amqp_client的使用_amqp-client

amqp-client

RabbitMQ使用多种协议。本教程使用AMQP 0-9-1,这是一个开放的通用消息传递协议。RabbitMQ有许多不同语言的客户端。我们将使用RabbitMQ提供的Java客户端。

  • 本文以RabbitMQ的第一种最简单的模式为例

一、创建空项目,并创建RabbitMQ-product,RabbitMQ-comsumer两个子项目。

  • 创建后项目结构如下:
    在这里插入图片描述

二、导入对应依赖

		<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

三、创建生产者

1、创建连接工厂并设置参数

		ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("124.220.82.113");
        connectionFactory.setPort(5672);
  • 1
  • 2
  • 3

2、创建连接和通道

try(Connection connection = connectionFactory.newConnection()) {
            Channel channel = connection.createChannel();
  • 1
  • 2

3、创建队列

  • 接下来,我们创建一个通道,这是大多数用于完成任务的API所在的地方。注意,我们可以使用try-with-resources语句,因为Connection和Channel都实现了java. lang. AutoClosable。这样我们就不需要在代码中显式地关闭它们。
  • 要发送,我们必须声明一个队列供我们发送到;然后我们可以发布一条消息到队列,所有这些都在try-with-resources语句中:
/*
            作用:声明队列
            参数:
            queue – 队列的名称
            durable – 如果我们声明一个持久队列,则为 true(持久到数据库中)
            exclusive – 如果我们声明一个排他性队列(只允许一个消费者监听这个队列),则为 true。
            autoDelete – 如果我们声明一个自动删除队列,则为 true(当没有消费者时自动删除)
            arguments – 队列的其他属性(构造参数)
            返回:
            一种声明-确认方法,用于指示队列已成功声明
            抛出:
            IOException – 如果遇到错误
            */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 声明一个队列是幂等的–只有当它不存在时才会被创建。消息内容是一个字节数组,因此您可以在那里编码任何您喜欢的内容。

4、发送消息

			String message = "hello damage!";
            /*
            作用:发布消息
            参数:
            exchange - 将消息发布到的交换机,交换模式下默认""
            routingKey - 路由密匙
            props - 消息的其他属性-路游表头等
            body - 消息正文
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5、在管控台中的QUEUES->Get Messages可以看到发送的消息

在这里插入图片描述
可以看到有一条消息,使用了默认的交换机,路游key值为hello,消息体为"hello damage!"

四、创建消费者

1、创建连接工厂并设置参数

		ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("124.220.82.113");
        connectionFactory.setPort(5672);
  • 1
  • 2
  • 3

2、创建连接和通道

try(Connection connection = connectionFactory.newConnection()) {
            Channel channel = connection.createChannel();
  • 1
  • 2
  • 注意这里并不需要使用try-catch-resources 来关闭链接,通过这样做,我们将简单地使程序继续前进,关闭所有内容,然后退出!这会很尴尬,因为我们希望进程在消费者异步侦听消息到达时保持活动状态。

3、创建队列

channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  • 1
  • 这里也还是需要创建队列,因为我们可能会在发布者之前启动消费者,所以我们希望在尝试消费队列中的消息之前确保队列存在。第一个参数如果队列名不存在会创建队列,如果队列存在则不进行操作,具有幂等性。

4、声明回调方法

  • 我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
  • 我们要告诉服务器把队列中的消息传递给我们。因为它会异步地推送消息,所以我们提供了一个对象形式的回调函数,该回调函数将缓冲消息,直到我们准备好使用它们。这就是DeliverCallback子类所做的。
Consumer consumer = new DefaultConsumer(channel){
            /*
            回调方法,收到消息后自动执行该方法
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("envelope getExchange: " + envelope.getExchange());
                System.out.println("envelope getRoutingKey: " + envelope.getRoutingKey());
                System.out.println("envelope getDeliveryTag: " + envelope.getDeliveryTag());
                System.out.println("properties : " + properties);
                System.out.println("body : " + Arrays.toString(body));
            }
        };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

5、接受消息

channel.basicConsume(QUEUE_NAME,true,consumer);
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/956435
推荐阅读
相关标签
  

闽ICP备14008679号