当前位置:   article > 正文

RabbitMQ练习(Work Queues)

RabbitMQ练习(Work Queues)

1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》

确保RabbitMQ、Sender、Receiver容器正常安装和启动:

  1. root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
  2. (新开一个终端窗口:)
  3. root@k0test1:~# docker start sender
  4. sender
  5. root@k0test1:~# docker start receiver
  6. receiver
  7. root@k0test1:~#
  8. root@k0test1:~# docker ps
  9. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  10. 7eb5e0b45418 rabbitmq:3.13-management "docker-entrypoint.s…" 3 minutes ago Up 3 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
  11. 17d06c4aca4a ubtest:22.04 "/usr/sbin/sshd -D" 47 hours ago Up 2 minutes receiver
  12. 4a86598c2892 ubtest:22.04 "/usr/sbin/sshd -D" 47 hours ago Up 2 minutes sender
  13. root@k0test1:~#

另外,需要创建一个新的容器,命名为Receiver2:

  1. receiver:
  2. root@k0test1:~# docker run --name receiver2 --hostname receiver -d ubtest:22.04
  3. root@k0test1:~# docker exec -it receiver2 /bin/bash
  4. root@receiver2:/# apt update
  5. root@receiver2:/# apt install python3
  6. root@receiver2:/# apt install python3-pip
  7. root@receiver2:/# python3 -m pip install pika --upgrade
  8. 检查:
  9. root@k0test1:~# docker ps
  10. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  11. 695b9c05829f ubtest:22.04 "/usr/sbin/sshd -D" 6 minutes ago Up 6 minutes receiver2
  12. 7eb5e0b45418 rabbitmq:3.13-management "docker-entrypoint.s…" 2 hours ago Up 2 hours 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
  13. 17d06c4aca4a ubtest:22.04 "/usr/sbin/sshd -D" 2 days ago Up 2 hours receiver
  14. 4a86598c2892 ubtest:22.04 "/usr/sbin/sshd -D" 2 days ago Up 2 hours sender
  15. root@k0test1:~#

网络拓扑

3、Work Queues练习

3.1 概述

在本练习中,将创建一个工作队列(Work Queue),它将被用来在多个工作者(Worker)之间分配需要较长时间来完成的任务( time-consuming tasks)。

工作队列(也称为任务队列)的核心理念:

  1. 避免立即执行复杂任务:复杂任务往往是需要花费系统更多时间和更多资源进行处理的任务。工作队列的设计初衷是减少对即时资源的需求。在某些情况下,如果直接执行某些任务,可能会消耗大量计算资源,导致系统响应变慢或无法处理其他请求。

  2. 延迟执行任务:通过将任务推迟到稍后执行,系统可以保持响应性,继续处理其他请求或任务,而不是等待当前任务完成。

  3. 任务封装为消息工作队列模式中,任务被封装成消息。这意味着任务的描述和所需的数据被打包成一个消息对象,可以被发送和接收。

  4. 发送消息到队列:任务消息被发送到一个队列中。队列充当了一个缓冲区,存储着待处理的任务消息。这种方式允许任务按照发送的顺序或根据其他规则进行处理。

  5. 异步处理:工作队列通常与异步处理模式相结合。任务的发送者不必等待任务完成,可以继续执行其他操作。任务的执行可以由不同的进程或服务器在后台异步完成。

  6. 提高效率和可扩展性:通过使用工作队列,系统可以更有效地分配资源,处理高峰时段的负载,并且更容易进行扩展。因为任务可以在不同的时间点被处理,系统可以根据需要动态地添加更多的处理能力。

总的来说,工作队列是一种有效的机制,用于管理和优化任务的执行,特别是在处理需要大量计算资源或可能影响系统性能的后台任务时。

