赞
踩
Linux下的IPC进程间通信包括有:消息队列,管道,共享内存,信号量,socket等。
在寻找信号量通信的使用库时,仅仅找到了multiprocessing
这个库,并没有发现能够直接和Linux C中的Semaphore直接通信的方法,于是自己动手封装了一遍库,使用python调用C动态库的方式实现。
如果有已经编写好的稳定的库,还请留言告知~。
由于工作内容原因,需要实现两个进程间通信,并且一个进程是使用C编写的,另一个是Python编写,进程间的通信需要测试下使用共享内存的效率,已经实现的有C进程与C进程间的共享内存传输测试,为了保证操作的唯一性,必须使用信号量对共享内存中的数据进行保护,但是在使用Python实现时,发现没有找到可以和C进程信号量通信的方法,于是采用本办法,将C的信号量实现封装一层,再使用Python调用。
/* * linux_ipc_sem.c * * Created on: Dec 22, 2021 * Author: noven_zhang */ #include "linux_ipc_sem.h" /** * @brief 封装python使用的sem_open函数 * @param[in] *__name : 信号量名称 * @param[in] __oflag : 标志,具体参考man sem_open详细说明 * @param[in] mode : 模式,权限,具体参考man sem_open详细说明 * @param[in] value : 初始值,一般都为0, 具体参考man sem_open详细说明 * @return * @retval NULL : 失败 * @retval not NULL: 成功,信号量flag存放地址 * @note 详细信息参考 man sem_open */ void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value) { sem_t* sem_flag = NULL; sem_flag = sem_open(__name, __oflag, (mode_t)mode, value); if (NULL == sem_flag) { fprintf(stderr, "failed to open semaphore: %s\n", __name); return NULL; } return (void*)sem_flag; } /** * @brief 封装python使用的sem_close函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @return * @retval -1 : 失败 * @retval 0 : 成功 * @note 详细信息参考 man sem_close */ int py_sem_close(void* sem_flag) { int ret_flag = -1; ret_flag = sem_close((sem_t*)sem_flag); return ret_flag; } /** * @brief 封装python使用的sem_unlink函数 * @param[in] __name : 需要解绑的信号量的名称 * @return * @retval -1 : 失败 * @retval 0 : 成功 * @note 详细信息参考 man sem_unlink */ int py_sem_unlink(const char *__name) { int ret_flag = -1; ret_flag = sem_unlink(__name); return ret_flag; } /** * @brief 封装python使用的sem_wait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @return * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_wait */ int py_sem_wait(void* sem_flag) { int ret_flag = -1; ret_flag = sem_wait((sem_t*)sem_flag); return ret_flag; } /** * @brief 封装python使用的sem_timedwait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @param[in] secs : 延时到的秒,从1970-01-01 00:00:00开始的秒 * @param[in] nsecs : 延时到的纳秒 * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_timedwait */ int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs) { int ret_flag = -1; struct timespec delay_time = {0}; delay_time.tv_sec = secs; delay_time.tv_nsec = nsecs; ret_flag = sem_timedwait((sem_t*)sem_flag, &delay_time); return ret_flag; } /** * @brief 封装python使用的sem_trywait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_trywait */ int py_sem_trywait(void* sem_flag) { int ret_flag = -1; ret_flag = sem_trywait((sem_t*)sem_flag); return ret_flag; } /** * @brief 封装python使用的sem_post函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_post */ int py_sem_post(void* sem_flag) { int ret_flag = -1; ret_flag = sem_post((sem_t*)sem_flag); return ret_flag; } /** * @brief 封装python使用的sem_getvalue函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_getvalue */ int py_sem_getvalue(void* sem_flag) { int ret_flag = -1; int val_addr = -1; ret_flag = sem_getvalue((sem_t*)sem_flag, &val_addr); if(ret_flag < 0) { fprintf(stderr, "failed to get sem val\n"); } return val_addr; }
/* * linux_ipc_sem.h * * Created on: Dec 22, 2021 * Author: racobit */ #ifndef LINUX_IPC_SEM_H_ #define LINUX_IPC_SEM_H_ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <stdint.h> #include <sys/time.h> #include <errno.h> #include <fcntl.h> /* O_CRAT */ #include <sys/stat.h> #include <semaphore.h> /** * @brief 封装python使用的sem_open函数 * @param[in] *__name : 信号量名称 * @param[in] __oflag : 标志,具体参考man sem_open详细说明 * @param[in] mode : 模式,权限,具体参考man sem_open详细说明 * @param[in] value : 初始值,一般都为0, 具体参考man sem_open详细说明 * @return * @retval NULL : 失败 * @retval not NULL: 成功,信号量flag存放地址 * @note 详细信息参考 man sem_open */ extern void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value); /** * @brief 封装python使用的sem_close函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @return * @retval -1 : 失败 * @retval 0 : 成功 * @note 详细信息参考 man sem_close */ extern int py_sem_close(void* sem_flag); /** * @brief 封装python使用的sem_unlink函数 * @param[in] __name : 需要解绑的信号量的名称 * @return * @retval -1 : 失败 * @retval 0 : 成功 * @note 详细信息参考 man sem_unlink */ extern int py_sem_unlink(const char *__name); /** * @brief 封装python使用的sem_wait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @return * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_wait */ extern int py_sem_wait(void* sem_flag); /** * @brief 封装python使用的sem_timedwait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @param[in] secs : 延时到的秒,从1970-01-01 00:00:00开始的秒 * @param[in] nsecs : 延时到的纳秒 * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_timedwait */ extern int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs); /** * @brief 封装python使用的sem_trywait函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_trywait */ extern int py_sem_trywait(void* sem_flag); /** * @brief 封装python使用的sem_post函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_post */ extern int py_sem_post(void* sem_flag); /** * @brief 封装python使用的sem_getvalue函数 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open * @returen * @retval 0 : 成功 * @retval 非0 : 失败 * @note 详细信息参考 man sem_getvalue */ extern int py_sem_getvalue(void* sem_flag); #endif /* LINUX_IPC_SEM_H_ */
gcc -c -FPIC -o linux_ipc_sem.o linux_ipc_sem.c
gcc -shared -o liblinux_ipc_sem_pyapi.so linux_ipc_sem.o -lpthread
生成的库即为封装后的函数库
在python中调用C封装的函数库,将这些函数的使用封装成一个类
from ctypes import * import os import sys class linux_ipc_semaphore_pyapi: library = cdll.LoadLibrary("./liblinux_ipc_sem_pyapi.so") #liblinux_ipc_for_python_share sem_flag = 0 ## sem_name: 信号量名称 ## oflag: 模式,参考man sem_open, python使用os库中的变量 ## mode: 权限,使用8进制表示,python中使用0o开头表示8进制 ## value: 信号量初始值,默认为0 def sem_open(self, sem_name, oflag, mode, value): self.library.py_sem_open.argtypes = [c_char_p, c_int, c_uint, c_uint] self.library.py_sem_open.restype = c_void_p name = create_string_buffer(sem_name.encode('utf-8')) self.sem_flag = self.library.py_sem_open(name, c_int(oflag), c_uint(mode), c_uint(value)) return self.sem_flag def sem_close(self, sem_flag): self.library.py_sem_close.argtypes = [c_void_p,] self.library.py_sem_close.restype = c_int flag = self.library.py_sem_close(c_void_p(sem_flag)) return flag def sem_unlink(self, sem_name): self.library.py_sem_unlink.argtypes = [c_char_p,] self.library.py_sem_unlink.restype = c_int name = create_string_buffer(sem_name.encode('utf-8')) flag = self.library.py_sem_unlink(name) return flag def sem_wait(self, sem_flag): self.library.py_sem_wait.argtypes = [c_void_p,] self.library.py_sem_wait.restype = c_int flag = self.library.py_sem_wait(c_void_p(sem_flag)) return flag def sem_timedwait(self, sem_flag, time_secs, time_nsecs): self.library.py_sem_timedwait.argtypes = [c_void_p, c_int64, c_int64] self.library.py_sem_timedwait.restype = c_int flag = self.library.py_sem_timedwait(c_void_p(sem_flag), c_int64(time_secs), c_int64(time_nsecs)) return flag def sem_trywait(self, sem_flag): self.library.py_sem_trywait.argtypes = [c_void_p,] self.library.py_sem_trywait.restype = c_int flag = self.library.py_sem_trywait(c_void_p(sem_flag)) return flag def sem_post(self, sem_flag): self.library.py_sem_post.argtypes = [c_void_p,] self.library.py_sem_post.restype = c_int flag = self.library.py_sem_post(c_void_p(sem_flag)) return flag def sem_getvalue(self, sem_flag): self.library.py_sem_getvalue.argtypes = [c_void_p] self.library.py_sem_getvalue.restype = c_int ret_val = self.library.py_sem_getvalue(c_void_p(sem_flag)) if ret_val < 0: print("failed to get sem val") return ret_val
对封装的函数进行测试
import os import time import sys from ctypes import * from linux_ipc_pyapi_sem import * global_sem_name = "/test1" global_sem_test_count = 100000 def linux_ipc_sem_pub(): global global_sem_name global global_sem_test_count print("pub") sem_test = linux_ipc_semaphore_pyapi() sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0) if sem_flag == 0: print("failed to create sem: ", global_sem_name) return -1 for i in range(0, global_sem_test_count): flag = sem_test.sem_post(sem_flag) if flag < 0: print("sem post failed") sem_test.sem_close(sem_flag) sem_test.sem_unlink(global_sem_name) return -1 time.sleep(0.030) sem_test.sem_close(sem_flag) sem_test.sem_unlink(global_sem_name) return 0 def linux_ipc_sem_sum(): global global_sem_name global global_sem_test_count print("sum") sem_val = 10 sem_test = linux_ipc_semaphore_pyapi() sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0) if sem_flag == 0: print("failed to create sem: ", global_sem_name) return -1 for i in range(0, global_sem_test_count): flag = sem_test.sem_wait(sem_flag) if flag < 0: print("sem wait failed") sem_test.sem_close(sem_flag) sem_test.sem_unlink(global_sem_name) return -1 flag = sem_test.sem_getvalue(sem_flag) print("[",i,"],sem val",flag) if flag < 0: print("sem getval failed") sem_test.sem_close(sem_flag) sem_test.sem_unlink(global_sem_name) return -1 sem_test.sem_close(sem_flag) sem_test.sem_unlink(global_sem_name) return 0 if __name__ == "__main__": selec_flag = int(sys.argv[1]) print(selec_flag) if selec_flag == 1: linux_ipc_sem_pub() else: linux_ipc_sem_sum()
测试脚本:
# 接收端
python3 linux_ipc_pyapi_sem_test.py 2
# 发送端
python3 linux_ipc_pyapi_sem_test.py 1
测试运行无误,基本可以使用
C进程测试代码:
/* * main.c * * Created on: Dec 22, 2021 * Author: noven_zhang */ #include <linux_ipc_sem.h> static char *sem_name = "/test1"; const int test_count = 100000; int32_t linux_ipc_sem_pub() { sem_t *flag = NULL; flag = py_sem_open(sem_name, O_CREAT, 0666, 0); int i = 0; for(i = 0; i < test_count; i++) { py_sem_post(flag); usleep(30000); } if (NULL == flag) { fprintf(stderr, "failed to open semaphore: %s\n", strerror(errno)); exit(EXIT_FAILURE); } py_sem_close(flag); py_sem_unlink(sem_name); return 0; } int32_t linux_ipc_sem_sum() { int32_t i = 0; int32_t sem_val = 0; sem_t *flag = NULL; flag = py_sem_open(sem_name,O_CREAT, 0666, 0); if (NULL == flag) { fprintf(stderr, "failed to open semaphore: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (i = 0; i < test_count; i++) { py_sem_wait(flag); sem_val = py_sem_getvalue(flag); fprintf(stdout, "[%3d]semaphore value : %d\n", i, sem_val); } return 0; } int main(int argc, char **argv) { /* 测试信号量使用 */ if (argc <= 1) { fprintf(stderr, "please input argv\n"); exit(EXIT_FAILURE); } if (1 == atoi(argv[1])) { fprintf(stdout, "publisher !\n"); linux_ipc_sem_pub(); } else if(2 == atoi(argv[1])) { fprintf(stdout, "consumer !\n"); linux_ipc_sem_sum(); } return 0; }
编译生成运行程序:
# 编译生成C进程
gcc -o linux_ipc_sem_test -llinux_ipc_sem_pyapi
# 运行python api测试进程,接收端
python3 linux_ipc_pyapi_sem_test.py 2
# 运行C测试进程,发送端
./linux_ipc_sem_test 1
初步测试,该方法可以实现使用python进程与C进程的信号量通信
如果有问题,请在评论区指正。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。