当前位置:   article > 正文

Python——Python中使用linux下的IPC通信_python 调用 linux ipc 消息通道

python 调用 linux ipc 消息通道

在Python中使用Linux的IPC通信

说明

Linux下的IPC进程间通信包括有:消息队列,管道,共享内存,信号量,socket等。

在寻找信号量通信的使用库时,仅仅找到了multiprocessing这个库,并没有发现能够直接和Linux C中的Semaphore直接通信的方法,于是自己动手封装了一遍库,使用python调用C动态库的方式实现。

如果有已经编写好的稳定的库,还请留言告知~。

目的

由于工作内容原因,需要实现两个进程间通信,并且一个进程是使用C编写的,另一个是Python编写,进程间的通信需要测试下使用共享内存的效率,已经实现的有C进程与C进程间的共享内存传输测试,为了保证操作的唯一性,必须使用信号量对共享内存中的数据进行保护,但是在使用Python实现时,发现没有找到可以和C进程信号量通信的方法,于是采用本办法,将C的信号量实现封装一层,再使用Python调用。

Semaphore C代码封装

源代码: linux_ipc_sem.c

/*
 * linux_ipc_sem.c
 *
 *  Created on: Dec 22, 2021
 *      Author: noven_zhang
 */

#include "linux_ipc_sem.h"

/**
 * @brief 封装python使用的sem_open函数
 * @param[in] *__name 	: 信号量名称
 * @param[in] __oflag 	: 标志,具体参考man sem_open详细说明
 * @param[in] mode		: 模式,权限,具体参考man sem_open详细说明
 * @param[in] value		: 初始值,一般都为0, 具体参考man sem_open详细说明
 * @return
 * 		@retval NULL	: 失败
 * 		@retval not NULL: 成功,信号量flag存放地址
 * @note 详细信息参考 man sem_open
 */
void* py_sem_open(const char *__name, int __oflag, unsigned int mode, unsigned int value)
{
	sem_t* sem_flag = NULL;

	sem_flag = sem_open(__name, __oflag, (mode_t)mode, value);

	if (NULL == sem_flag)
	{
		fprintf(stderr, "failed to open semaphore: %s\n", __name);
		return NULL;
	}

	return (void*)sem_flag;
}

/**
 * @brief 封装python使用的sem_close函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @return
 * 		@retval -1 : 失败
 * 		@retval 0  : 成功
 * @note 详细信息参考 man sem_close
 */