这个概念在Web应用程序中尤其有用,特别是在无法在短暂的HTTP请求窗口期间处理复杂任务的情况下。以下是对这段话的进一步解读:

  1. HTTP请求窗口的限制:在传统的Web应用程序中,客户端(如浏览器)与服务器之间的交互通常通过HTTP请求来完成。这些请求通常有时间限制,因为浏览器或服务器可能会在一定时间内没有响应的情况下终止连接。

  2. 复杂任务的处理:当涉及到需要长时间处理的任务时,例如大量数据的处理、文件的上传下载、复杂的计算或外部服务的调用,这些任务可能无法在HTTP请求的有限时间内完成。

  3. 用户体验:如果在HTTP请求期间尝试处理这些复杂任务,可能会导致用户界面冻结或响应缓慢,从而影响用户体验。

  4. 任务队列的作用:通过使用任务队列,Web应用程序可以将这些复杂任务转换为消息并发送到队列中。这样,主HTTP请求可以快速响应,告知用户任务已经开始处理,而实际的任务执行则由后台服务异步完成。

  5. 提高响应性和可扩展性:这种方法允许Web应用程序保持响应性,即使在高负载或处理复杂任务时也是如此。此外,通过在队列中管理任务,可以更容易地扩展应用程序以处理更多的请求和任务。

  6. 解耦请求和处理:任务队列还有助于解耦请求的接收和任务的处理。这意味着Web服务器不需要直接处理任务,而是可以专注于接收请求并将任务分发到队列中,由专门的工作进程来处理这些任务。

  7. 增强可靠性:使用任务队列还可以提高应用程序的可靠性。如果任务处理失败,它可以根据需要重试,而不会影响主应用程序的稳定性。

总之,任务队列是Web应用程序处理复杂或长时间运行任务的有效方法,它有助于提高应用程序的性能、可扩展性和用户体验。

3.2 模拟复杂任务的方法

本次练习中,使用 Python 的 time.sleep() 函数来模拟任务的处理时间。以下是详细解读:

  1. 发送字符串代表复杂任务:在实际应用中,任务队列可能用于处理如图像大小调整或PDF文件渲染等实际任务。但在本练习中,我们将使用字符串来代表这些任务。

  2. 模拟任务处理:由于这里没有具体的实际任务,我们将通过模拟的方式来表示任务的执行。这通常用于测试或演示目的。

  3. 使用 time.sleep() 函数time.sleep() 是 Python 中的一个函数,它可以使程序暂停执行指定的秒数。在这个例子中,它被用来模拟任务的处理时间。

  4. 字符串中的点代表复杂度:在这个模拟中,字符串中的每个点(.)代表任务的一定复杂度。这意味着每个点都对应着一秒的处理时间。

  5. 计算模拟任务的持续时间:通过计算字符串中的点的数量,我们可以确定模拟任务应该持续的时间。例如,如果字符串是 "Hello...",它包含三个点,因此模拟的任务将持续三秒。

  6. 模拟任务的执行:在实际的代码实现中,这可能涉及到接收任务字符串,计算点的数量,并使用 time.sleep() 来暂停相应数量的秒数。

  7. 模拟的实用性:虽然这种模拟方法不会执行任何实际的工作,但它可以帮助我们理解任务队列如何处理和调度任务,以及如何使用回调函数来处理任务完成后的逻辑。

  8. 示例:如果有一个任务字符串 "Processing....",它包含五个点,那么在模拟中,这个任务将需要五秒钟来“完成”。

通过这种方式,可以在不执行实际任务的情况下,模拟任务队列的工作流程和任务处理时间,这对于开发和测试任务队列系统非常有用。

3.3 消息和队列工作机制

注:工作队列中,任务被封装为消息进行处理,本文提到的消息、工作、任务几个词语可理解为相互通用。

Round-robin dispatching(轮询分发) 是一种在多个工作者(workers)之间分配任务的方法,尤其适用于工作队列(work queues)或任务队列(task queues)的场景。这种方法的核心思想是均衡分配任务,避免某些工作者(workers)过载而其他工作者闲置 。

在具体实现中,RabbitMQ 默认使用 Round-robin dispatching 来分发消息。当多个消费者订阅了同一个队列时,每条到达的消息会按照顺序分配给下一个消费者。这种方式确保了在平均意义上,每个消费者接收到的消息数量大致相同 。

然而,Round-robin dispatching 也有其局限性。如果某些任务比其他任务更加耗时,这可能导致一部分消费者处理速度较慢,而其他消费者可能在等待这些慢的消费者完成它们的任务 。为了解决这个问题,RabbitMQ 提供了 Fair Dispatch(公平调度)机制,它通过 basic_qos 方法配合 prefetch_count 参数来实现。当设置 prefetch_count 为 1 时,RabbitMQ 会保证在消费者完成并确认(acknowledge)当前消息之前,不会向其发送新的消息,这样就能保证任务更公平地分配给当前可用的消费者 。

