Triton is NVIDIA's model inference server; vLLM is a large language model inference engine developed at UC Berkeley.
In a typical setup, Triton owns the scheduling strategies that raise service throughput, such as dynamic batching and multi-instance concurrency, and is paired with backends like TensorRT or ONNX, which provide the inference kernels that keep latency low. In the Triton + vLLM combination, however, Triton does no scheduling of its own: it forwards every request straight to vLLM, which handles them itself through PagedAttention and its asynchronous API. vLLM's scheduler is better suited to the KV-cache behavior of the LLM decode phase and therefore drives higher GPU utilization. So in this combination, vLLM is responsible for scheduling, while Triton assists the inference process: it adds pre- and post-processing and deploys the model as a service, exposing the corresponding HTTP and gRPC interfaces.
vLLM thus covers both the scheduling strategy and the inference backend; as the backend, vLLM provides kernels such as FlashAttention and xFormers combined with PagedAttention as its inference core.
Functional overview of each part of a Triton + vLLM deployment
My machine runs GPU driver version 535.154.05, which supports CUDA versions up to 12.2.
Download the Triton Docker image; check NVIDIA's website for the image that matches your CUDA version.
Triton image versions
Pull the Triton image (the 23.08 tag is used in the command below). It provides the inference-server runtime and serves as the base image for the model service; its Python 3 version is 3.10.
docker pull nvcr.io/nvidia/tritonserver:23.08-py3
Start a container from this image and install vLLM with pip inside it (the startup log later in this article shows vLLM 0.4.1):
pip install vllm -i https://pypi.tuna.tsinghua.edu.cn/simple
Commit the container as a new image:
docker commit xxxx tritonserver:vllm_env
This walkthrough deploys the qwen1.5-1.8b-chat model; depending on your hardware you can choose another Qwen1.5 size, and the deployment procedure stays the same.
As with ordinary Triton usage, create a model_repository and, under it, a model directory named vllm_qwen1.5-1.8b-chat. This directory holds the backend logic model.py, the vLLM model configuration model.json, the service configuration config.pbtxt, and the model files themselves. The layout is as follows:
(vllm) [xxx@xxx vllm_qwen1.5-1.8b-chat]$ tree .
.
├── 1
│   ├── model.json
│   ├── model.py
│   └── vllm_qwen1.5-1.8b-chat
│       ├── config.json
│       ├── configuration.json
│       ├── generation_config.json
│       ├── generation_config.json.bak
│       ├── LICENSE
│       ├── merges.txt
│       ├── model.safetensors
│       ├── README.md
│       ├── tokenizer_config.json
│       ├── tokenizer.json
│       └── vocab.json
└── config.pbtxt
Here 1 is the version number; by default Triton serves the highest version present. config.pbtxt is the configuration file for the model service:
name: "vllm_qwen1.5-1.8b-chat" backend: "python" max_batch_size: 0 input [ {name: "prompt", data_type: TYPE_STRING, dims: [1]}, {name: "stream", data_type: TYPE_BOOL, dims: [1], optional: True}, {name: "sampling_parameters", data_type: TYPE_STRING, dims: [1], optional: True} ] output [ {name: "response", data_type: TYPE_STRING, dims: [-1]} ] model_transaction_policy { decoupled: True} instance_group [ { count: 1 kind: KIND_GPU gpus: [ 0 ] } ]
The key settings are: backend: "python" runs the model through the Python backend implemented by model.py; max_batch_size: 0 turns off Triton-side batching, since batching and scheduling are delegated to vLLM; the stream and sampling_parameters inputs are optional; model_transaction_policy { decoupled: true } enables the decoupled transaction mode that the vLLM backend requires in order to return responses asynchronously (and to stream several responses per request); instance_group places a single instance on GPU 0.
In the version directory 1, model.json holds the configuration vLLM uses when loading the model:
{
"model": "vllm_qwen1.5-1.8b-chat",
"tokenizer": "vllm_qwen1.5-1.8b-chat",
"disable_log_requests": "true",
"gpu_memory_utilization": 0.7,
"enforce_eager": "true",
"dtype": "half",
"tensor_parallel_size": 1
}
Here model and tokenizer point to the model and tokenizer paths, tensor_parallel_size is the number of GPUs to use, and gpu_memory_utilization is the fraction of GPU memory that the model weights plus the KV cache are allowed to occupy. model.py contains the backend logic and is described in the next section.
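If you want to sanity-check model.json outside of Triton, the same dictionary can be handed to vLLM's engine arguments directly. The sketch below is only illustrative: the repository path is an assumption, and it mirrors what model.py does by rewriting the relative model/tokenizer entries to absolute paths.

import json
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine

# Load the same model.json used by the Triton backend (path is an assumption).
base = "model_repository/vllm_qwen1.5-1.8b-chat/1/"
with open(base + "model.json") as f:
    cfg = json.load(f)

# Inside Triton, model.py prepends the version directory to these relative
# paths; do the same here so vLLM can find the weights and tokenizer.
cfg["model"] = base + cfg["model"]
cfg["tokenizer"] = base + cfg["tokenizer"]

engine = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(**cfg))
print("vLLM engine initialized")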
model.py implements the backend's request handling in Python. Its contents are as follows:
import asyncio
import json
import os
import threading
from typing import Dict, List
from copy import deepcopy
import logging

import numpy as np
from transformers import AutoTokenizer
import triton_python_backend_utils as pb_utils
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.lora.request import LoRARequest
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid

_VLLM_ENGINE_ARGS_FILENAME = "model.json"

logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)
os.environ["CUDA_VISIBLE_DEVICES"] = '0'


class TritonPythonModel:
    def initialize(self, args):
        self.logger = logging
        self.model_config = json.loads(args["model_config"])

        # assert are in decoupled mode. Currently, Triton needs to use
        # decoupled policy for asynchronously forwarding requests to
        # vLLM engine.
        # TODO: make sure decoupled mode is enabled
        self.using_decoupled = pb_utils.using_decoupled_model_transaction_policy(self.model_config)
        assert self.using_decoupled, "vLLM Triton backend must be configured to use decoupled model transaction policy"

        # TODO: vLLM engine startup config file (model.json)
        engine_args_filepath = os.path.join(pb_utils.get_model_dir(), _VLLM_ENGINE_ARGS_FILENAME)
        assert os.path.isfile(engine_args_filepath), \
            f"'{_VLLM_ENGINE_ARGS_FILENAME}' containing vllm engine args must be provided in '{pb_utils.get_model_dir()}'"
        with open(engine_args_filepath) as file:
            vllm_engine_config = json.load(file)
        vllm_engine_config["model"] = os.path.join(pb_utils.get_model_dir(), vllm_engine_config["model"])
        vllm_engine_config["tokenizer"] = os.path.join(pb_utils.get_model_dir(), vllm_engine_config["tokenizer"])

        # Create an AsyncLLMEngine from the config from JSON
        # TODO: load the model and the tokenizer
        self.llm_engine = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(**vllm_engine_config))
        self.tokenizer = AutoTokenizer.from_pretrained(vllm_engine_config["tokenizer"], resume_download=True)
        output_config = pb_utils.get_output_config_by_name(self.model_config, "response")
        self.output_dtype = pb_utils.triton_string_to_numpy(output_config["data_type"])

        # LoRA is not configured in this simplified backend; verify_loras() below
        # relies on these attributes, so initialize them explicitly.
        self.enable_lora = False
        self.supported_loras = []

        # Counter to keep track of ongoing request counts
        self.ongoing_request_count = 0

        # Starting asyncio event loop to process the received requests asynchronously.
        self._loop = asyncio.get_event_loop()
        self._loop_thread = threading.Thread(target=self.engine_loop, args=(self._loop,))
        self._shutdown_event = asyncio.Event()
        self._loop_thread.start()

    def create_task(self, coro):
        """
        Creates a task on the engine's event loop which is running on a separate thread.
        """
        assert (
            self._shutdown_event.is_set() is False
        ), "Cannot create tasks after shutdown has been requested"
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def engine_loop(self, loop):
        """
        Runs the engine's event loop on a separate thread.
        """
        asyncio.set_event_loop(loop)
        self._loop.run_until_complete(self.await_shutdown())

    async def await_shutdown(self):
        """
        Primary coroutine running on the engine event loop. This coroutine is responsible for
        keeping the engine alive until a shutdown is requested.
        """
        # first await the shutdown signal
        while self._shutdown_event.is_set() is False:
            await asyncio.sleep(5)

        # Wait for the ongoing_requests
        while self.ongoing_request_count > 0:
            self.logger.info(
                "[vllm] Awaiting remaining {} requests".format(
                    self.ongoing_request_count
                )
            )
            await asyncio.sleep(5)

        for task in asyncio.all_tasks(loop=self._loop):
            if task is not asyncio.current_task():
                task.cancel()

        self.logger.info("[vllm] Shutdown complete")

    def get_sampling_params_dict(self, params_json):
        """
        This functions parses the dictionary values into their expected format.
        """
        params_dict = json.loads(params_json)

        # Special parsing for the supported sampling parameters
        bool_keys = ["ignore_eos", "skip_special_tokens", "use_beam_search"]
        for k in bool_keys:
            if k in params_dict:
                params_dict[k] = bool(params_dict[k])

        float_keys = [
            "frequency_penalty",
            "length_penalty",
            "presence_penalty",
            "temperature",  # TODO: set temperature to 0 for greedy search
            "top_p",
        ]
        for k in float_keys:
            if k in params_dict:
                params_dict[k] = float(params_dict[k])

        int_keys = ["best_of", "max_tokens", "min_tokens", "n", "top_k"]
        for k in int_keys:
            if k in params_dict:
                params_dict[k] = int(params_dict[k])

        return params_dict

    def create_response(self, vllm_output):
        """
        Parses the output from the vLLM engine into Triton response.
        """
        text_outputs = [
            output.text.encode("utf-8") for output in vllm_output.outputs
        ]
        triton_output_tensor = pb_utils.Tensor(
            "response", np.asarray(text_outputs, dtype=self.output_dtype)
        )
        return pb_utils.InferenceResponse(output_tensors=[triton_output_tensor])

    def create_stream_response(self, vllm_output, previous_outputs_lengths):
        """
        Parses the output from the vLLM engine, extracts only newly generated
        text and packs it into Triton response.
        """
        if previous_outputs_lengths is None:
            return self.create_response(vllm_output)

        text_outputs = [
            (output.text[prev_output_length:]).encode("utf-8")
            for output, prev_output_length in zip(
                vllm_output.outputs, previous_outputs_lengths
            )
        ]
        triton_output_tensor = pb_utils.Tensor(
            "response", np.asarray(text_outputs, dtype=self.output_dtype)
        )
        return pb_utils.InferenceResponse(output_tensors=[triton_output_tensor])

    def build_message(self, prompt: str, history: List[Dict] = None):
        history = deepcopy(history)
        if len(history or []) == 0:
            history = [{"role": "system", "content": "You are a helpful assistant."}]
        history.append({"role": "user", "content": prompt})
        return history

    async def generate(self, request):
        """
        Forwards single request to LLM engine and returns responses.
        """
        response_sender = request.get_response_sender()
        self.ongoing_request_count += 1
        try:
            request_id = random_uuid()
            prompt = pb_utils.get_input_tensor_by_name(
                request, "prompt"
            ).as_numpy()[0]
            if isinstance(prompt, bytes):
                prompt = prompt.decode("utf-8")
            stream = pb_utils.get_input_tensor_by_name(request, "stream")
            if stream:
                stream = stream.as_numpy()[0]
            else:
                stream = False

            # Request parameters are not yet supported via
            # BLS. Provide an optional mechanism to receive serialized
            # parameters as an input tensor until support is added
            parameters_input_tensor = pb_utils.get_input_tensor_by_name(
                request, "sampling_parameters"
            )
            if parameters_input_tensor:
                parameters = parameters_input_tensor.as_numpy()[0].decode("utf-8")
            else:
                parameters = request.parameters()

            sampling_params_dict = self.get_sampling_params_dict(parameters)
            sampling_params = SamplingParams(**sampling_params_dict)
            prev_outputs = None

            # TODO: build the final prompt (chat template + tokenization)
            message = self.build_message(prompt)
            message_template = self.tokenizer.apply_chat_template(
                message,
                tokenize=False,
                add_generation_prompt=True
            )
            model_inputs = self.tokenizer(message_template).input_ids

            async for output in self.llm_engine.generate(
                    prompt=prompt,
                    sampling_params=sampling_params,
                    request_id=request_id,
                    prompt_token_ids=model_inputs
            ):
                if response_sender.is_cancelled():
                    self.logger.info("[vllm] Cancelling the request")
                    await self.llm_engine.abort(request_id)
                    self.logger.info("[vllm] Successfully cancelled the request")
                    break
                if stream:
                    prev_outputs_lengths = None
                    if prev_outputs is not None:
                        prev_outputs_lengths = [
                            len(prev_output.text)
                            for prev_output in prev_outputs.outputs
                        ]
                    if output.finished:
                        response_sender.send(
                            self.create_stream_response(output, prev_outputs_lengths),
                            flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
                        )
                    else:
                        response_sender.send(
                            self.create_stream_response(output, prev_outputs_lengths)
                        )
                    prev_outputs = output

                # TODO: the last output object contains the complete generated text
                last_output = output

            if not stream:
                response_sender.send(
                    self.create_response(last_output),
                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
                )

        except Exception as e:
            self.logger.info(f"[vllm] Error generating stream: {e}")
            error = pb_utils.TritonError(f"Error generating stream: {e}")
            # Note: the output tensor name must match config.pbtxt ("response").
            triton_output_tensor = pb_utils.Tensor(
                "response", np.asarray(["N/A"], dtype=self.output_dtype)
            )
            response = pb_utils.InferenceResponse(
                output_tensors=[triton_output_tensor], error=error
            )
            response_sender.send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
            )
            raise e
        finally:
            self.ongoing_request_count -= 1

    def verify_loras(self, request):
        # We will check if the requested lora exists here, if not we will send a
        # response with `LoRA not found` information. In this way we may avoid
        # further processing.
        verified_request = None
        lora_error = None
        lora_name = None
        parameters_input_tensor = pb_utils.get_input_tensor_by_name(
            request, "sampling_parameters"
        )
        if parameters_input_tensor:
            parameters = parameters_input_tensor.as_numpy()[0].decode("utf-8")
            sampling_params_dict = self.get_sampling_params_dict(parameters)
            lora_name = sampling_params_dict.pop("lora_name", None)

        if lora_name is not None:
            if not self.enable_lora:
                lora_error = pb_utils.TritonError("LoRA feature is not enabled.")
                self.logger.info(
                    "[vllm] LoRA is not enabled, please restart the backend with LoRA enabled."
                )
            elif lora_name not in self.supported_loras:
                lora_error = pb_utils.TritonError(
                    f"LoRA {lora_name} is not supported, we currently support {self.supported_loras}"
                )
                self.logger.info(f"[vllm] LoRA {lora_name} not found.")

        if lora_error is not None:
            # Note: the output tensor name must match config.pbtxt ("response").
            output_tensor = pb_utils.Tensor(
                "response",
                np.asarray(["[Error] Unsupported LoRA."], dtype=self.output_dtype),
            )
            response = pb_utils.InferenceResponse(
                output_tensors=[output_tensor], error=lora_error
            )
            response_sender = request.get_response_sender()
            response_sender.send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
            )
        else:
            verified_request = request
        return verified_request

    def execute(self, requests):
        """
        Triton core issues requests to the backend via this method.

        When this method returns, new requests can be issued to the backend. Blocking this
        function would prevent the backend from pulling additional requests from Triton
        into the vLLM engine. This can be done if the kv cache within vLLM engine is too
        loaded.
        We are pushing all the requests on vllm and let it handle the full traffic.
        """
        for request in requests:
            request = self.verify_loras(request)
            if request is not None:
                self.create_task(self.generate(request))
        return None

    def finalize(self):
        """
        Triton virtual method; called when the model is unloaded.
        """
        self.logger.info("[vllm] Issuing finalize to vllm backend")
        self._shutdown_event.set()
        if self._loop_thread is not None:
            self._loop_thread.join()
            self._loop_thread = None
The code above is adapted, with some modifications, from the example model.py in the Triton Inference Server vLLM backend project on GitHub.
The only part you normally need to touch is generate: add your preprocessing there (for example, building the chat prompt and tokenizing it), then hand the request to the asynchronous vLLM engine via llm_engine.generate; to customize what is returned, adjust create_response.
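To see what that preprocessing produces on its own, the sketch below applies the Qwen1.5 chat template and tokenizes the result outside of Triton, mirroring build_message and the tokenizer calls in generate; the local checkpoint path is an assumption.

from transformers import AutoTokenizer

# Path to the local Qwen1.5 checkpoint (assumption; match your model_repository layout).
tokenizer = AutoTokenizer.from_pretrained("vllm_qwen1.5-1.8b-chat")

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "逻辑回归是什么?"},
]

# Same calls as build_message()/generate() in model.py: render the chat template,
# then tokenize it into the prompt_token_ids passed to llm_engine.generate.
prompt_text = tokenizer.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
prompt_token_ids = tokenizer(prompt_text).input_ids

print(prompt_text)
print(len(prompt_token_ids), "prompt tokens")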
Start the tritonserver:vllm_env service with Docker:
docker run --gpus all --rm \
-p18999:8000 -p18998:8001 -p18997:8002 \
--shm-size=1G -e PYTHONIOENCODING=utf-8 \
--ulimit memlock=-1 --ulimit stack=67108864 \
-v /home/model_repository/:/models tritonserver:vllm_env \
tritonserver --model-repository=/models \
--model-control-mode explicit \
--load-model vllm_qwen1.5-1.8b-chat
Three ports are exposed: 8000 serves HTTP requests, 8001 serves gRPC, and 8002 serves metrics; the host-side port mappings can be chosen freely. The host's model_repository directory is mounted into the container, and the server runs in explicit model-control mode, with vllm_qwen1.5-1.8b-chat loaded manually; this name must match the model directory name under model_repository. The startup log looks like this:
I0429 09:29:09.299653 1 model_lifecycle.cc:461] loading: vllm_qwen1.5-1.8b-chat:1
I0429 09:29:14.111038 1 python_be.cc:2199] TRITONBACKEND_ModelInstanceInitialize: vllm_qwen1.5-1.8b-chat_0_0 (GPU device 0)
WARNING 04-29 09:29:17 config.py:1011] Casting torch.bfloat16 to torch.float16.
INFO 04-29 09:29:17 llm_engine.py:98] Initializing an LLM engine (v0.4.1) with config: model='/models/vllm_qwen1.5-1.8b-chat/1/vllm_qwen1.5-1.8b-chat', speculative_config=None, tokenizer='/models/vllm_qwen1.5-1.8b-chat/1/vllm_qwen1.5-1.8b-chat', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=8192, download_dir=None, load_format=auto, tensor_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=true, kv_cache_dtype=auto, quantization_param_path=None, device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='outlines'), seed=0)
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
INFO 04-29 09:29:17 utils.py:608] Found nccl from library /root/.config/vllm/nccl/cu12/libnccl.so.2.18.1
INFO 04-29 09:29:17 selector.py:65] Cannot use FlashAttention backend for Volta and Turing GPUs.
INFO 04-29 09:29:17 selector.py:33] Using XFormers backend.
INFO 04-29 09:29:21 model_runner.py:173] Loading model weights took 3.4594 GB
INFO 04-29 09:29:22 gpu_executor.py:119] # GPU blocks: 856, # CPU blocks: 1365
...
I0429 09:29:25.895005 1 server.cc:662]
+------------------------+---------+--------+
| Model                  | Version | Status |
+------------------------+---------+--------+
| vllm_qwen1.5-1.8b-chat | 1       | READY  |
+------------------------+---------+--------+
...
I0429 09:29:25.930286 1 grpc_server.cc:2513] Started GRPCInferenceService at 0.0.0.0:8001
I0429 09:29:25.930826 1 http_server.cc:4497] Started HTTPService at 0.0.0.0:8000
I0429 09:29:25.973064 1 http_server.cc:270] Started Metrics Service at 0.0.0.0:8002
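Once the model shows READY, it is worth confirming that the server and the model are reachable before sending real traffic. A minimal readiness check against Triton's standard HTTP endpoints, assuming the port mapping above (host 18999 to container 8000), might look like this:

import requests

BASE = "http://localhost:18999"  # host port mapped to Triton's HTTP port 8000

# Server-level and model-level readiness probes from Triton's HTTP/REST API;
# both return HTTP 200 when ready.
print(requests.get(f"{BASE}/v2/health/ready").status_code)
print(requests.get(f"{BASE}/v2/models/vllm_qwen1.5-1.8b-chat/ready").status_code)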
You can hit the service directly with curl: pass a prompt and let the model answer, using generate as the request route:
(vllm) [xxx@xxx]$ curl -X POST localhost:18999/v2/models/vllm_qwen1.5-1.8b-chat/generate -d '{"prompt": "逻辑回归是什么?", "stream": false, "sampling_parameters": "{\"temperature\": 0.7, \"top_p\": 0.95, \"max_tokens\": 1024}"}'
{"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"逻辑回归是一种机器学习算法,它是一种二分类模型,用于预测一个连续变量(如二分类问题)的输出值,其目的是在给定一组特征数据(称为输入变量)的情况下,确定一个或多个变量(称为输出变量)的值。逻辑回归的基本思想是,通过建立一个函数来描述输入变量和输出变量之间的关系,然后通过拟合这个函数来预测输出变量的值。\n\n逻辑回归的基本步骤如下:\n\n1. 数据预处理:首先,需要对输入数据进行预处理,包括缺失值处理、异常值处理、特征缩放等,以确保数据的平稳性和一致性。\n\n2. 构建逻辑回归模型:逻辑回归模型通常由两个部分组成:一个逻辑函数(也称为逻辑门)和一个损失函数。逻辑函数用于将输入变量的特征映射到输出变量的类别,例如二分类问题中的二元逻辑函数(如sigmoid或ReLU函数);损失函数用于评估模型预测的准确性和泛化能力,通常使用均方误差(MSE)或交叉熵损失函数(Categorical Crossentropy)。\n\n3. 拟合逻辑回归模型:使用训练数据对逻辑回归模型进行拟合,通过优化损失函数的参数,使模型的预测结果与实际输出变量的类别最接近。常用的优化方法包括梯度下降、随机梯度下降、Adam优化器等。\n\n4. 模型评估和预测:在训练完成后,使用测试数据对拟合后的逻辑回归模型进行评估,通常使用准确率、精确率、召回率、F1分数等指标来评估模型的性能。然后,使用训练好的逻辑回归模型对新的输入数据进行预测,预测结果与实际输出变量的类别最接近的预测值即为最佳预测。\n\n逻辑回归在许多实际应用中都有广泛的应用,例如分类问题(如垃圾邮件分类、情感分析等)、回归问题(如房价预测、销售预测等)等。通过构建合适的逻辑回归模型,可以有效地解决这些复杂的问题,并在实际应用中发挥其预测和决策支持作用。"}
Here stream controls whether the output is streamed, and sampling_parameters carries the generation settings, including the temperature, top_p, and the maximum number of generated tokens (1024 in this case).
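The same non-streaming request can also be sent from Python; below is a small sketch using the requests library against the generate endpoint shown above (the host and port are assumptions that match the docker run mapping):

import json
import requests

URL = "http://localhost:18999/v2/models/vllm_qwen1.5-1.8b-chat/generate"

payload = {
    "prompt": "逻辑回归是什么?",
    "stream": False,
    # sampling_parameters is passed as a JSON string, exactly as in the curl example.
    "sampling_parameters": json.dumps(
        {"temperature": 0.7, "top_p": 0.95, "max_tokens": 1024}
    ),
}

resp = requests.post(URL, json=payload, timeout=300)
resp.raise_for_status()
print(resp.json()["response"])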
For streaming output, use generate_stream as the entry point and set stream to true:
(vllm) [xxx@xxx]$ curl -X POST localhost:18999/v2/models/vllm_qwen1.5-1.8b-chat/generate_stream -d '{"prompt": "逻辑回归是什么?", "history": "[]", "stream": true, "sampling_parameters": "{\"temperature\": 0.7,\"top_p\": 0.95, \"max_tokens\": 20}"}'
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"逻辑"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"回归"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"是一种"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"统计"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"学习"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"方法"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":","}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"用于"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"解决"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"分类"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"和"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"回归"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"问题"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"。"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"它"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"是一种"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"监督"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"学习"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":"算法"}
data: {"model_name":"vllm_qwen1.5-1.8b-chat","model_version":"1","response":","}
In this case a single request yields multiple responses, one per generation step, and the final answer is the concatenation of all the streamed outputs.
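Consuming the stream from Python follows the same pattern: read the server-sent-event style lines, strip the data: prefix, parse the JSON, and concatenate the response fields. A sketch under the same host/port assumptions:

import json
import requests

URL = "http://localhost:18999/v2/models/vllm_qwen1.5-1.8b-chat/generate_stream"

payload = {
    "prompt": "逻辑回归是什么?",
    "stream": True,
    "sampling_parameters": json.dumps(
        {"temperature": 0.7, "top_p": 0.95, "max_tokens": 256}
    ),
}

answer = []
with requests.post(URL, json=payload, stream=True, timeout=300) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data:"):
            continue  # skip keep-alive / blank lines
        chunk = json.loads(line[len("data:"):])
        answer.append(chunk["response"])

print("".join(answer))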
The load test measures, under sustained traffic at various concurrency levels, the throughput of successful inferences and the 95th-percentile (P95) response time; a minimal sketch of a client for driving such a test follows.
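The sketch below is not the exact tool used for the numbers that follow, only an illustration of how throughput and P95 latency can be measured against the generate endpoint with a thread pool at a given concurrency (host, port, prompt, and request counts are all assumptions):

import json
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import requests

URL = "http://localhost:18999/v2/models/vllm_qwen1.5-1.8b-chat/generate"
PAYLOAD = {
    "prompt": "逻辑回归是什么?",
    "stream": False,
    "sampling_parameters": json.dumps(
        {"temperature": 0.7, "top_p": 0.95, "max_tokens": 256}
    ),
}

def one_request():
    """Send one request and return (success, latency_in_seconds)."""
    start = time.perf_counter()
    try:
        ok = requests.post(URL, json=PAYLOAD, timeout=300).status_code == 200
    except requests.RequestException:
        ok = False
    return ok, time.perf_counter() - start

def run(concurrency: int, total_requests: int = 200):
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(lambda _: one_request(), range(total_requests)))
    wall = time.perf_counter() - t0
    latencies = [lat for ok, lat in results if ok]
    if not latencies:
        print(f"concurrency={concurrency}: no successful requests")
        return
    throughput = len(latencies) / wall            # successful requests per second
    p95 = float(np.percentile(latencies, 95))     # 95th-percentile latency in seconds
    print(f"concurrency={concurrency:3d}  throughput={throughput:.2f} req/s  P95={p95:.2f} s")

for c in (1, 4, 8, 16, 32):
    run(c)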
My environment is a single RTX 2080 Ti GPU with 11 GB of memory, and network latency is ignored. The model under test is qwen1.5-1.8b-chat, deployed as a single instance on one GPU. I compare the load-test metrics of a deployment with PyTorch as the backend against one with vLLM as the backend, with Triton serving as the inference server in both cases. The results are as follows:
Performance comparison
At a concurrency of 1, vLLM's prediction latency is half that of PyTorch and its throughput is doubled: the FlashAttention/xFormers kernels are simply more efficient than torch's SDPA. As concurrency grows, vLLM's asynchronous request handling and PagedAttention start to pay off; at a concurrency of 32, vLLM reaches six times PyTorch's throughput while latency stays low, about 88% lower than PyTorch. In short, for single-query inference vLLM delivers roughly twice PyTorch's performance, and under concurrent or batch workloads at least six times.