当前位置:   article > 正文

Python 连接 WebSocket

下面是使用 Python 的 websocket 库(WebSocketApp)连接 WebSocket 服务的示例代码:
import _thread
import json
import ssl
import time
import websocket

def on_message(ws, message):
    """Print the connection object and the received message, one per line.

    Args:
        ws: The object handed to the callback as its first argument;
            printed as-is.
        message: The received payload; printed as-is.
    """
    for item in (ws, message):
        print(item)


def on_error(ws, error):
    """Print the connection object followed by the reported error.

    Args:
        ws: The object handed to the callback as its first argument;
            printed as-is.
        error: The error value reported to the callback; printed as-is.
    """
    print(ws, error, sep="\n")


def on_close(ws, close_status_code=None, close_msg=None):
    """Print the connection object and a "closed" marker.

    websocket-client 1.x invokes the close callback with three arguments
    (the app, the close status code and the close message), while older
    releases passed only the app. The two extra parameters are optional so
    the callback works with either calling convention.

    Args:
        ws: The object handed to the callback as its first argument;
            printed as-is.
        close_status_code: Close status code supplied by the library, if
            any. Accepted for compatibility; not printed.
        close_msg: Close message supplied by the library, if any.
            Accepted for compatibility; not printed.
    """
    print(ws)
    print("closed")


def on_open(ws):
    """Start a background thread that subscribes and then pings periodically.

    The thread first sends an empty JSON object (the original comment calls
    this the automatic subscription; the empty payload looks like a
    placeholder to be replaced with a real request). It then sends
    ``{"ping": <current time in milliseconds>}`` every 10 seconds.

    Args:
        ws: Object exposing ``send(str)``; the callback's first argument.
    """
    def run(*args):
        try:
            # Subscribe automatically once the connection is established.
            ws.send(json.dumps({}))
            while True:
                time.sleep(10)
                ws.send(json.dumps({"ping": int(time.time() * 1000)}))
        except websocket.WebSocketConnectionClosedException:
            # The connection has gone away: end the heartbeat quietly
            # instead of letting the thread die with an unhandled traceback.
            return
    _thread.start_new_thread(run, ())


if __name__ == "__main__":
    # Endpoint to connect to; left empty in this example.
    url = ""
    # Turn on the websocket library's trace output.
    websocket.enableTrace(True)
    # Callbacks wired into the app, keyed by WebSocketApp keyword name.
    callbacks = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws = websocket.WebSocketApp(url, **callbacks)
    # For wss endpoints: skip SSL certificate verification.
    ssl_options = {"cert_reqs": ssl.CERT_NONE}
    ws.run_forever(sslopt=ssl_options)
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/blog/article/detail/85471
推荐阅读
相关标签
  

闽ICP备14008679号