当前位置:   article > 正文

python连接websocket

python连接websocket
import _thread
import json
import ssl
import time
import websocket

def on_message(ws, message):
    print(ws)
    print(message)


def on_error(ws, error):
    print(ws)
    print(error)


def on_close(ws):
    print(ws)
    print("closed")


def on_open(ws):
    def run(*args):
        # 建立连接后自动订阅
        ws.send(json.dumps({}))
        while True:
            time.sleep(10)
            ws.send(json.dumps({"ping": int(time.time() * 1000)}))
    _thread.start_new_thread(run, ())


if __name__ == "__main__":
    url = ""
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(url,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    # wss跳过SSL验证
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/85471
推荐阅读
相关标签
  

闽ICP备14008679号