当前位置:   article > 正文

轻量级进程间通信ZMQ详解(C&&PYTHON)

zmq

目录

一:ZMQ简介

二:ZMQ的request-reply模式

三:ZMQ的pub-sub模式


一:ZMQ简介

        官方介绍:ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.

        大概的意思就是:zmq看上去像是一个lib库,我们调用lib库里的函数来实现socket通信,但是其支持的功能远不止如此。可利用zmq进行消息的传递,无论消息是进程间的,程序内的传递,还是以TCP,多播方式传递。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。ZMQ的快速足以胜任集群应用产品。它的异步I/O机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ有着多语言支持,并能在几乎所有的操作系统上运行。

        为什么会接触到ZMQ,工作中有个需求就是有套用C语言写的测试工具,其在后台会在运行过程中产生一系列的测试数据,现需要将测试数据用图表动态的展示出来。用的是FLASK框架,则会产生一个需求,如何将测试工具产生的数据传递到FLASK框架里去给到前端去显示。FLASK用的是PYTHON代码,最开始想到的是用RABBITMQ,虽然其功能丰富,但是RABBITMQ对于此需求有点大材小用,于是想寻求一个轻量级的消息传递组件,ZMQ正好符合此需求,事实上在开发的过程中,其大大简化了socket编程,仅仅使用几个其提供的API即可完成进程间的通信。

        下面会根据上述需求给出穿刺代码

二:ZMQ的request-reply模式

         使用REQ-REP套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用zmq_recv()接收,如此循环(这里所谓的客户端就是发消息的一方,服务端就是接收消息回响应的一方)。此模式支持阻塞式的和非阻塞式的。非阻塞式的,客户端可以不管服务端有没有接收到消息,一直不断的发。本次需求采用非阻塞式的,因为工具测试的时候可不能因为服务端没有收到消息,不回响应而停在那。

        下面同时给出C语言版本和python版本的zmq代码

        C语言:

        服务端

  1. #include <zmq.h>
  2. #include <stdio.h>
  3. #include <unistd.h>
  4. #include <string.h>
  5. #include <assert.h>
  6. int main(void)
  7. {
  8. void * context = zmq_ctx_new();
  9. void * socket = zmq_socket(context, ZMQ_REP);
  10. zmq_connect(socket, "tcp://localhost:5555");
  11. char buffer[100] = {0};
  12. while(1)
  13. {
  14. int bytes = zmq_recv(socket, buffer, 100, 0);
  15. printf("recv message from client:%s\n",buffer);
  16. sleep(1);
  17. const char * replyMsg = "I am xiaoming";
  18. bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
  19. }
  20. zmq_close(socket);
  21. zmq_ctx_destroy(context);
  22. return 0;
  23. }

         客户端:

  1. #include <zmq.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. static void s_send(void *socket, char *string)
  5. {
  6. // 初始化一个zmq_msg_t对象, 分配的大小为string的大小
  7. zmq_msg_t msg;
  8. zmq_msg_init_size(&msg, strlen(string));
  9. memcpy(zmq_msg_data(&msg), string, strlen(string));
  10. // 发送数据
  11. //printf("send data! %s\n",msg);
  12. int rc = zmq_msg_send(&msg, socket, ZMQ_DONTWAIT);
  13. if(rc == -1)
  14. {
  15. printf("send dptData faild!");
  16. }
  17. // 关闭zmq_msg_t对象
  18. zmq_msg_close(&msg);
  19. }
  20. int main (void)
  21. {
  22. // Socket to talk to clients
  23. void *context = zmq_ctx_new ();
  24. void *requester = zmq_socket (context, ZMQ_REQ);
  25. int rc = zmq_bind (requester, "tcp://*:5555");
  26. char input[100] = {0};
  27. char buffer[100] = {0};
  28. while (1)
  29. {
  30. snprintf(input,100,"who are you?");
  31. s_send(requester,input);
  32. zmq_recv (requester, buffer, 100, 0);
  33. printf("recv message from server:%s\n",buffer);
  34. memset(input,0,100);
  35. memset(buffer,0,100);
  36. }
  37. return 0;
  38. }

编译:

  1. gcc zmqclient.c -o zmqclient -lzmq
  2. gcc zmqserver.c -o zmqserver -lzmq

先起客户端,其打印如下:

  1. send dptData faild!recv message from server:
  2. send dptData faild!recv message from server:
  3. send dptData faild!recv message from server:
  4. send dptData faild!recv message from server:
  5. send dptData faild!recv message from server:
  6. send dptData faild!recv message from server:
  7. recv message from server:I am xiaoming
  8. recv message from server:I am xiaoming
  9. recv message from server:I am xiaoming
  10. recv message from server:I am xiaoming
  11. recv message from server:I am xiaoming

        可以看到在非阻塞的方式下,客户端起来后,也不管服务端有没有连上,先发了再说,也不会阻塞在那。

起服务端,其打印如下:

  1. recv message from client:who are you?
  2. recv message from client:who are you?
  3. recv message from client:who are you?
  4. recv message from client:who are you?
  5. recv message from client:who are you?
  6. recv message from client:who are you?
  7. recv message from client:who are you?

