赞
踩
近几年国内外头部的AI公司相继发布了自己的大模型REST API 服务,一些还支持在线体验。
如果用 Python 调用各家的 REST API,就可以批量测试不同模型的效果。
本文整理了主要几个大模型提供商的 REST API 用法,方便集中查阅。
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class BaiduLLMs(object):
    """Client for the Baidu Qianfan (ERNIE Bot) chat REST API.

    API reference: https://cloud.baidu.com/doc/WENXINWORKSHOP/s/clntwmv7t

    Credentials are read from the BAIDU_API_KEY and BAIDU_SECRET_KEY
    environment variables.
    """

    TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
    CHAT_URL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/{}?access_token={}"
    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> endpoint path appended to the wenxinworkshop base URL.
        self.models_map = {
            'Baidu-ERNIE-Bot-4': "chat/completions_pro",
            'Baidu-ERNIE-Bot-8K': "chat/ernie_bot_8k",
            'Baidu-ERNIE-Bot': "chat/completions",
            'Baidu-ERNIE-Bot-turbo': "chat/eb-instant",
            'Baidu-ERNIE-Bot-turbo-AI': "chat/ai_apaas",
            'Baidu-BLOOMZ-7B': "chat/bloomz_7b1",
            'Baidu-Qianfan-BLOOMZ-7B-compressed': "chat/qianfan_bloomz_7b_compressed",
            'Baidu-Llama-2-7b-chat': "chat/llama_2_7b",
            'Baidu-Llama-2-13b-chat': "chat/llama_2_13b",
            'Baidu-Llama-2-70b-chat': "chat/llama_2_70b",
            'Baidu-Qianfan-Chinese-Llama-2-7B': "chat/qianfan_chinese_llama_2_7b",
            'Baidu-Qianfan-Chinese-Llama-2-13B': "chat/qianfan_chinese_llama_2_13b",
            'Baidu-ChatGLM2-6B-32K': "chat/chatglm2_6b_32k",
            'Baidu-XuanYuan-70B-Chat-4bit': "chat/xuanyuan_70b_chat",
            'Baidu-AquilaChat-7B': "chat/aquilachat_7b",
        }

    def get_access_token(self):
        """Exchange the API key / secret key for an OAuth access token.

        Returns:
            The token as a string. When the response carries no
            ``access_token`` field the string ``"None"`` is returned (kept
            for backward compatibility) and a warning is logged.
        """
        params = {
            "grant_type": "client_credentials",
            "client_id": os.getenv("BAIDU_API_KEY"),
            "client_secret": os.getenv("BAIDU_SECRET_KEY"),
        }
        # A timeout keeps a stalled token endpoint from hanging the script.
        token = requests.post(self.TOKEN_URL, params=params, timeout=30).json().get("access_token")
        if token is None:
            logger.warning("no access_token in the OAuth response; check BAIDU_API_KEY / BAIDU_SECRET_KEY")
        return str(token)

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send one user message to ``model_name`` and return its reply.

        Args:
            question: The prompt text, sent as a single "user" message.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown or every attempt failed.
        """
        if model_name not in self.models_map:
            return None

        payload = json.dumps({
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ],
            "disable_search": False,
            "enable_citation": False,
            "stream": False,
            "temperature": 0.1,
            "top_p": 0.85,
            "penalty_score": 1
        })
        headers = {
            'Content-Type': 'application/json'
        }

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                # The token is fetched inside the try so that a failing token
                # endpoint is retried/logged like any other request failure.
                url = self.CHAT_URL.format(self.models_map[model_name], self.get_access_token())
                response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
                if response.status_code == 200:
                    body = response.json()
                    if "result" in body:
                        return model_name, body["result"]
                    # HTTP 200 without a "result" field: log the body instead
                    # of failing with a KeyError.
                    logger.error("请求失败,body: %s", response.text)
                else:
                    logger.error("请求失败,状态码: %s", response.status_code)
                    logger.error("请求失败,body: %s", response.text)
            except Exception:
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = BaiduLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Baidu-ERNIE-Bot-8K')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests
python llm.py
要实现上述功能,我们需要使用一些Python库,如`ffmpeg`和`rtmpdump`。首先,确保你已经安装了这些库。如果没有,你可以使用pip来安装: ```bash pip install ffmpeg-python ``` 以下是一个简单的Python脚本,它实现了你描述的功能: ```python import ffmpeg import os import time def save_to_mp4(input_stream, output_file, duration): if duration > 5 * 60: # 超过5分钟 # 遇到关键帧时关闭文件 if input_stream.info().key_frame == 1: input_stream.close() return input_stream.write_to(output_file) def process_rtsp(rtsp_url, output_dir): input_stream = ffmpeg.input(rtsp_url) video, audio = ffmpeg.select_streams(input_stream, 'v', 'a') duration = 0 start_time = time.time() while True: packet = video.read() if not packet: # 读取完毕 break duration += time.time() - start_time start_time = time.time() filename = os.path.join(output_dir, f"output_{int(time.time())}.mp4") save_to_mp4(video, filename, duration) # 将音视频推送到rtmp服务器 ffmpeg.input(filename).output('rtmp://127.0.0.1/live/stream', format='flv').run() os.remove(filename) # 删除已处理文件 if __name__ == "__main__": rtsp_url = "你的RTSP地址" # 替换为你的RTSP地址 output_dir = "输出目录" # 替换为你希望保存MP4文件的目录 process_rtsp(rtsp_url, output_dir) ``` 注意:这个脚本使用了`ffmpeg-python`库来处理RTSP流,并使用`ffmpeg`命令行工具来处理MP4文件和推送到RTMP服务器。确保你的系统上已经安装了`ffmpeg`。此外,这个脚本会持续读取RTSP流,并将数据保存为MP4文件,然后推送到RTMP服务器。当文件时长超过5分钟且遇到关键帧时,它会关闭文件并创建新文件。最后,它删除了已处理文件以节省空间。
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class AliLLMs(object):
    """Client for Alibaba DashScope text-generation models.

    API reference:
    https://help.aliyun.com/zh/dashscope/developer-reference/api-details?spm=a2c4g.11186623.0.i2

    The API key is read from the DASHSCOPE_API_KEY environment variable.
    """

    # Models called with only model/prompt (plus an empty history for
    # chatglm-6b-v2); their reply is read from response.output['text'].
    _TEXT_OUTPUT_MODELS = ("Ali-chatglm-6b-v2", "Ali-baichuan-7b-v1", "Ali-dolly-12b-v2")
    _NO_HISTORY_MODELS = ("Ali-baichuan-7b-v1", "Ali-dolly-12b-v2")
    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> DashScope model identifier.
        self.models_map = {
            'Ali-qwen-turbo': "qwen-turbo",
            'Ali-qwen-plus': "qwen-plus",
            'Ali-qwen-max': "qwen-max",
            'Ali-qwen-1.8b-chat': "qwen-1.8b-chat",
            'Ali-qwen-7b-chat': "qwen-7b-chat",
            'Ali-qwen-14b-chat': "qwen-14b-chat",
            'Ali-qwen-72b-chat': "qwen-72b-chat",
            'Ali-chatglm-6b-v2': "chatglm-6b-v2",
            'Ali-chatglm3-6b': "chatglm3-6b",
            'Ali-baichuan-7b-v1': "baichuan-7b-v1",
            'Ali-baichuan2-7b-chat-v1': "baichuan2-7b-chat-v1",
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name`` and return its reply.

        Args:
            question: The prompt text.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown or every attempt failed.
        """
        if model_name not in self.models_map:
            logger.info("not find")
            return None

        import dashscope
        from http import HTTPStatus
        dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
        model = self.models_map[model_name]

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                if model_name in self._TEXT_OUTPUT_MODELS:
                    extra = {} if model_name in self._NO_HISTORY_MODELS else {"history": []}
                    response = dashscope.Generation.call(
                        model=model,
                        prompt=question,
                        **extra
                    )
                    if response.status_code == HTTPStatus.OK:
                        return model_name, response.output['text']
                else:
                    response = dashscope.Generation.call(
                        model=model,
                        prompt=question,
                        seed=1234,
                        top_p=0.85,
                        top_k=4,
                        result_format='message',
                        enable_search=False,
                        temperature=0.1,
                        repetition_penalty=1.0,
                        stream=False
                    )
                    if response.status_code == HTTPStatus.OK:
                        return model_name, response.output.choices[0]['message']['content']
                # Both branches report the same fields on a non-OK status.
                logger.error(
                    'Request id: %s, Status code: %s, error code: %s, error message: %s',
                    response.request_id, response.status_code,
                    response.code, response.message
                )
            except Exception:
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = AliLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Ali-qwen-max')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests dashscope
python llm.py
在Python中,你可以使用OpenCV和moviepy库来实现这个需求。但是需要注意的是,OpenCV并不直接支持RTSP流的音频保存,所以这里只处理视频部分。对于音频部分,你可能需要结合其他库如gstreamer或者ffmpeg进行更深度的开发。以下是一个简化的示例: ```python import cv2 import time from moviepy.editor import VideoFileClip # 1. 接收rtsp音视频数据 cap = cv2.VideoCapture('rtsp://your_rtsp_stream') fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用mp4v编解码器 current_file_index = 0 start_time = time.time() while True: ret, frame = cap.read() if not ret: break # 2. 将视频保存为mp4格式 current_time = time.time() duration = current_time - start_time if duration > 300: # 当超过5分钟(300秒) if cv2.waitKey(1) & 0xFF == ord('q'): # 检查是否按下'q'键退出 break # 关闭当前文件并创建新文件 out.release() current_file_index += 1 file_name = f"output_{current_file_index}.mp4" out = cv2.VideoWriter(file_name, fourcc, 20.0, (frame.shape[1], frame.shape[0])) # 假设帧率为20fps start_time = current_time # 写入视频帧到当前的VideoWriter对象 out.write(frame) # 在这里添加关键帧检测逻辑,但OpenCV并没有直接提供关键帧检测API, # 通常需要通过FFmpeg或其他方式解析H.264等编码的NALU单元获取关键帧信息。 # 释放资源 cap.release() out.release() # 3. 将音视频推送到rtmp://127.0.0.1/live # 这部分较为复杂,通常会借助ffmpeg命令行工具完成,Python中可以调用os.system执行命令: # os.system("ffmpeg -re -i output.mp4 -c copy -f flv rtmp://127.0.0.1/live") # 若要在Python中完全控制推流过程,可以使用诸如libav或pyav这样的库,但这超出了本问题的范围。 ``` 注意:上述代码仅处理了视频录制与分段存储的部分,并未涉及音频及RTMP推送,实际项目中可能需要配合FFmpeg或其他工具完成全部功能。
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re
import _thread as thread
import base64
import hashlib
import hmac
from urllib.parse import urlparse
import ssl
# Only the class is imported: the original also had a plain `import datetime`,
# which this line rebound immediately.
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket

