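I was recently testing the iFlytek Spark (星火) large model, which uses WebSocket to run its conversations. The sample code published for Spark is of fairly poor quality (as of 2023-10-30), so you need a working understanding of WebSocket before you can adapt it to your own needs.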
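Installation:
pip install websocket-client
Note: the code below does import websocket, but that module is provided by the websocket-client package. The separately named websocket package on PyPI is a different project; it is not needed here, and installing it alongside websocket-client can shadow the module and break the imports.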
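Following the article [python: websocket获取实时数据的几种常见链接方式] (several common ways to connect to a WebSocket for real-time data), there are two common approaches.
Approach 1 uses create_connection and requires pip install websocket-client. It is not recommended: the connection is unstable, drops easily, and is slow to establish.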
import time
from websocket import create_connection

url = 'wss://i.cg.net/wi/ws'

# Keep retrying until the connection succeeds, then leave this loop
while True:
    time.sleep(2)
    try:
        ws = create_connection(url)
        print(ws)
        break
    except Exception as e:
        print('Connection error:', e)
        continue

# Once connected, this second loop keeps fetching data
while True:
    ws.send('{"event":"subscribe", "channel":"btc_usdt.ticker"}')
    response = ws.recv()
    print(response)
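The retry loop in approach 1 never gives up and always waits a fixed two seconds. A minimal sketch of a bounded retry with a doubling delay is shown here; connect_with_retry and its parameters are hypothetical names introduced for illustration, not part of websocket-client.

```python
import time


def connect_with_retry(connect, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds or max_attempts is reached.

    connect is any zero-argument callable that returns a connection or
    raises, for example: lambda: create_connection(url).
    The wait doubles after each failed attempt.
    """
    last_error = None
    for attempt in range(max_attempts):
        try:
            return connect()
        except Exception as exc:  # broad on purpose, like the loop above
            last_error = exc
            if attempt < max_attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error
```

With websocket-client installed, this could be called as ws = connect_with_retry(lambda: create_connection(url)). The sleep parameter exists only so the delay can be replaced in tests.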
# Approach 2: callback-based WebSocketApp
import websocket


def on_message(ws, message):
    # Data pushed by the server whenever it has an update
    print(message)


def on_error(ws, error):
    # Triggered when an error occurs
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    # Recent websocket-client versions also pass the close code and message
    print("Connection closed ...")


def on_open(ws):
    # Triggered once the connection is established; send the request here
    req = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
    print(req)
    ws.send(req)


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://i.cg.net/wi/ws",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever(ping_timeout=30)
In the second approach, run_forever effectively returns the content as a stream. Here is a rough sample of the streamed output:
{"code":0,"sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":0}
### error: 'content'
{"code":0,"fileRefer":"{\"43816997a7a44a299d0bfb7c360c5838\":[2,0,1]}","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":99}
### error: 'content'
{"code":0,"content":"橘","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}
橘{"code":0,"content":"子。","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}
子。{"code":0,"content":"","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":2}
### closed ###
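The "### error: 'content'" lines in the sample are presumably a KeyError raised when a frame has no content field (the status 0 and status 99 frames). A minimal sketch of a more tolerant parser follows. It assumes the frame layout seen in the sample, with status 2 marking the last frame; collect_content is a hypothetical helper name.

```python
import json


def collect_content(frames):
    """Join the "content" fields of streamed frames up to the final frame.

    Assumes the layout in the sample output: status 2 marks the last
    frame, and some frames (status 0 or 99) carry no "content" key.
    """
    parts = []
    for raw in frames:
        frame = json.loads(raw)
        if frame.get("code") != 0:
            raise RuntimeError(f"server returned an error frame: {frame}")
        # .get() avoids a KeyError on frames without "content"
        parts.append(frame.get("content", ""))
        if frame.get("status") == 2:
            break
    return "".join(parts)
```

Fed the five frames from the sample, this returns the two content pieces joined together and skips the frames that carry no text.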
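So how do you save the useful content from run_forever's streamed output? That is the topic of the next section.
Saving run_forever content
See the article [将Websocket数据保存到Pandas] (Saving WebSocket data to Pandas).
Here is the example from that article: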
import json

import pandas as pd
import websocket

df = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional',
                           'price', 'side', 'size', 'symbol',
                           'tickDirection', 'timestamp', 'trdMatchID'])


def on_message(ws, message):
    msg = json.loads(message)
    print(msg)
    global df
    # DataFrame.append was removed in pandas 2.0, so use pd.concat instead;
    # ignore_index=True renumbers the rows as new messages arrive
    df = pd.concat([df, pd.DataFrame([msg])], ignore_index=True)


def on_error(ws, error):
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    # Recent websocket-client versions also pass the close code and message
    print("### closed ###")


def on_open(ws):
    return


if __name__ == "__main__":
    ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.run_forever()
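Here, global df tells Python that df inside on_message refers to the module-level variable, so the callback can hand the streamed data to code outside the function. This works quite well.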
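If you would rather avoid a global, one alternative is to keep the state on an object and pass its bound method as the callback. This is a minimal sketch; MessageCollector is a hypothetical name, and the on_message signature mirrors the callbacks used above.

```python
import json


class MessageCollector:
    """Stores parsed messages on the instance, so no global is needed."""

    def __init__(self):
        self.rows = []

    def on_message(self, ws, message):
        # Same (ws, message) signature as the on_message callback above
        self.rows.append(json.loads(message))
```

You would create collector = MessageCollector(), pass on_message=collector.on_message to WebSocketApp, and build the table once at the end with pd.DataFrame(collector.rows), which also avoids growing a DataFrame one row at a time.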
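Returning results with CallbackToIterator()
In the open-source project ChuanhuChatGPT, I came across this approach in spark.py. I have not tried it myself and am only pasting it here for reference.
Here is the code: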
import _thread as thread
import json
import ssl
from collections import deque
from threading import Condition

import websocket


class CallbackToIterator:
    def __init__(self):
        self.queue = deque()
        self.cond = Condition()
        self.finished = False

    def callback(self, result):
        with self.cond:
            self.queue.append(result)
            self.cond.notify()  # Wake up the generator.

    def __iter__(self):
        return self

    def __next__(self):
        with self.cond:
            # Wait for a value to be added to the queue.
            while not self.queue and not self.finished:
                self.cond.wait()
            if not self.queue:
                raise StopIteration()
            return self.queue.popleft()

    def finish(self):
        with self.cond:
            self.finished = True
            self.cond.notify()  # Wake up the generator if it's waiting.


# Excerpt from the main method (belongs to a class in spark.py;
# Ws_Param and the self.on_* callbacks are defined elsewhere in that file)
def get_answer_stream_iter(self):
    wsParam = Ws_Param(self.appid, self.api_key, self.api_secret, self.spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(
        wsUrl,
        on_message=self.on_message,
        on_error=self.on_error,
        on_close=self.on_close,
        on_open=self.on_open,
    )
    ws.appid = self.appid
    ws.domain = self.domain

    # Initialize the CallbackToIterator
    ws.iterator = CallbackToIterator()

    # Start the WebSocket connection in a separate thread
    thread.start_new_thread(
        ws.run_forever, (), {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
    )

    # Iterate over the CallbackToIterator instance
    answer = ""
    total_tokens = 0
    for message in ws.iterator:
        data = json.loads(message)
        code = data["header"]["code"]
        if code != 0:
            ws.close()
            raise Exception(f"Request error: {code}, {data}")
        else:
            choices = data["payload"]["choices"]
            status = choices["status"]
            content = choices["text"][0]["content"]
            if "usage" in data["payload"]:
                total_tokens = data["payload"]["usage"]["text"]["total_tokens"]
            answer += content
            if status == 2:
                ws.iterator.finish()  # Finish the iterator when the status is 2
                ws.close()
            yield answer, total_tokens
This is only an excerpt. It first sets ws.iterator = CallbackToIterator(), then pulls the data out by iterating with for message in ws.iterator:. This also looks like a workable approach.
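The same callback-to-iterator idea can be tried without a live WebSocket. The sketch below is not the ChuanhuChatGPT code: it swaps the deque and Condition for the standard library's queue.Queue with a sentinel, and uses a plain thread (fake_stream, a hypothetical stand-in) to play the role of the WebSocket callbacks.

```python
import queue
import threading

_DONE = object()  # sentinel that marks the end of the stream


class QueueIterator:
    """Bridges push-style callbacks to a pull-style iterator."""

    def __init__(self):
        self._queue = queue.Queue()

    def callback(self, item):
        # Called from the producer thread for every incoming message
        self._queue.put(item)

    def finish(self):
        # Signals that no more items will arrive
        self._queue.put(_DONE)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._queue.get()  # blocks until an item is available
        if item is _DONE:
            # Put the sentinel back so later next() calls also stop
            self._queue.put(_DONE)
            raise StopIteration
        return item


def fake_stream(iterator, chunks):
    """Stand-in for the WebSocket thread: pushes chunks, then finishes."""
    for chunk in chunks:
        iterator.callback(chunk)
    iterator.finish()


def demo():
    it = QueueIterator()
    worker = threading.Thread(target=fake_stream, args=(it, ["a", "b", "c"]), daemon=True)
    worker.start()
    return list(it)  # consumes items in order until finish() is seen
```

The consuming loop reads like ordinary iteration while the producer runs in another thread, which is the same shape as for message in ws.iterator: in the excerpt.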