当前位置:   article > 正文

Python实现WebSocket(心跳检查、进程通信)_python中怎么给websocket加入心跳机制

python中怎么给websocket加入心跳机制

Python实现WebSocket服务器(心跳检查、进程通信)

心跳检查

websocket适用于高频、服务端主动推送的场景需求,所以采用TCP长连接模式,长连接需要保活,即确保连接在被使用的时候是可用状态。

连接的关闭分为两种情况:

  1. 连接正常关闭,客户端调用close() 、shutdown()连接优雅关闭。   
    判断方法:服务端recv()到0字节。

  2. 连接的对端异常关闭,比如网络断掉,突然断电等。
    判断方法:

(一):利用TCP自带的保活(Keepalive)机制
作用层次:传输层

socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  • 1

保活机制会定时发送探测报文来识别对方是否可达。
一般的默认定时间隔是 2 小时,可以根据自己的需要在操作系统层面去调整这个间隔,不管是 Linux 还是 Windows 系统。

(二):自己编写心跳包程序
作用层次:应用层
上层应用主动的定时发送一个小数据包作为“心跳”,探测是否能成功送达到另外一端。 保活功能大多数情况下用于服务端探测客户端的场景,一旦识别客户端不可达,则断开连接,缓解服务端压力。

心跳检测步骤:
1服务端每隔一个时间间隔发生一个检测包给客户端
2服务端发包时启动一个超时定时器
3客户端接收到检测包,应该回应一个包
4如果服务端收到客户端的应答包,则说明客户端正常,删除超时定时器
5如果服务端的超时定时器超时,依然没有收到应答包,则说明客户端挂了

进程通信

在使用多进程时,有时候在需要多个进程之间传递数据,即进程间通信(IPC,InterProcess Communication)。IPC的方式通常有管道(包括无名管道和命名管道)、消息队列、信号量、共享存储、Socket、Streams等。其中 Socket和Streams支持不同主机上的两个进程IPC。

multiprocessing库提供了两种方法:

  1. 共享内存
    使用multiprocessing 的 Value 或 Array 将数据存储在共享内存映射中。
  2. 服务进程
    使用multiprocessing.Manager()返回的manager对象创建需要在进程之间通信的dict、list、queue等对象。
    原理:manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问,从而达到多进程间数据通信且安全。
    优点:可以支持任意对象类型,使用更灵活;单个manager对象可以通过网络由不同计算机上的进程共享。
    缺点:比使用共享内存慢。
#coding:gbk

from multiprocessing import Pool, Process,Manager
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父进程基于Manager创建Queue,并传给各个子进程:
    q = Manager().Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 等待pw结束:
    pw.join()
    # 启动子进程pr,读取:
    pr.start()
    pr.join()
    print('所有数据都写入并且读完')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

注意,如果把join注释就会报错:
AttributeError:‘ForkAwareLocal’ object has no attribute ‘connection’
原因
主进程和子进程都在执行,主进程里有个Queue,子进程要修改这个Queue。
进程和进程之间要通信的话,需要创建连接的,manager对象控制的server进程就创建了这个连接,进程之间通过连接进行操作。
主进程执行到底部,说明执行完了,会把manager里面的连接断开。
主进程把连接断开了,子进程就连接不上主进程。
调用join()要求主进程等待当前子进程的结束,这样主进程就停止下来,并没有执行完。主进程没有执行完,连接还没有断开,那子进程就可以连接它进行通信了。

WebSocket完整代码

gl.py

# 全局变量
ipc_queue = None
message_queues = None
client_socket_fd_map = None
client_socket_heartbeat_map = None
inputs = None
outputs = None
m = None
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

websocket.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import base64
import hashlib
import select
import time
import multiprocessing
import gl


HOST = '127.0.0.1'
PORT = 8003
BUF_SIZE = 8096


def web_socket_init():
    '''
    初始化进程通信用的变量
    基于Manager对象创建IPC(Inter-Process Communication进程间通信)的消息载体dict、list、Queue
    :return:
    '''
    # 客户端消息队列dict: 保存不同客户端对应的消息队列
    gl.message_queues = gl.m.dict()
    # 客户端socket dict: 保存不同客户端对应的socket
    gl.client_socket_fd_map = gl.m.dict()
    # 统一接收消息队列:保存所有业务层发送过来的消息
    gl.ipc_queue = gl.m.Queue()


def select_process():
    '''
    启动2个子进程:websocket服务器和IPC消息队列
    :return:
    '''
    ps = multiprocessing.Process(target=start_socket_select_server,
                            args=(gl.message_queues, gl.client_socket_fd_map))
    ps.start()

    pi = multiprocessing.Process(target=ipc_queue_receive,
                            args=(gl.message_queues, gl.ipc_queue))
    pi.start()