此外,消息的持久化(message durability)确认机制(message acknowledgment)也是确保任务不丢失的重要特性。通过设置消息和队列为持久化,即使 RabbitMQ 服务器重启,消息也不会丢失。而消息确认机制则确保了只有在消费者成功处理并确认了消息后,服务器才会从内存中移除该消息,如果消费者在处理过程中崩溃,未确认的消息会被重新放回队列,由其他消费者处理 。

通过这些机制,Round-robin dispatching 和 Fair Dispatch 的结合使用可以有效地提高分布式系统中任务处理的效率和可靠性。

3.4 Sending

进入sender容器,vi编写new_task.py:

  1. root@k0test1:~# docker exec -it sender /bin/bash
  2. root@sender:/# vi new_task.py
  3. root@sender:/# cat new_task.py
  4. import pika
  5. import sys
  6. connection = pika.BlockingConnection(
  7. pika.ConnectionParameters(host='172.17.0.5'))
  8. channel = connection.channel()
  9. channel.queue_declare(queue='task_queue', durable=True)
  10. message = ' '.join(sys.argv[1:]) or "Hello World!"
  11. channel.basic_publish(
  12. exchange='',
  13. routing_key='task_queue',
  14. body=message,
  15. properties=pika.BasicProperties(
  16. delivery_mode=pika.DeliveryMode.Persistent
  17. ))
  18. print(f" [x] Sent {message}")
  19. connection.close()
  20. root@sender:/#

这段代码是一个 Python 脚本,使用了 pika 库来与 RabbitMQ 消息代理进行交互。它的作用是将一条消息发送到一个持久化的队列中。以下是代码的详细解释:

  1. import pika: 导入 pika 模块,这是 Python 的 RabbitMQ 客户端库。

  2. import sys: 导入 Python 的 sys 模块,用来访问命令行参数。

  3. 创建一个到 RabbitMQ 服务器的连接,服务器地址指定为 '172.17.0.5'。这里使用的是默认的 RabbitMQ 端口,没有在代码中指定(默认为 5672)。

    connection = pika.BlockingConnection( pika.ConnectionParameters(host='172.17.0.5'))

  4. 在连接上创建一个新的通道(channel)。

    channel = connection.channel()

  5. 声明一个名为 task_queue持久化队列。这意味着即使 RabbitMQ 服务器重启,队列也会保留。

    channel.queue_declare(queue='task_queue', durable=True)

  6. 准备要发送的消息。如果命令行中有提供参数,message 将是这些参数连接成的字符串;如果没有提供参数,message 将默认为 "Hello World!"

    message = ' '.join(sys.argv[1:]) or "Hello World!"

  7. 使用 basic_publish 方法将消息发送到 task_queue 队列。这里没有使用交换机(exchange),消息直接发送到队列。消息的投递模式设置为持久化,这意味着即使消息在传递过程中服务器重启,消息也不会丢失。

    channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent ))

  8. 打印一条消息,表示消息已经发送。

    print(f" [x] Sent {message}")

  9. 关闭与 RabbitMQ 服务器的连接。

    connection.close()

要运行这个脚本,需要在命令行中提供参数,例如:

python new_task.py "Your message here"

如果没有提供参数,它将发送默认消息 "Hello World!" 到 RabbitMQ 的 task_queue 队列。这个脚本是消息生产者的一个简单示例,用于演示如何使用 pika 库发送消息到 RabbitMQ。

3.5 Receiving

 进入receiver和receiver2容器,vi编写worker.py:

  1. root@k0test1:~# docker exec -it receiver /bin/bash
  2. root@receiver:/# vi worker.py
  3. root@receiver:/# cat worker.py
  4. import pika
  5. import time
  6. connection = pika.BlockingConnection(
  7. pika.ConnectionParameters(host='172.17.0.5'))
  8. channel = connection.channel()
  9. channel.queue_declare(queue='task_queue', durable=True)
  10. print(' [*] Waiting for messages. To exit press CTRL+C')
  11. def callback(ch, method, properties, body):
  12. print(f" [x] Received {body.decode()}")
  13. time.sleep(body.count(b'.'))
  14. print(" [x] Done")
  15. ch.basic_ack(delivery_tag=method.delivery_tag)
  16. channel.basic_qos(prefetch_count=1)
  17. channel.basic_consume(queue='task_queue', on_message_callback=callback)
  18. channel.start_consuming()
  19. root@receiver:/#

