赞
踩
import _thread
import json
import ssl
import time

import websocket


def on_message(ws, message):
    """Print the connection object and each message received on it."""
    print(ws)
    print(message)


def on_error(ws, error):
    """Print the connection object and the error reported for it."""
    print(ws)
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Print the connection object followed by a "closed" marker.

    websocket-client 1.0+ invokes this callback with the close status code
    and close message in addition to ``ws``. Both extra parameters default
    to ``None`` so a call that passes only ``ws`` keeps working too.
    """
    print(ws)
    print("closed")


def on_open(ws):
    """Start a background thread that subscribes, then pings every 10 seconds."""

    def run(*args):
        try:
            # Subscribe automatically once the connection is established.
            ws.send(json.dumps({}))
            while True:
                time.sleep(10)
                # Heartbeat payload: current time in milliseconds.
                ws.send(json.dumps({"ping": int(time.time() * 1000)}))
        except websocket.WebSocketConnectionClosedException:
            # The connection is gone: end the heartbeat thread quietly
            # instead of letting it die with an unhandled traceback.
            return

    _thread.start_new_thread(run, ())


if __name__ == "__main__":
    url = ""
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Skip SSL certificate verification for wss connections.
    # NOTE(review): ssl.CERT_NONE disables server certificate validation, so
    # the peer is not authenticated — acceptable for local testing only.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。