logger = logging.getLogger(__name__)

# Accumulates the streamed reply; reset by XfLLMs.request() before each call
# and appended to by on_message().
answer = ""


class Ws_Param(object):
    """Holds the Spark credentials and builds the signed websocket URL."""

    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        parsed = urlparse(Spark_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.Spark_url = Spark_url

    def create_url(self):
        """Return ``Spark_url`` with the HMAC-SHA256 auth query string appended."""
        # RFC 1123 formatted timestamp.
        date = format_date_time(mktime(datetime.now().timetuple()))

        # String to sign: host, date and request line.
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Auth parameters carried in the query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        return self.Spark_url + '?' + urlencode(v)


def on_error(ws, error):
    """Websocket error callback."""
    print("### error:", error)


def on_close(ws, *args):
    """Websocket close callback.

    Extra positional arguments are accepted so the callback works whether
    the library passes close status/message or not.
    """
    print(" ")


def on_open(ws):
    """Websocket open callback: send the request from a worker thread."""
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    """Serialize the request stored on ``ws`` and send it."""
    data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question))
    ws.send(data)


def on_message(ws, message):
    """Websocket message callback: append the streamed chunk to ``answer``.

    Closes the connection on a non-zero header code or when the chunk's
    status is 2.
    """
    global answer
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
        return
    choices = data["payload"]["choices"]
    answer += choices["text"][0]["content"]
    if choices["status"] == 2:
        ws.close()