def ipc_queue_receive(mq, ipc_q):
    '''
    统一接收业务层要发送的消息,分配到各客户端对应的消息队列中
    :param mq: 客户端消息队列dict
    :param ipc_q: 统一接收消息队列
    :return:
    '''
    gl.message_queues = mq
    gl.ipc_queue = ipc_q
    print('IPC Receive')
    while True:
        info = gl.ipc_queue.get(block=True)
        fd = int(info[0])
        msg = info[1]
        if fd in gl.message_queues.keys():
            gl.message_queues[fd].put(msg)


def start_socket_select_server(mq, client_socket_fd_map):
    '''
    websocket服务器
    :param mq: 客户端消息队列dict
    :param client_socket_fd_map: 客户端socket dict
    :return:
    '''
    # Manager对象本身是server服务,无法序列化,从而无法作为函数入参传递,必须在本子进程中另外创建
    # 用途:基于此Manager对象为新加入的客户端创建一个自己的消息队列
    m = multiprocessing.Manager()

    gl.message_queues = mq
    # select读监听列表
    gl.inputs = []
    # select写监听列表
    gl.outputs = []

    socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socketserver.bind((HOST, PORT))
    socketserver.listen(5)
    print('websocket 服务器启动成功,监听IP', (HOST, PORT))

    # 将socketserver放入select读监听列表
    gl.inputs.append(socketserver)

    # 初始化心跳检测使用变量
    gl.client_socket_fd_map = client_socket_fd_map
    gl.client_socket_heartbeat_map = {}
    heartbeat_check_time = time.clock()
    heartbeat_check_intervel_sec = 1

    while True:
        readable, writeable, exceptional = select.select(gl.inputs, gl.outputs, gl.inputs)
        # print('select finish, inputs size: %d, outputs size: %d' % (len(gl.inputs), len(gl.outputs)))
        for s in readable:
            print('readable:', s.fileno())
            if s is socketserver:
                # websocket 服务器接收客户端连接请求
                conn, address = socketserver.accept()
                print('new connection from:', address)
                # 将客户端socket放入select读监听列表
                gl.inputs.append(conn)
                # 为该客户端创建一个自己的消息队列
                q = m.Queue()
                gl.message_queues[conn.fileno()] = q
            else:
                if s not in gl.outputs and s in gl.inputs:
                    # 与新连接的客户端socket握手
                    data = s.recv(1024)
                    if data:
                        print('handshake from [%s]' % s.getpeername()[0])
                        headers = get_headers(data)
                        response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
                                       "Upgrade:websocket\r\n" \
                                       "Connection:Upgrade\r\n" \
                                       "Sec-WebSocket-Accept:%s\r\n" \
                                       "WebSocket-Location:ws://%s%s\r\n\r\n"

                        magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
                        value = headers['Sec-WebSocket-Key'] + magic_string

                        ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
                        response_str = response_tpl % (ac.decode('utf-8'), headers['Host'], headers['url'])

                        s.send(bytes(response_str, encoding='utf-8'))

                        # 将客户端socket放入select写监听列表
                        if s not in gl.outputs:
                            gl.outputs.append(s)
                            gl.client_socket_fd_map[s.fileno()] = s

                    # TCP连接:客户端主动关闭, 服务端会recv到0字节,反过来一样
                    else:
                        remove_connection(s.fileno(),gl.client_socket_fd_map)
                        print('1,客户端主动断开')
                else:
                    # websocket通信
                    try:
                        # 场景需求:服务器主动推送数据给客户端,客户端只需任意回复心跳检查即可,因此没有对recv的数据进行【解包】
                        info = s.recv(BUF_SIZE)
                    except Exception as e:
                        info = None
                    if info:
                        # 更新最后一次读记录,扣减心跳检查发送次数
                        if gl.client_socket_heartbeat_map[s.fileno()]['c'] > 0:
                            gl.client_socket_heartbeat_map[s.fileno()]['c'] -= 1

                    # TCP连接:客户端主动关闭, 服务端会recv到0字节,反过来一样
                    else:
                        remove_connection(s.fileno(),gl.client_socket_fd_map)
                        print('2,客户端主动断开')

        while True:
            write_doing_flag = True
            for s in writeable:
                w_fd = s.fileno()
                if w_fd not in gl.message_queues.keys():
                    continue
                if not gl.message_queues[w_fd].empty():
                    next_msg = gl.message_queues[w_fd].get_nowait()
                    send_ret = send_msg(s, next_msg)
                    print('send:', w_fd, next_msg)

                    if w_fd not in gl.client_socket_heartbeat_map.keys():
                        gl.client_socket_heartbeat_map[w_fd] = {}
                    if send_ret > 0:
                        # 更新客户端socket的写时间,重设心跳检查次数记录
                        gl.client_socket_heartbeat_map[w_fd]['w'] = time.clock()
                        gl.client_socket_heartbeat_map[w_fd]['c'] = 0

                    write_doing_flag = False
            if write_doing_flag:
                break

        # 心跳检测:判断客户端是否异常断开
        cur = time.clock()
        if cur - heartbeat_check_time > heartbeat_check_intervel_sec:
            heartbeat_check_time = cur
            tmp = gl.client_socket_heartbeat_map.copy()
            for k, v in tmp.items():
                write_delta = cur - v['w']
                count = v['c']

                # 超过10次未回应,则认为客户端异常断开,关闭该连接
                if count > 10:
                    print('k: %s, v: %s, cur: %s, write_delta: %s,' % (k, v, cur, write_delta))
                    remove_connection(k, gl.client_socket_fd_map)
                    print('心跳检测: 客户端[%s]超10次未回应,断开连接' % k)
                elif write_delta > heartbeat_check_intervel_sec:
                    # 发送心跳检查
                    msg = 'heart test'
                    send_msg(gl.client_socket_fd_map[k], msg)
                    gl.client_socket_heartbeat_map[k]['c'] += 1
                    print('k: %s, c:%d' % (k, gl.client_socket_heartbeat_map[k]['c']))