需要特别注意的是以下两个函数,消息收发函数

  1. int zmq_recv (void *socket, void *buf, size_t len, int flags);
  2. int zmq_send (void *socket, void *buf, size_t len, int flags);

        zmq_recv 和 zmq_send 默认都是阻塞的,可以通过flags=ZMQ_DONTWAIT参数来设置为非阻塞模式。  buf 和 len都是靠应用程序来保证的。

        对于阻塞模式,zmq_recv的返回值是接收到的字节数,注意如果超过 len,后面的数据将会被截断,但返回值的长度却是原本没有被截掉的长度。 如果错误,或者在非阻塞模式下没有消息,返回-1,并设置  errno。

        另外对于zmq发送消息的理解可参考以下文章:https://dongshao.blog.csdn.net/article/details/105991716

下面给出Python代码,就不做详细解释了

客户端:

  1. import zmq
  2. context = zmq.Context()
  3. socket = context.socket(zmq.REP)
  4. socket.connect('tcp://127.0.0.1:5555')
  5. while True:
  6. message = socket.recv()
  7. print(message)
  8. response = 'server response!'
  9. socket.send(response.encode())

服务端:

  1. import zmq
  2. context = zmq.Context()
  3. socket = context.socket(zmq.REQ)
  4. socket.bind('tcp://*:5555')
  5. while True:
  6. data = input('input your data:')
  7. socket.send(data.encode())
  8. response = socket.recv()
  9. print(response)

三:ZMQ的pub-sub模式

         PUB-SUB套接字组合是异步的。客户端在一个循环体中使用recv ()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用send ()发送消息,但不能再PUB套接字上使用recv ()

下面给出一个用C代码写的PUB端,即主动产生消息的一方,PYTHON写的SUB端,取消息的一方

消息生产方PUB

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <assert.h>
  4. #include <unistd.h>
  5. #include <string.h>
  6. #include <time.h>
  7. #include <zmq.h>
  8. // 随机生成0...num-1的随机数
  9. #define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))
  10. // 将string消息格式化为zmq_meg_t对象, 然后发往socket套接字上
  11. static int s_send(void *socket, char *string);
  12. int main()
  13. {
  14. // 1.初始化上下文
  15. void *context = zmq_ctx_new();
  16. // 2.创建、绑定套接字
  17. void *publisher = zmq_socket(context, ZMQ_PUB);
  18. assert(publisher != NULL);
  19. // 此处我们将发布者绑定到一个tcp节点上和一个ipc节点上, 但是本案例我们只使用tcp, ipc那个只是演示说明zmq的套接字可以绑定到多个节点上
  20. int rc = zmq_bind(publisher, "tcp://*:5555");
  21. assert(rc == 0);
  22. // 3.初始化随机数发生器
  23. srandom((unsigned)time(NULL));
  24. // 4.循环发送数据
  25. while(1)
  26. {
  27. // 5.随机生成邮政编码、温度、适度
  28. int zipcode, temperature, relhumidity;
  29. zipcode = randof(100000);
  30. temperature = randof(215) - 80;
  31. relhumidity = randof(50) + 10;
  32. // 6.将消息发送给所有的订阅者
  33. char update[20];
  34. sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);
  35. rc = s_send(publisher, update);
  36. assert(rc);
  37. sleep(1);
  38. }
  39. // 7.关闭套接字、销毁上下文
  40. zmq_close(publisher);
  41. zmq_ctx_destroy(context);
  42. return 0;
  43. }
  44. static int s_send(void *socket, char *string)
  45. {
  46. // 初始化一个zmq_msg_t对象, 分配的大小为string的大小
  47. zmq_msg_t msg;
  48. zmq_msg_init_size(&msg, strlen(string));
  49. memcpy(zmq_msg_data(&msg), string, strlen(string));
  50. // 发送数据
  51. printf("send data! %s\n",msg);
  52. int rc = zmq_msg_send(&msg, socket, 0);
  53. // 关闭zmq_msg_t对象
  54. zmq_msg_close(&msg);
  55. return rc;
  56. }

 消息消费端SUB

  1. import zmq
  2. context = zmq.Context()
  3. socket = context.socket(zmq.SUB)
  4. socket.connect('tcp://127.0.0.1:5555')
  5. socket.setsockopt(zmq.SUBSCRIBE, ''.encode())
  6. while True:
  7. data = socket.recv()
  8. print(data)

需要注意的是:

1,这个模式是单向的,就是说 pub只能发, sub只能收,

2,sub可以注册多个pub,并且多个pub上的消息会公平的过来。

3,如果pub没有任何sub,那么消息将会被丢弃。

4,如果sub消费得比较慢,消息就会堆积在pub端,在v3.x版本里面,tcp 或 ipc的过滤是发生在publisher,而在低版本里面,所有的过滤都是发生在subscriber,这样就比较浪费流量和资源

        sub 除了要创建 ZMQ_SUB 类型的socket,并连接之外,还要 调用 zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter)); 来进行注册,才有效果。 其中filter用来匹配消息开头的字符串,如果匹配则接受下来,否则丢弃;但如果filter = NULL,并且长度为0的话,则表示所有的消息都接收。 

        publisher的第一个包经常是会被丢掉的,即便 sub端先起来,然后启动push 发送消息,刚开始的消息也有可能丢的  。因为即便是再快的网络,建立连接都是需要一些时间的,比如几个毫秒,而用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的),再结合之前说的如果publisher没有任何subscriber连上来的话消息会被丢弃。官网给了两个解决方案;1, 让发布者先不发数据,而是等订阅者真正连上之后,再发数据; 2,就是发布是永不停止的,没有开始与尽头的概念。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/122717?site
推荐阅读
相关标签
  

闽ICP备14008679号