这段Python代码是一个RabbitMQ消费者客户端的实现,它使用`pika`库来与RabbitMQ服务器进行交互。下面是对这段代码的详细解释:

1. **导入必要的模块**:
   import pika
   import time
   这里导入了`pika`模块,它是RabbitMQ的Python客户端库,以及`time`模块,用于在处理消息时实现延时。

2. **创建到RabbitMQ的连接**:
   connection = pika.BlockingConnection(
       pika.ConnectionParameters(host='172.17.0.5'))
   创建一个连接到RabbitMQ服务器。这里指定了服务器的IP地址为`172.17.0.5`。

3. **创建通道**:
   channel = connection.channel()
   从连接中创建一个新的通道。通道是RabbitMQ中消息传递的基本单元。

4. **声明队列**:
   channel.queue_declare(queue='task_queue', durable=True)
   声明一个名为`task_queue`的队列,设置为持久化。这意味着即使RabbitMQ服务器重启,队列及其消息也不会丢失。

5. **打印提示信息**:
   print(' [*] Waiting for messages. To exit press CTRL+C')
   打印一条信息,告知用户程序正在等待接收消息,并提示用户可以通过按`CTRL+C`来退出程序。

6. **定义消息处理回调函数**:
   def callback(ch, method, properties, body):
       print(f" [x] Received {body.decode()}")
       time.sleep(body.count(b'.'))
       print(" [x] Done")
       ch.basic_ack(delivery_tag=method.delivery_tag)
   定义一个名为`callback`的函数,该函数将在消息到达时被调用。函数的工作流程如下:
   - 打印接收到的消息内容(`body.decode()`将字节类型的消息解码为字符串)。
   - 根据消息内容中点(`.`)的数量进行延时,使用`time.sleep()`函数实现。
   - 打印消息处理完成的提示。
   - 使用`ch.basic_ack()`方法确认消息已被成功处理。`delivery_tag`是消息的唯一标识,用于确认消息。

7. **设置QoS**:
   channel.basic_qos(prefetch_count=1)
   设置通道的基本QoS(Quality of Service),`prefetch_count=1`参数确保每次只从队列中获取一个消息进行处理,这有助于防止消息的重复处理。

8. **开始消费消息**:
   channel.basic_consume(queue='task_queue', on_message_callback=callback)
   使用`basic_consume()`方法开始消费来自`task_queue`队列的消息。`on_message_callback`参数指定了当消息到达时调用的回调函数。

9. **启动消息消费**:
   channel.start_consuming()
   调用`start_consuming()`方法启动消息消费过程。程序将在此等待并处理消息,直到用户中断程序(例如通过按`CTRL+C`)。

这段代码实现了一个简单的RabbitMQ消费者,它能够连接到服务器,声明队列,消费消息,并在处理完消息后进行确认。这种模式在需要异步处理任务或在分布式系统中进行消息传递时非常有用。

 3.6 开始测试

1、运行两个消费者脚本

在receiver/receiver2这两个容器同时运行worker.py脚本。这两个容器将作为消费者C1和C2,它们都会连接到同一个队列。从同一个队列中获取消息。在RabbitMQ中,队列可以有多个消费者,每个消费者都可以独立地从队列中获取消息。

  1. root@receiver:/# python3 worker.py
  2. [*] Waiting for messages. To exit press CTRL+C
  3. root@receiver2:/# python3 worker.py
  4. [*] Waiting for messages. To exit press CTRL+C