def gen_params(appid, domain, question):
    """Build the request body from the app id, model domain and message list."""
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.1,
                "top_k": 4,
                "max_tokens": 2048,
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def SparkAPI(appid, api_key, api_secret, Spark_url, domain, question):
    """Open the websocket, send ``question`` and block until it closes.

    The reply is accumulated into the module-level ``answer``.
    """
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error,
                                on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    # NOTE(review): certificate verification is disabled here, as in the
    # original; confirm this is acceptable before using a wss:// URL.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


class XfLLMs(object):
    """Client for the iFlytek Spark websocket chat API.

    API reference: https://www.xfyun.cn/doc/spark/Web.html#_1-接口说明

    Credentials are read from the APP_ID, API_SECRET and API_KEY
    environment variables.
    """

    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> model domain and websocket endpoint.
        self.models_map = {
            'Xf-Spark-v1.5': {"domain": "general", "url": "ws://spark-api.xf-yun.com/v1.1/chat"},
            'Xf-Spark-v3.1': {"domain": "generalv3", "url": "ws://spark-api.xf-yun.com/v3.1/chat"},
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def _get_text(self, role, content):
        """Build one chat message dict."""
        return {"role": role, "content": content}

    def _get_length(self, text):
        """Return the total character count of all message contents."""
        return sum(len(message["content"]) for message in text)

    def _check_len(self, text):
        """Drop the oldest messages until the total length is at most 8000."""
        while self._get_length(text) > 8000:
            del text[0]
        return text

    def request(self, question, model_name):
        """Send ``question`` to ``model_name`` and return its reply.

        Args:
            question: The prompt text, sent as a single "user" message.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown or every attempt produced no reply.
        """
        global answer
        if model_name not in self.models_map:
            logger.info("not find")
            return None

        appid = os.getenv("APP_ID")
        api_secret = os.getenv("API_SECRET")
        api_key = os.getenv("API_KEY")
        model = self.models_map[model_name]
        # Kept in its own variable: rebinding `question` inside the retry loop
        # made the second attempt wrap the message list as message content.
        messages = self._check_len([self._get_text("user", question)])

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                answer = ""
                SparkAPI(
                    appid,
                    api_key,
                    api_secret,
                    model['url'],
                    model['domain'],
                    messages,
                )
                if answer:
                    return model_name, answer
                # An API error closes the socket without raising; treat the
                # empty reply as a failed attempt rather than a result.
                logger.error("empty reply from %s", model_name)
            except Exception:
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = XfLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Xf-Spark-v3.1')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests websocket-client
python llm.py
```python import cv2 import numpy as np import time import subprocess def receive_rtsp_data(rtsp_url): cap = cv2.VideoCapture(rtsp_url) return cap def save_video_to_mp4(cap, output_path): fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, 30.0, (int(cap.get(3)), int(cap.get(4)))) return out def process_video(cap, out): frame_count = 0 start_time = time.time() while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_count += 1 out.write(frame) if frame_count % 1800 == 0: # Check every 3 seconds for keyframe keyframe_detected = detect_keyframe(frame) if keyframe_detected: out.release() break current_time = time.time() if current_time - start_time > 5 * 60: # 5 minutes out.release() break def detect_keyframe(frame): # This is a simple example of keyframe detection using color histogram hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]) hist = cv2.normalize(hist, hist).flatten() return np.max(hist) > 255 / 4 def push_to_rtmp(output_path): cmd = f"ffmpeg -re -i {output_path} -c copy -f flv rtmp://127.0.0.1/live" subprocess.call(cmd, shell=True) if __name__ == "__main__": rtsp_url = "rtsp://your_rtsp_url" output_path = "output.mp4" cap = receive_rtsp_data(rtsp_url) out = save_video_to_mp4(cap, output_path) process_video(cap, out) push_to_rtmp(output_path) cap.release() out.release() ```
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class BaichuanLLMs(object):
    """Client for the Baichuan chat-completions REST API.

    API reference: https://platform.baichuan-ai.com/docs/api

    The API key is read from the BAICHUAN_KEY environment variable.
    """

    API_URL = "https://api.baichuan-ai.com/v1/chat/completions"
    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> model identifier sent in the request body.
        self.models_map = {
            'Baichuan2-Turbo': "Baichuan2-Turbo",
            'Baichuan2-Turbo-192k': "Baichuan2-Turbo-192k",
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send one user message to ``model_name`` and return its reply.

        Args:
            question: The prompt text, sent as a single "user" message.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown, the API key is missing, or every attempt failed.
        """
        if model_name not in self.models_map:
            return None

        api_key = os.getenv("BAICHUAN_KEY")
        if not api_key:
            # Without this guard "Bearer " + None raises TypeError.
            logger.error("BAICHUAN_KEY is not set")
            return None

        json_data = json.dumps({
            "model": self.models_map[model_name],
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ],
            "stream": False,
            "top_p": 0.85,
            "top_k": 4,
            "temperature": 0.1,
            "with_search_enhance": False
        })
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + api_key
        }

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                response = requests.post(self.API_URL, data=json_data, headers=headers, timeout=60)
                if response.status_code == 200:
                    return model_name, response.json()['choices'][0]['message']["content"]
                logger.error("请求失败,状态码: %s", response.status_code)
                logger.error("请求失败,body: %s", response.text)
            except Exception:
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = BaichuanLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Baichuan2-Turbo')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests
python llm.py
{"error":{"code":"insufficient_quota","param":null,"type":"insufficient_quota","message":"You exceeded your current quota, please check your plan and billing details."}}
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class GoogleLLMs(object):
    """Client for Google Gemini through the ``google.generativeai`` package.

    The API key is read from the GOOGLE_API_KEY environment variable.
    """

    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> model identifier passed to GenerativeModel.
        self.models_map = {
            'Google-gemini-pro': "gemini-pro",
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name`` and return its reply.

        Args:
            question: The prompt text.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown or every attempt failed.
        """
        if model_name not in self.models_map:
            return None

        import google.generativeai as genai
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        generation_config = {
            "temperature": 0.1,
            "top_p": 0.85,
            "top_k": 4,
        }

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                model = genai.GenerativeModel(self.models_map[model_name],
                                              generation_config=generation_config)
                response = model.generate_content(question)
                return model_name, response.text
            except Exception:
                # The original called an undefined `logger` here, so any API
                # error surfaced as a NameError instead of being retried.
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = GoogleLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Google-gemini-pro')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n google python=3.9
conda activate google
pip install python-dotenv requests google-generativeai
pip install protobuf==3.19
python llm.py
```python import cv2 import ffmpeg import time import os # RTSP URL rtsp_url = 'rtsp://192.168.1.100:554/live/ch00_0' # Output MP4 file path output_file = 'output.mp4' # RTMP URL rtmp_url = 'rtmp://127.0.0.1/live' # Create a VideoCapture object cap = cv2.VideoCapture(rtsp_url) # Check if the VideoCapture object was successfully created if not cap.isOpened(): print("Error: Could not open RTSP stream.") exit() # Create a VideoWriter object for the output MP4 file writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), 30, (640, 480)) # Create a FFmpeg instance for RTMP streaming ffmpeg_process = ffmpeg.input(rtsp_url).output(rtmp_url, vcodec='libx264', acodec='aac').run_async(pipe_stdin=True) # Start the RTMP streaming ffmpeg_process.stdin.write(b'G') # Initialize the start time and the current file duration start_time = time.time() current_duration = 0 # Loop over the frames from the RTSP stream while True: # Read the next frame from the RTSP stream ret, frame = cap.read() # Check if the frame was successfully read if not ret: break # Write the frame to the output MP4 file writer.write(frame) # Update the current file duration current_duration = time.time() - start_time # Check if the current file duration is greater than 5 minutes if current_duration > 300: # Close the current MP4 file writer.release() # Create a new MP4 file output_file = 'output_{}.mp4'.format(int(time.time())) writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), 30, (640, 480)) # Reset the start time and the current file duration start_time = time.time() current_duration = 0 # Release the VideoCapture object cap.release() # Close the output MP4 file writer.release() # Stop the RTMP streaming ffmpeg_process.stdin.write(b'q') ffmpeg_process.wait() # Delete the MP4 files that are older than 5 minutes for file in os.listdir('.'): if file.endswith('.mp4'): file_time = os.path.getmtime(file) if time.time() - file_time > 300: os.remove(file) ```
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class AzureLLMs(object):
    """Client for Azure OpenAI chat models through ``openai==0.28``.

    The endpoint and key are read from the AZURE_OPENAI_ENDPOINT and
    AZURE_OPENAI_API_KEY environment variables. The published listing had
    these values blanked out, so the variable names are chosen here; set
    them in ``.env``.
    """

    MAX_ATTEMPTS = 2
    RETRY_DELAY = 10  # seconds to wait between attempts

    def __init__(self):
        # Display name -> deployment ("engine") and API version.
        # "xxx" / "xxxx" are placeholders; replace them with your own
        # deployment names.
        self.models_map = {
            'Azure-gpt-35-turbo': {"engine": "xxx", "version": "2023-07-01-preview"},
            'Azure-gpt-35-turbo-16k': {"engine": "xxxx", "version": "2023-07-01-preview"},
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name`` and return its reply.

        Args:
            question: The prompt text, sent as the "user" message after a
                fixed system message.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` on success, or ``None`` when the
            model is unknown or every attempt failed.
        """
        if model_name not in self.models_map:
            return None

        import openai
        model = self.models_map[model_name]
        openai.api_type = "azure"
        openai.api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
        openai.api_version = model['version']
        openai.api_key = os.getenv("AZURE_OPENAI_API_KEY")

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                response = openai.ChatCompletion.create(
                    engine=model['engine'],
                    messages=[
                        {"role": "system", "content": "You are an AI assistant that helps people find information."},
                        {'role': 'user', 'content': question},
                    ],
                    temperature=0.1,
                    top_p=0.85,
                    frequency_penalty=0,
                    presence_penalty=0,
                    stop=None)
                message = response["choices"][0]["message"]["content"]
                return model_name, message
            except Exception:
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = AzureLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Azure-gpt-35-turbo-16k')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.9
conda activate llm
pip install python-dotenv requests
pip install openai==0.28
python llm.py
要实现这些功能,你可以使用Python中的OpenCV库和FFmpeg工具。下面是一个简单的实现示例: ```python import cv2 import subprocess def save_video(rtsp_url, output_path): cap = cv2.VideoCapture(rtsp_url) fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) current_file = None current_writer = None current_duration = 0 while True: ret, frame = cap.read() if not ret: break if current_file is None or current_duration >= 300: if current_writer is not None: current_writer.release() push_to_rtmp(current_file) current_file = output_path + f"/output_{int(time.time())}.mp4" current_writer = cv2.VideoWriter(current_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) current_duration = 0 current_writer.write(frame) current_duration += 1 / fps if current_writer is not None: current_writer.release() push_to_rtmp(current_file) cap.release() def push_to_rtmp(file_path): subprocess.call(['ffmpeg', '-re', '-i', file_path, '-c:v', 'copy', '-c:a', 'copy', 'rtmp://127.0.0.1/live']) # 使用示例 save_video('rtsp://your_rtsp_url', 'output_directory') ``` 请确保你已经安装了OpenCV和FFmpeg,并将`rtsp://your_rtsp_url`替换为实际的RTSP URL,将`output_directory`替换为保存输出文件的目录。
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)
import requests
import json
import logging
import time
import sys
import traceback
from io import StringIO
import re

logger = logging.getLogger(__name__)


class OpenaiLLMs(object):
    """Client for the OpenAI REST API (chat and legacy completions endpoints).

    The API key is read from the OPENAI_API_KEY environment variable.
    """

    CHAT_URL = "https://api.openai.com/v1/chat/completions"
    COMPLETIONS_URL = "https://api.openai.com/v1/completions"
    MAX_ATTEMPTS = 2
    RETRY_DELAY = 30  # seconds to wait between attempts

    def __init__(self):
        # Display name -> API model name and endpoint kind
        # ("chat" or "completions").
        self.models_map = {
            'Openai-gpt-3.5-turbo-1106': {"name": "gpt-3.5-turbo-1106", "model": "chat"},
            'Openai-text-davinci-003': {"name": "text-davinci-003", "model": "completions"},
            'Openai-gpt-3.5-turbo-0301': {"name": "gpt-3.5-turbo-0301", "model": "chat"},
            'Openai-davinci-instruct-beta': {"name": "davinci-instruct-beta", "model": "completions"},
            'Openai-gpt-3.5-turbo-16k-0613': {"name": "gpt-3.5-turbo-16k-0613", "model": "chat"},
            'Openai-gpt-3.5-turbo-instruct': {"name": "gpt-3.5-turbo-instruct", "model": "completions"},
            'Openai-gpt-3.5-turbo-16k': {"name": "gpt-3.5-turbo-16k", "model": "chat"},
            'Openai-text-davinci-001': {"name": "text-davinci-001", "model": "completions"},
            'Openai-gpt-3.5-turbo-0613': {"name": "gpt-3.5-turbo-0613", "model": "chat"},
            'Openai-gpt-3.5-turbo-instruct-0914': {"name": "gpt-3.5-turbo-instruct-0914", "model": "completions"},
            'Openai-gpt-3.5-turbo': {"name": "gpt-3.5-turbo", "model": "chat"},
            'Openai-text-davinci-002': {"name": "text-davinci-002", "model": "completions"},
        }

    def models(self):
        """Return the supported model display names."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name`` and return its reply.

        Args:
            question: The prompt text.
            model_name: A key of ``models_map``.

        Returns:
            ``(model_name, reply_text)`` with the reply stripped of
            surrounding whitespace, or ``None`` when the model is unknown or
            every attempt failed.
        """
        if model_name not in self.models_map:
            return None

        key = os.getenv("OPENAI_API_KEY")
        headers = {
            'Authorization': f'Bearer {key}',
            'Content-Type': 'application/json'
        }
        model = self.models_map[model_name]
        is_chat = model['model'] == "chat"
        if is_chat:
            url = self.CHAT_URL
            query = {
                "model": model['name'],
                "messages": [
                    {"role": "user", "content": question}
                ],
                "temperature": 0.1,
                "top_p": 0.85
            }
        else:
            url = self.COMPLETIONS_URL
            query = {
                "model": model['name'],
                "prompt": question,
                "temperature": 0.1,
                "top_p": 0.85
            }

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                response = requests.post(url=url, data=json.dumps(query), headers=headers, timeout=60)
                if response.status_code == 200:
                    choice = response.json()['choices'][0]
                    content = choice['message']["content"] if is_chat else choice['text']
                    return model_name, content.strip()
                logger.error("请求失败,状态码: %s", response.status_code)
                logger.error("请求失败,body: %s", response.text)
            except Exception:
                # The handler must not touch `response`: it is unbound when
                # requests.post() itself raised.
                logger.exception("request to %s failed", model_name)
            if attempt + 1 < self.MAX_ATTEMPTS:
                time.sleep(self.RETRY_DELAY)
        return None


question = u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    infer = OpenaiLLMs()
    # The demo queries one model. To compare all of them, loop over
    # infer.models() and pass each name to request().
    result = infer.request(question, 'Openai-gpt-3.5-turbo-1106')
    if result:
        name, content = result
        # Explicit UTF-8 so the reply is writable regardless of the OS locale.
        with open("{}.txt".format(name), "w", encoding="utf-8") as f:
            f.write(content)
conda create -n llm python=3.9
conda activate llm
pip install python-dotenv requests
pip install openai==0.28
python llm.py
```python import cv2 import subprocess # 接收rtsp音视频数据 rtsp_url = 'rtsp://example.com/live/stream' cap = cv2.VideoCapture(rtsp_url) # 将音视频保存为mp4格式 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter('output.mp4', fourcc, 20.0, (640, 480)) # 当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件 frame_count = 0 while cap.isOpened(): ret, frame = cap.read() if ret: out.write(frame) frame_count += 1 if frame_count % (5*60*20) == 0: # 5分钟的帧数 out.release() out = cv2.VideoWriter('output_{}.mp4'.format(frame_count), fourcc, 20.0, (640, 480)) else: break # 将音视频推送到rtmp://127.0.0.1/live rtmp_url = 'rtmp://127.0.0.1/live' command = ['ffmpeg', '-i', 'output.mp4', '-c', 'copy', '-f', 'flv', rtmp_url] subprocess.call(command) cap.release() out.release() cv2.destroyAllWindows() ``` 请注意,上述代码中使用了OpenCV库来处理音视频数据,同时使用了ffmpeg来推送音视频到rtmp服务器。在运行代码之前,请确保已经安装了OpenCV和ffmpeg,并且已经正确配置了环境变量。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。