赞
踩
在当今的大数据时代,数据量呈爆炸式增长,如何高效地处理这些海量数据成为了一个重要的问题。Stream流式输出作为一种新型的数据处理方式,能够实时处理数据,提高数据处理效率,因此受到了广泛的关注和应用。本文将介绍Stream流式输出的概念、优点、应用场景以及实现方式。
一、Stream流式输出的概念
Stream流式输出是一种数据处理方式,它将数据以流的形式进行传输和处理。在这种处理方式中,数据不再是集中存储在某个地方,而是以分散的方式存储在各个节点上,并不断流动。数据流的处理是在流动的过程中完成的,因此能够实时地处理数据,提高了数据处理效率。
二、Stream流式输出的优点
实时性:Stream流式输出能够实时地处理数据,减少了数据处理的延迟,使得数据处理的结果更加及时。
高效性:由于数据是分散存储的,因此可以并行处理数据,提高了数据处理效率。同时,由于数据处理是在流动的过程中完成的,可以避免数据的重复传输和处理。
可扩展性:Stream流式输出具有良好的可扩展性,当数据量增加时,可以通过增加节点来扩展系统的处理能力。
灵活性:Stream流式输出可以灵活地处理各种类型的数据,包括结构化数据、半结构化数据和非结构化数据。
三、Stream流式输出的应用场景
实时数据分析:通过Stream流式输出,可以对海量数据进行实时分析,从而得到实时的分析结果。例如,在金融领域中,可以对股票交易数据进行实时分析,得到实时的股票走势预测。
实时推荐系统:通过Stream流式输出,可以根据用户的实时行为数据,推荐个性化的内容。例如,在电商平台上,可以根据用户的浏览和购买行为,推荐相关的商品和活动。
实时监控系统:通过Stream流式输出,可以对各种类型的实时数据进行监控,如网络流量、设备运行状态等。这种系统可以及时发现异常情况,并采取相应的措施进行处理。
语音识别和自然语言处理:通过Stream流式输出,可以对语音数据进行实时识别和处理,实现语音转文字、机器翻译等功能。这种技术可以大大提高语音识别的准确性和实时性。
物联网数据处理:物联网设备会产生大量的实时数据,通过Stream流式输出可以对这些数据进行实时处理和分析,从而更好地了解设备的运行状况和环境情况。
四、Stream流式输出的实现方式
选择合适的编程语言和框架:例如Python、Java和Apache Flink等都是实现Stream流式输出的常用编程语言和框架。这些框架提供了丰富的API和工具,使得开发人员可以更加方便地实现Stream流式输出。
数据源接入:需要将各种类型的数据源接入到Stream流式输出系统中,如数据库、消息队列、API接口等。接入过程中需要考虑数据的格式、传输协议等问题。
数据处理逻辑编写:根据实际需求编写数据处理逻辑,可以使用各种函数和算子对数据进行过滤、转换、聚合等操作。在编写逻辑时需要考虑数据的实时性和准确性问题。
分布式部署和集群管理:由于Stream流式输出需要处理海量数据,因此需要进行分布式部署和集群管理。需要考虑节点的负载均衡、容错等问题。
五、Python中fastapi的StreamingResponse流式输出
FastAPI是一个现代、快速(高性能)的基于Python 3.6+的Web框架,用于构建API。它旨在使API设计和开发更加简单和直观。FastAPI使用标准Python类型提示,使其具有出色的代码可读性和可靠性。
FastAPI中的StreamingResponse流式输出介绍:
在FastAPI中,StreamingResponse用于处理大文件或大量数据,使其能够以流的形式发送给客户端。这对于发送大型文件或实时生成的内容非常有用,因为它允许服务器逐步生成和发送数据,而不是一次性加载所有数据到内存中。
以下是如何使用StreamingResponse的一个简单示例:
from fastapi import FastAPI, StreamingResponse
import os
app = FastAPI()
@app.get("/stream")
async def read_stream():
file_path = "large_file.txt" # 这是一个大文件路径
response = StreamingResponse(io.BytesIO(b""), media_type="application/octet-stream", filename=os.path.basename(file_path))
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
response.body.write(chunk)
return response
在上述代码中,我们首先导入必要的模块,然后定义了一个路由处理函数read_stream。这个函数读取一个大文件,并使用StreamingResponse逐步将文件内容发送给客户端。
使用StreamingResponse的注意事项:
性能和内存管理:流式响应主要用于处理大型数据,如文件上传或下载。对于小数据或简单的响应,通常不需要使用流式响应,因为它可能会增加额外的复杂性和开销。
客户端支持:流式响应需要客户端支持持久连接(HTTP keep-alive)。如果客户端不支持持久连接,那么流式响应将不起作用。因此,请确保客户端配置正确。
错误处理:当处理流式响应时,应特别注意错误处理。如果流式生成器出现异常或错误,应确保适当地捕获和处理这些异常,以避免影响整个应用程序的稳定性。
资源释放:在使用流式响应时,要确保及时释放资源。在上述示例中,我们使用了with open(file_path, “rb”) as f:来确保文件资源在操作完成后被正确关闭。
媒体类型和内容协商:当使用流式响应时,应考虑客户端请求的媒体类型和内容协商。根据客户端的需求和请求头信息,选择适当的媒体类型(如application/json、application/octet-stream等)来返回响应。
六、示例
一个完整的加载chatglm训练后模型的流式输出例子:
from fastapi import FastAPI, Request
from transformers import AutoConfig, AutoModel, AutoTokenizer
import os
import uvicorn, json, datetime
import torch
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from fastapi.responses import StreamingResponse
import jieba
from collections import Counter
DEVICE = "cuda"
DEVICE_ID = "0"
CUDA_DEVICE = f"{DEVICE}:{DEVICE_ID}" if DEVICE_ID else DEVICE
def get_model():
CHECKPOINT_PATH="output/adgen-chatglm2-6b-pt-512-2e-2/checkpoint-1000"
tokenizer = AutoTokenizer.from_pretrained("/home/otxdev/Langchain-Chatchat-v1/chatglm2-6b", trust_remote_code=True)
config = AutoConfig.from_pretrained("/home/otxdev/Langchain-Chatchat-v1/chatglm2-6b", trust_remote_code=True, pre_seq_len=512)
model = AutoModel.from_pretrained("/home/otxdev/Langchain-Chatchat-v1/chatglm2-6b", config=config, trust_remote_code=True)
prefix_state_dict = torch.load(os.path.join(CHECKPOINT_PATH, "pytorch_model.bin"))
new_prefix_state_dict = {}
for k, v in prefix_state_dict.items():
if k.startswith("transformer.prefix_encoder."):
new_prefix_state_dict[k[len("transformer.prefix_encoder."):]] = v
model.transformer.prefix_encoder.load_state_dict(new_prefix_state_dict)
model=model.half().cuda()
model.transformer.prefix_encoder.float().cuda()
model=model.eval()
return tokenizer, model
tokenizer, model = get_model()
def torch_gc():
if torch.cuda.is_available():
with torch.cuda.device(CUDA_DEVICE):
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
app = FastAPI()
#此处代码解决跨域问题
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Item(BaseModel):
query: str
knowledge_base_name: str
top_k: int
score_threshold: int
history: list
stream: bool
model_name: str
temperature: float
prompt_name: str
@app.post("/chat/chat")
async def create_item(query: Item):
async def chat_iterator():
global model, tokenizer
# response=["我","是一只","羊"]
def count_words(response):
words = jieba.lcut(response)
word_count = Counter(words)
return len(word_count)
a = count_words(dict(query)["query"])
print(a)
current_length = 0
for token1, history, past_key_values in model.stream_chat(tokenizer, dict(query)["query"],
dict(query)["history"],
past_key_values=None,
return_past_key_values=True):
print(token1)
token=token1[current_length:]
b = count_words(token)
yield "data: " + json.dumps(
{"text": token, "token_user": a, "token_answer": b}, ensure_ascii=False) + "\n\n"
current_length = len(token1)
return StreamingResponse(chat_iterator(), media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7861)
大家可以根据例子来修改自己的流式api代码来实现流式输出概念。
更新:一个例子
api.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
# 流式 API 路由,输出 1 到 100 的数字
@app.get("/multimodal")
async def stream_numbers():
async def generate():
for number in range(1, 101):
# 每次 yield 一个数字
yield f"{number}\n"
await asyncio.sleep(0.1) # 为了演示,添加一个短暂的延迟
return StreamingResponse(generate(), media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8501)
请求.py
import requests
url = 'http://localhost:8501/multimodal'
headers = {'Content-Type': 'application/json', 'Authorization': ''}
json_data = {'question': "这个图片里面讲的是什么?",
'history': [],
'imagepath': "/home/otxdev/Qwen-VL/demo.jpg",
"stream": True}
# 发起 POST 请求,设置 stream 参数为 True
response = requests.get(url, headers=headers, json=json_data, stream=True)
# 检查响应状态码
if response.status_code == 200:
# 使用 iter_content 迭代处理响应内容
for chunk in response.iter_content(chunk_size=128):
# 处理每一部分的数据,这里可以根据需要进行处理
print(chunk)
else:
print(f"Request failed with status code: {response.status_code}")
print(response.text)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。