def remove_connection(fd, fd_map):
    '''
    停止对指定客户端的监听,删除相关关变量,关闭其socket连接
    :param fd: 要关闭客户端的文件描述符
    :param fd_map: 客户端socket dict
    :return:
    '''
    print('client [%s] closed' % fd)
    sock = fd_map[fd]
    gl.outputs.remove(sock)

    del gl.message_queues[fd]
    del gl.client_socket_fd_map[fd]
    del gl.client_socket_heartbeat_map[fd]

    gl.inputs.remove(sock)
    sock.close()

def get_headers(data):
    """
    将请求头格式化成字典
    :param data: 服务端接收到的数据,即conn.recv(8096)的返回值
    :return: 字典格式的请求头
    """
    header_dict = {}
    data = str(data, encoding='utf-8')

    header, body = data.split('\r\n\r\n', 1)
    header_list = header.split('\r\n')
    for i in range(0, len(header_list)):
        if i == 0:
            if len(header_list[i].split(' ')) == 3:
                header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[i].split(' ')
        else:
            k, v = header_list[i].split(':', 1)
            header_dict[k] = v.strip()
    return header_dict


def send_msg(conn, msg):
    """
    WebSocket服务端向客户端发送消息
    :param conn: 客户端连接到服务器端的socket对象,即: conn,address = socket.accept()
    :param msg_bytes: 向客户端发送的字节
    :return:
    """
    import struct

    msg_bytes = bytes(msg, encoding='utf-8')
    token = b"\x81"
    length = len(msg_bytes)
    if length < 126:
        token += struct.pack("B", length)
    elif length <= 0xFFFF:
        token += struct.pack("!BH", 126, length)
    else:
        token += struct.pack("!BQ", 127, length)

    msg = token + msg_bytes
    send_ret = conn.send(msg)
    return send_ret

def fd_send(fd, msg):
    '''
    业务层发消息的调用接口
    :param fd: 要发送消息的客户端socket对应的文件描述符
    :param msg: 消息内容
    :return:
    '''
    gl.ipc_queue.put((fd, msg))


if __name__ == '__main__':
    # 在主进程中创建Manager对象,启动一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
    gl.m = multiprocessing.Manager()
    # 初始化需要进程通信的变量
    web_socket_init()
    # 启动2个子进程:websocket服务器和IPC消息队列
    select_process()
    # 注意:主进程不能到此结束,否则Manager对象控制的server服务将断开,无法进程通信
    # 此处通过while循环模拟web application server
    while True:
        # 模拟业务层发送数据
        for i in range(10):
            time.sleep(1)
            for fd in gl.client_socket_fd_map.keys():
                fd_send(fd, str(i))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285

客户端JavaScript测试代码详见初识WebSocket及Python实现

参考文档

  1. 长连接和短连接详细解析
  2. SO_KEEPALIVE选项
  3. Socket心跳包机制及SO_KEEPALIVE套接口选项
  4. Python中多进程间通信(multiprocessing.Manager)
  5. 进程间的五种通信方式介绍
  6. Python 消息队列、多线程、多进程、协程(二)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/85421
推荐阅读
相关标签
  

闽ICP备14008679号