2、运行生产者脚本

  • 在sender这个容器运行new_task.py脚本。这个容器将作为生产者P。
  • 使用new_task.py脚本来向RabbitMQ队列发送消息。每次执行脚本时,它都会将一条消息发送到队列中,并在终端打印一条消息来确认消息已经发送。
  • 连续发送了八条消息,每条消息的内容不同,并且消息的长度和结尾的点号数量也在变化。
  • 这些消息将发送到RabbitMQ的task_queue队列中,等待消费者(C1和C2)来消费。
  1. root@sender:/# python3 new_task.py First message.
  2. [x] Sent First message.
  3. root@sender:/# python3 new_task.py Second message..
  4. [x] Sent Second message..
  5. root@sender:/# python3 new_task.py Third message...
  6. [x] Sent Third message...
  7. root@sender:/# python3 new_task.py Fourth message....
  8. [x] Sent Fourth message....
  9. root@sender:/# python3 new_task.py Fifth message.....
  10. [x] Sent Fifth message.....
  11. root@sender:/# python3 new_task.py Sixth message.................
  12. [x] Sent Sixth message.................
  13. root@sender:/# python3 new_task.py Seventh message.
  14. [x] Sent Seventh message.
  15. root@sender:/# python3 new_task.py eighth message.
  16. [x] Sent eighth message.
  17. root@sender:/#

3、消费者接收消息

  • 消息接收:消费者容器receiver/receiver2正在监听这个队列,并且当消息到达时,它们会接收消息、处理消息(例如,根据消息内容的点号数量进行延时),然后确认消息已经被处理。
  • 消息轮询:从输出来看,消息是在两个消费者之间轮流接收的。receiver首先接收了第一条消息("First message."),然后是第三条、第五条、第七条和第八条消息。而receiver2接收了第二条、第四条和第六条消息。

  • 消息处理:每个消费者在接收到消息后,都会打印一条消息来确认接收(例如:"[x] Received First message."),然后执行相应的处理(在这个例子中,是简单地根据消息中的点号数量进行延时),最后打印一条消息来确认处理完成("[x] Done")。

  • 消息确认:消费者在处理完消息后,发送确认信号给RabbitMQ,这样RabbitMQ就知道消息已经被成功消费,可以将其从队列中移除。

  • 公平分发:由于设置了prefetch_count=1,RabbitMQ会尝试公平地将消息分发给消费者。然而,实际的消息分发可能会受到多种因素的影响,包括网络延迟、消费者处理速度等。

  • 退出消费者:消费者脚本会持续运行,等待并处理消息,直到您通过按下CTRL+C来终止它们。

  1. channel.start_consuming()
  2. root@receiver:/# python3 worker.py
  3. [*] Waiting for messages. To exit press CTRL+C
  4. [x] Received First message.
  5. [x] Done
  6. [x] Received Third message...
  7. [x] Done
  8. [x] Received Fifth message.....
  9. [x] Done
  10. [x] Received Seventh message.
  11. [x] Done
  12. [x] Received eighth message.
  13. [x] Done
  14. root@receiver2:/# python3 worker.py
  15. [*] Waiting for messages. To exit press CTRL+C
  16. [x] Received Second message..
  17. [x] Done
  18. [x] Received Fourth message....
  19. [x] Done
  20. [x] Received Sixth message.................
  21. [x] Done

4、Wireshark抓包

 4.1 抓包方式

参考:《RabbitMQ练习(Hello World)》

4.2 抓包信息和典型数据包

4.2.1 消费者等待消息

receiver容器(消费者,也是工作者worker)创建到 RabbitMQ 服务器的连接信息:

 典型数据包(Frame 16):

  1. Frame 16: 96 bytes on wire (768 bits), 96 bytes captured (768 bits) on interface docker0, id 0
  2. Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:05 (02:42:ac:11:00:05)
  3. Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.5 (172.17.0.5)
  4. Transmission Control Protocol, Src Port: 34932, Dst Port: 5672, Seq: 357, Ack: 571, Len: 30
  5. Advanced Message Queuing Protocol
  6. Type: Method (1)
  7. Channel: 1
  8. Length: 22
  9. Class: Queue (50)
  10. Method: Declare (10)
  11. Arguments
  12. Ticket: 0
  13. Queue: task_queue
  14. .... ...0 = Passive: False
  15. .... ..1. = Durable: True
  16. .... .0.. = Exclusive: False
  17. .... 0... = Auto-Delete: False
  18. ...0 .... = Nowait: False
  19. Arguments