int py_sem_close(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_close((sem_t*)sem_flag);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_unlink函数
 * @param[in] __name : 需要解绑的信号量的名称
 * @return
 * 		@retval -1 : 失败
 * 		@retval 0  : 成功
 * @note 详细信息参考 man sem_unlink
 */
int py_sem_unlink(const char *__name)
{
	int ret_flag = -1;
	ret_flag = sem_unlink(__name);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_wait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @return
 * 		@retval 0  	: 成功
 * 		@retval 非0 	: 失败
 * @note 详细信息参考 man sem_wait
 */
int py_sem_wait(void* sem_flag)
{
	int ret_flag = -1;
	ret_flag = sem_wait((sem_t*)sem_flag);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_timedwait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @param[in] secs : 延时到的秒,从1970-01-01 00:00:00开始的秒
 * @param[in] nsecs : 延时到的纳秒
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_timedwait
 */
int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs)
{
	int ret_flag = -1;
	struct timespec delay_time = {0};
	delay_time.tv_sec = secs;
	delay_time.tv_nsec = nsecs;

	ret_flag = sem_timedwait((sem_t*)sem_flag, &delay_time);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_trywait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_trywait
 */
int py_sem_trywait(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_trywait((sem_t*)sem_flag);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_post函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_post
 */
int py_sem_post(void* sem_flag)
{
	int ret_flag = -1;

	ret_flag = sem_post((sem_t*)sem_flag);

	return ret_flag;
}

/**
 * @brief 封装python使用的sem_getvalue函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_getvalue
 */
int py_sem_getvalue(void* sem_flag)
{
	int ret_flag = -1;
	int val_addr = -1;

	ret_flag = sem_getvalue((sem_t*)sem_flag, &val_addr);
	if(ret_flag < 0)
	{
		fprintf(stderr, "failed to get sem val\n");
	}

	return val_addr;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161

头文件 linux_ipc_sem.h

/*
 * linux_ipc_sem.h
 *
 *  Created on: Dec 22, 2021
 *      Author: racobit
 */

#ifndef LINUX_IPC_SEM_H_
#define LINUX_IPC_SEM_H_



#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/time.h>
#include <errno.h>

#include <fcntl.h>	/* O_CRAT */
#include <sys/stat.h>
#include <semaphore.h>


/**
 * @brief 封装python使用的sem_open函数
 * @param[in] *__name 	: 信号量名称
 * @param[in] __oflag 	: 标志,具体参考man sem_open详细说明
 * @param[in] mode		: 模式,权限,具体参考man sem_open详细说明
 * @param[in] value		: 初始值,一般都为0, 具体参考man sem_open详细说明
 * @return
 * 		@retval NULL	: 失败
 * 		@retval not NULL: 成功,信号量flag存放地址
 * @note 详细信息参考 man sem_open
 */
extern void* py_sem_open(const char *__name, int __oflag, unsigned int  mode, unsigned int value);

/**
 * @brief 封装python使用的sem_close函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @return
 * 		@retval -1 : 失败
 * 		@retval 0  : 成功
 * @note 详细信息参考 man sem_close
 */
extern int py_sem_close(void* sem_flag);

/**
 * @brief 封装python使用的sem_unlink函数
 * @param[in] __name : 需要解绑的信号量的名称
 * @return
 * 		@retval -1 : 失败
 * 		@retval 0  : 成功
 * @note 详细信息参考 man sem_unlink
 */
extern int py_sem_unlink(const char *__name);

/**
 * @brief 封装python使用的sem_wait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @return
 * 		@retval 0  	: 成功
 * 		@retval 非0 	: 失败
 * @note 详细信息参考 man sem_wait
 */
extern int py_sem_wait(void* sem_flag);

/**
 * @brief 封装python使用的sem_timedwait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @param[in] secs : 延时到的秒,从1970-01-01 00:00:00开始的秒
 * @param[in] nsecs : 延时到的纳秒
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_timedwait
 */
extern int py_sem_timedwait(void* sem_flag, long int secs, long int nsecs);

/**
 * @brief 封装python使用的sem_trywait函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_trywait
 */
extern int py_sem_trywait(void* sem_flag);

/**
 * @brief 封装python使用的sem_post函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_post
 */
extern int py_sem_post(void* sem_flag);

/**
 * @brief 封装python使用的sem_getvalue函数
 * @param[in] sem_flag : py_sem_open的返回值@see py_sem_open
 * @returen
 * 		@retval 0 : 成功
 * 		@retval 非0 : 失败
 * @note 详细信息参考 man sem_getvalue
 */
extern int py_sem_getvalue(void* sem_flag);

#endif /* LINUX_IPC_SEM_H_ */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111

编译生成库文件

gcc -c -FPIC -o linux_ipc_sem.o linux_ipc_sem.c 
gcc -shared -o liblinux_ipc_sem_pyapi.so linux_ipc_sem.o -lpthread
  • 1
  • 2

生成的库即为封装后的函数库

Semaphore Python API封装

在python中调用C封装的函数库,将这些函数的使用封装成一个类

Python API: linux_ipc_pyapi_sem.py

from ctypes import *
import os
import sys

class linux_ipc_semaphore_pyapi:
    library = cdll.LoadLibrary("./liblinux_ipc_sem_pyapi.so")
#liblinux_ipc_for_python_share
    sem_flag = 0

    ## sem_name: 信号量名称
    ## oflag: 模式,参考man sem_open, python使用os库中的变量
    ## mode: 权限,使用8进制表示,python中使用0o开头表示8进制
    ## value: 信号量初始值,默认为0
    def sem_open(self, sem_name, oflag, mode, value):
        self.library.py_sem_open.argtypes = [c_char_p, c_int, c_uint, c_uint]
        self.library.py_sem_open.restype = c_void_p

        name = create_string_buffer(sem_name.encode('utf-8'))
        self.sem_flag = self.library.py_sem_open(name, c_int(oflag), c_uint(mode), c_uint(value))
        return self.sem_flag

    def sem_close(self, sem_flag):
        self.library.py_sem_close.argtypes = [c_void_p,]
        self.library.py_sem_close.restype = c_int
        
        flag = self.library.py_sem_close(c_void_p(sem_flag))
        return flag
    
    def sem_unlink(self, sem_name):
        self.library.py_sem_unlink.argtypes = [c_char_p,]
        self.library.py_sem_unlink.restype = c_int

        name = create_string_buffer(sem_name.encode('utf-8'))
        flag = self.library.py_sem_unlink(name)
        return flag
    
    def sem_wait(self, sem_flag):
        self.library.py_sem_wait.argtypes = [c_void_p,]
        self.library.py_sem_wait.restype = c_int

        flag = self.library.py_sem_wait(c_void_p(sem_flag))
        return flag
    
    def sem_timedwait(self, sem_flag, time_secs, time_nsecs):
        self.library.py_sem_timedwait.argtypes = [c_void_p, c_int64, c_int64]
        self.library.py_sem_timedwait.restype = c_int

        flag = self.library.py_sem_timedwait(c_void_p(sem_flag), c_int64(time_secs), c_int64(time_nsecs))
        return flag
    
    def sem_trywait(self, sem_flag):
        self.library.py_sem_trywait.argtypes = [c_void_p,]
        self.library.py_sem_trywait.restype = c_int

        flag =  self.library.py_sem_trywait(c_void_p(sem_flag))
        return flag
    
    def sem_post(self, sem_flag):
        self.library.py_sem_post.argtypes = [c_void_p,]
        self.library.py_sem_post.restype = c_int
        
        flag = self.library.py_sem_post(c_void_p(sem_flag))

        return flag
    
    def sem_getvalue(self, sem_flag):
        self.library.py_sem_getvalue.argtypes = [c_void_p]
        self.library.py_sem_getvalue.restype = c_int
        
        ret_val  = self.library.py_sem_getvalue(c_void_p(sem_flag))
        if ret_val < 0:
            print("failed to get sem val")
        return ret_val
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

linux_ipc_pyapi_sem.py 测试: linux_ipc_pyapi_sem_test.py

对封装的函数进行测试

import os
import time
import sys

from ctypes import *
from linux_ipc_pyapi_sem import *

global_sem_name = "/test1"
global_sem_test_count = 100000
def linux_ipc_sem_pub():
    global global_sem_name
    global global_sem_test_count
    print("pub")
    sem_test = linux_ipc_semaphore_pyapi()
    sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
    if sem_flag == 0:
        print("failed to create sem: ", global_sem_name)
        return -1

    for i in range(0, global_sem_test_count):
        flag = sem_test.sem_post(sem_flag)
        if flag < 0:
            print("sem post failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
        time.sleep(0.030)
    
    sem_test.sem_close(sem_flag)
    sem_test.sem_unlink(global_sem_name)

    return 0

def linux_ipc_sem_sum():
    global global_sem_name
    global global_sem_test_count
    print("sum")
    sem_val = 10
    sem_test = linux_ipc_semaphore_pyapi()
    sem_flag = sem_test.sem_open(global_sem_name, os.O_CREAT, 0o666, 0)
    if sem_flag == 0:
        print("failed to create sem: ", global_sem_name)
        return -1

    for i in range(0, global_sem_test_count):
        flag = sem_test.sem_wait(sem_flag)
        if flag < 0:
            print("sem wait failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
        flag = sem_test.sem_getvalue(sem_flag)
        print("[",i,"],sem val",flag)
        if flag < 0:
            print("sem getval failed")
            sem_test.sem_close(sem_flag)
            sem_test.sem_unlink(global_sem_name)
            return -1
    
    sem_test.sem_close(sem_flag)
    sem_test.sem_unlink(global_sem_name)

    return 0

if __name__ == "__main__":
    selec_flag = int(sys.argv[1])
    print(selec_flag)
    if selec_flag == 1:
        linux_ipc_sem_pub()
    else:
        linux_ipc_sem_sum()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

Python进程与Python进程间通信测试

测试脚本:

# 接收端
python3 linux_ipc_pyapi_sem_test.py 2

# 发送端
python3 linux_ipc_pyapi_sem_test.py 1
  • 1
  • 2
  • 3
  • 4
  • 5

测试运行无误,基本可以使用

Python进程与C进程间通信测试

C进程测试代码:

/*
 * main.c
 *
 *  Created on: Dec 22, 2021
 *      Author: noven_zhang
 */

#include <linux_ipc_sem.h>


static char *sem_name = "/test1";
const int test_count = 100000;
int32_t linux_ipc_sem_pub()
{
	sem_t *flag = NULL;

	flag = py_sem_open(sem_name, O_CREAT, 0666, 0);

	int i = 0;
	for(i = 0; i < test_count; i++)
	{
		py_sem_post(flag);
		usleep(30000);
	}
	if (NULL == flag)
	{
		fprintf(stderr, "failed to open semaphore: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	py_sem_close(flag);
	py_sem_unlink(sem_name);

	return 0;
}

int32_t linux_ipc_sem_sum()
{
	int32_t i = 0;
	int32_t sem_val = 0;
	sem_t *flag = NULL;
	flag = py_sem_open(sem_name,O_CREAT, 0666, 0);
	if (NULL == flag)
	{
		fprintf(stderr, "failed to open semaphore: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	for (i = 0; i < test_count; i++)
	{
		py_sem_wait(flag);
		sem_val = py_sem_getvalue(flag);
		fprintf(stdout, "[%3d]semaphore value : %d\n", i, sem_val);
	}

	return 0;
}


int main(int argc, char **argv)
{
	/* 测试信号量使用 */

	if (argc <= 1)
	{
		fprintf(stderr, "please input argv\n");
		exit(EXIT_FAILURE);
	}

	if (1 == atoi(argv[1]))
	{
		fprintf(stdout, "publisher !\n");
		linux_ipc_sem_pub();
	}
	else if(2 == atoi(argv[1]))
	{
		fprintf(stdout, "consumer !\n");
		linux_ipc_sem_sum();
	}

	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

编译生成运行程序:

# 编译生成C进程
gcc -o linux_ipc_sem_test -llinux_ipc_sem_pyapi

# 运行python api测试进程,接收端
python3 linux_ipc_pyapi_sem_test.py 2

# 运行C测试进程,发送端
./linux_ipc_sem_test 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结论

初步测试,该方法可以实现使用python进程与C进程的信号量通信

如果有问题,请在评论区指正。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/628428
推荐阅读
相关标签
  

闽ICP备14008679号