这段文本描述的是AMQP(高级消息队列协议)中的一个队列声明(Queue Declare)方法的调用。以下是对这段文本的解读:

  • 类型(Type):方法(Method),类型编号为1,表示这是一个方法帧(Method Frame)。
  • 通道(Channel):1,指定了AMQP协议操作所使用的通道号。
  • 长度(Length):22,表示方法帧的总长度,包括方法帧头和参数。
  • 类别(Class):队列(Queue),类别编号为50,表示这个帧属于队列相关的操作。
  • 方法(Method):声明(Declare),方法编号为10,表示这是一个声明队列的操作。
  • 参数(Arguments)
    • 票据(Ticket):0,表示客户端使用的是默认的权限票据。
    • 队列(Queue)task_queue,指定了要声明的队列名称。
    • 被动(Passive)False,表示不检查队列是否存在,如果队列不存在,则创建它。
    • 持久(Durable)True,表示队列是持久的,即使在服务器重启后也会保留。
    • 独占(Exclusive)False,表示队列不是独占的,可以被多个客户端访问。
    • 自动删除(Auto-Delete)False,表示队列不会在没有消费者连接时自动删除。
    • 无需等待(Nowait)False,表示客户端期望服务器对这个声明操作做出响应。

在AMQP中,队列声明操作用于创建一个新的队列,或者声明对现有队列的使用。根据参数的不同设置,队列的行为和特性会有所不同。例如,持久化的队列可以保证消息在服务器重启后不会丢失,而独占队列则提供了一种机制,使得队列只能被创建它的连接使用。

 典型数据包(Frame 18):

  1. Frame 18: 85 bytes on wire (680 bits), 85 bytes captured (680 bits) on interface docker0, id 0
  2. Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:05 (02:42:ac:11:00:05)
  3. Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.5 (172.17.0.5)
  4. Transmission Control Protocol, Src Port: 34932, Dst Port: 5672, Seq: 387, Ack: 602, Len: 19
  5. Advanced Message Queuing Protocol
  6. Type: Method (1)
  7. Channel: 1
  8. Length: 11
  9. Class: Basic (60)
  10. Method: Qos (10)
  11. Arguments
  12. Prefetch-Size: 0
  13. Prefetch-Count: 1
  14. .... ...0 = Global: False

这段文本描述的是AMQP协议中的一个基本QoS(Quality of Service,服务质量)方法调用。以下是对这段文本的解读:

  • 类型(Type):方法(Method),类型编号为1,表示这是一个方法帧。
  • 通道(Channel):1,指定了AMQP协议操作所使用的通道号。
  • 长度(Length):11,表示方法帧的总长度,包括方法帧头和参数。
  • 类别(Class):基本(Basic),类别编号为60,表示这个帧属于基本消息发布和接收相关的操作。
  • 方法(Method):QoS(Quality of Service),方法编号为10,表示这是一个设置服务质量的操作。
  • 参数(Arguments)
    • Prefetch-Size:0,表示不限制每个消费者可以预取的消息体的大小。如果设置为0,表示不基于消息体大小来限制预取。
    • Prefetch-Count:1,表示每个消费者可以预取的消息数量限制为1。这意味着消费者在接收到下一条消息之前必须确认当前消息。
    • GlobalFalse,表示这个QoS设置只适用于当前的通道,而不是全局设置。

QoS设置允许客户端和服务器之间协商消息的传输速率和行为,以确保消息的可靠传递。通过设置Prefetch-SizePrefetch-Count,客户端可以控制它想要接收的消息数量,以及在确认之前可以接收多少条消息。这有助于防止消费者因接收过多消息而不堪重负,同时也允许消费者根据自身的处理能力来调整消息的接收速率。

 4.2.2 生产者发送消息

sender容器(生产者)创建到 RabbitMQ 服务器的连接,发送消息:

 典型数据包(Frame 72):

  1. Frame 72: 88 bytes on wire (704 bits), 88 bytes captured (704 bits) on interface docker0, id 0
  2. Ethernet II, Src: 02:42:ac:11:00:02 (02:42:ac:11:00:02), Dst: 02:42:ac:11:00:05 (02:42:ac:11:00:05)
  3. Internet Protocol Version 4, Src: 172.17.0.2 (172.17.0.2), Dst: 172.17.0.5 (172.17.0.5)
  4. Transmission Control Protocol, Src Port: 53486, Dst Port: 5672, Seq: 437, Ack: 602, Len: 22
  5. Advanced Message Queuing Protocol
  6. Type: Content body (3)
  7. Channel: 1
  8. Length: 14
  9. Payload: 4669727374206d6573736167652e 注: 内容为First message.

在AMQP(高级消息队列协议)中,内容体(Content body)部分是消息的实际数据负载。以下是对这段内容的解读:

  • 类型(Type):内容体(Content body),类型编号为3,表示这是一个内容体帧。
  • 通道(Channel):1,这表明消息通过AMQP通道1发送。
  • 长度(Length):14,这表示内容体帧的总长度是14个字节,包括帧头和有效载荷。
  • 负载(Payload):4669727374206d6573736167652e,这是消息的数据部分,通常以十六进制格式表示。

要理解负载部分,我们可以将其视为一个字符串,解码如下:

  • Payload 解码First message.,这是一个简单的文本消息,内容为 "First message."

4.2.3 RabbitMQ 向消费者发送消息

典型数据包(Frame 75):

  1. Frame 75: 183 bytes on wire (1464 bits), 183 bytes captured (1464 bits) on interface docker0, id 0
  2. Ethernet II, Src: 02:42:ac:11:00:05 (02:42:ac:11:00:05), Dst: 02:42:ac:11:00:03 (02:42:ac:11:00:03)
  3. Internet Protocol Version 4, Src: 172.17.0.5 (172.17.0.5), Dst: 172.17.0.3 (172.17.0.3)
  4. Transmission Control Protocol, Src Port: 5672, Dst Port: 34932, Seq: 665, Ack: 475, Len: 117
  5. Advanced Message Queuing Protocol
  6. Type: Method (1)
  7. Channel: 1
  8. Length: 64
  9. Class: Basic (60)
  10. Method: Deliver (60)
  11. Arguments
  12. Consumer-Tag: ctag1.0028a0f65f8645efbcf68f81bdbfd278
  13. Delivery-Tag: 1
  14. .... ...0 = Redelivered: False
  15. Exchange:
  16. Routing-Key: task_queue
  17. Advanced Message Queuing Protocol
  18. Type: Content header (2)
  19. Channel: 1
  20. Length: 15
  21. Class ID: Basic (60)
  22. Weight: 0
  23. Body size: 14
  24. Property flags: 0x1000
  25. Properties
  26. Advanced Message Queuing Protocol
  27. Type: Content body (3)
  28. Channel: 1
  29. Length: 14
  30. Payload: 4669727374206d6573736167652e 注: 内容为First message.

 这段文本描述的是AMQP协议中的一个基本类(Basic)的Deliver方法调用。以下是对这段文本的解读:

  • 类型(Type):方法(Method),类型编号为1,表示这是一个方法帧。
  • 通道(Channel):1,指定了AMQP协议操作所使用的通道号。
  • 长度(Length):64,表示方法帧的总长度,包括方法帧头和参数。
  • 类别(Class):基本(Basic),类别编号为60,表示这个帧属于基本消息发布和接收相关的操作。
  • 方法(Method):Deliver(Deliver),方法编号为60,表示这是一个消息送达通知的方法。

参数的详细说明如下:

  • Consumer-Tagctag1.0028a0f65f8645efbcf68f81bdbfd278,这是消费者标签,用于唯一标识一个消费者。在AMQP中,消费者通过消费者标签来区分属于不同消费者的消息。
  • Delivery-Tag:1,这是消息的唯一标识符,每个消息送达时都会有一个递增的delivery-tag。
  • RedeliveredFalse,这个参数表明消息是否是重新投递的。如果为True,则表示消息之前已经被投递过,但是没有被消费者确认(acknowledged),因此重新投递。
  • Exchange:空值,这个参数应该包含发送消息时所指定的交换机名称,但在这段文本中没有给出具体值,使用缺省交换机。
  • Routing-Keytask_queue,这是路由键,用于指定消息应该发送到哪个队列。在这个例子中,消息通过缺省交换机路由到了名为task_queue的队列。

Deliver方法通常由服务器端发送给客户端,以通知客户端有一条新的消息已经送达,并且可以被消费者接收和处理。客户端在接收到Deliver方法后,应该根据需要进行消息确认(acknowledgement)。如果客户端设置为自动确认模式,则消息在被接收后会自动确认;如果设置为手动确认模式,则需要客户端显式发送确认。

4.2.4 消费者发送消息确认

 典型数据包(Frame 247):

  1. Frame 247: 87 bytes on wire (696 bits), 87 bytes captured (696 bits) on interface docker0, id 0
  2. Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:05 (02:42:ac:11:00:05)
  3. Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.5 (172.17.0.5)
  4. Transmission Control Protocol, Src Port: 34932, Dst Port: 5672, Seq: 475, Ack: 782, Len: 21
  5. Advanced Message Queuing Protocol
  6. Type: Method (1)
  7. Channel: 1
  8. Length: 13
  9. Class: Basic (60)
  10. Method: Ack (80)
  11. Arguments
  12. Delivery-Tag: 1
  13. .... ...0 = Multiple: False

这段文本描述的是AMQP协议中的一个基本类(Basic)的确认(Ack)方法调用。以下是对这段文本的解读:

  • 类型(Type):方法(Method),类型编号为1,表示这是一个方法帧。
  • 通道(Channel):1,指定了AMQP协议操作所使用的通道号。
  • 长度(Length):13,表示方法帧的总长度,包括方法帧头和参数。
  • 类别(Class):基本(Basic),类别编号为60,表示这个帧属于基本消息发布和接收相关的操作。
  • 方法(Method):Ack(Acknowledge),方法编号为80,表示这是一个消息确认的方法。

参数的详细说明如下:

  • Delivery-Tag:1,这是之前通过Deliver方法接收到的消息的唯一标识符。
  • MultipleFalse,这个参数表明确认是否适用于多个消息。如果为True,则表示确认适用于到当前消息之前所有未确认的消息;如果为False,则仅确认指定的单个消息。

Ack方法通常由客户端发送给服务器端,以通知服务器端某条消息已经被成功接收和处理。服务器端在收到Ack方法后,会将对应消息标记为已确认,这样即使客户端发生故障,服务器端也不会再次投递这条消息给该消费者。如果消费者处理消息后没有发送Ack方法,服务器端可能会在一定条件下重新投递消息,具体取决于服务质量(QoS)设置。

4.3 只显示消息发送过程的抓包信息

只显示生产者p发送消息到消息代理(rabbitmq),消息代理(rabbitmq)发送消息到消费者(c1/c2),消费者发送确认消息到消息代理(rabbitmq)。

4.4 只显示消息发送过程的抓包流量图

 5、小结

本次练习专注于工作队列(Work Queues),也称为任务队列(Task Queues)。以下是小结:

1. 背景和前提条件

  • RabbitMQ已经安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,则需要调整连接设置。
  • 使用的Python客户端是Pika,版本为1.0.0。

2. 工作队列的概念

  • 工作队列用于分配耗时任务给多个工作者(workers)。
  • 避免在资源密集型任务上立即执行并等待完成,而是将任务安排到队列中稍后执行。
  • 在Web应用程序中特别有用,因为在短暂的HTTP请求窗口期间处理复杂任务是不可能的。

3. 示例程序

  • new_task.py:根据命令行参数发送消息到工作队列。
  • worker.py:接收队列中的消息并模拟执行任务(使用time.sleep()模拟耗时工作)。

4. 轮询分发(Round-robin dispatching)

  • RabbitMQ默认使用轮询方式分发消息,依次将消息分发给每个消费者。

5. 消息确认(Message acknowledgment)

  • 消息确认确保消息在消费者处理完成后才从队列中删除。
  • 如果消费者在处理消息时崩溃,未确认的消息将重新入队并可能被其他消费者接收。

6. 消息持久性(Message durability)

  • 通过将队列和消息标记为持久(durable),确保在RabbitMQ服务器重启后消息不会丢失。

7. 公平分发(Fair dispatch)

  • 使用basic_qos方法和prefetch_count=1设置,确保每个工作者在处理并确认当前消息之前不会接收到新消息。

通过本次练习,详细解释了如何在RabbitMQ中使用工作队列来处理任务,以及如何确保任务的可靠性和消息的持久性。

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/1015797
推荐阅读
相关标签
  

闽ICP备14008679号