当前位置:   article > 正文

mosec部署chatglm2-6B_mosec 推理

mosec 推理

公司内部要搭建chatglm2-6b聊天助手,领导希望我们研究一下,如何部署,主要是要在能够稳定服务公司内部的同时,节省GPU。我们内部讨论了一些主要有几个方向:

  1. 多实例部署:这个最简单,但就是费显卡,一个实例一个显卡(当然MIG的话可以一卡多用)
  2. dynamic batching:动态组batch这个肯定好呀,一次推理10个20个问题的,那多好
  3. fastllm:这个是chatglm-6b模型的C++优化版本,推理效率提升2~3倍至少

前前后后忙活了好一阵,回头看,只如初见。
今天来看看我们找到的Mosec库,在dynamic batching方面的优势,以及踩过的坑。有路过的大神,还请多指导。


所谓dynamic batching,我是这样理解的,两个指标:

  • batch_size
  • timeout

在timeout之前如果达到batch_size,就传递batch_size数据进行推理。
在batch_size之前如果达到timeout,就传递当前batch数进行推理。

本着这个原则,搜索了很长时间,找到了mosec这个库,非常符合我的要求。对模型、对推理完全无侵入,简单方便的实现dynamic batching需求。

1.ChatGLM部署的三个需求,MOSEC都支持

1.dynamic batching

下面是mosec dynamic batching的最简单实现,直接就可以验证是否有效的启用了dynamic batching。就是因为简单,我简单验证了一下,直接成功。

  • max_batch_size:动态batch_size最大值
  • max_wait_time:dynamic batching等待最大时间
  • 启动命令python filename.py --port=8000 --timeout=100000:可以指定端口和请求超时时间
  • 验证地址http://ip:port/inference:post请求,你的所有参数都被封装到了forward的data里面接收了
from mosec import Server, Worker

class ChatGLMWorker(Worker):
    def __init__(self):
        super().__init__()        

    def forward(self, data: str) -> Returns:
        # 验证动态batch是否生效,len>1说明生效
        print(len(data))
        return data

if __name__ == "__main__":
    server = Server()
    server.append_worker(ChatGLMWorker, max_batch_size=64, max_wait_time=1000)
    server.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
2.多实例

既然这么简单就可以开启dynamic batching,那这推理性能岂不是提升几倍都是没有任何问题的。但是一个推理实例肯定也不够用,支持多实例吗?支持,还是很简单。

  • server.append_worker中num参数指定worker数量
  • server.append_worker中env参数指定环境变量:显卡、权重等,这样切换显卡就很方便。我的需求就是每个显卡跑一个实例
from mosec import Server, Worker

class ChatGLMWorker(Worker):
    def __init__(self):
        super().__init__()     
        self.device = os.environ["CUDA_DEVICES"]
        self.checkpoint = os.environ["CHECKPOINT_PATH"]   

    def forward(self, data: str) -> Returns:
        # 验证动态batch是否生效,len>1说明生效
        print(len(data))
        return data

if __name__ == "__main__":
    # gpu nums
    NUM_DEVICE = 4
    # model weight
    checkpoint_path = "chatglm2-6b"

    def _get_worker_config(cid: int) -> dict:
        return {"CUDA_DEVICES": str(cid), "CHECKPOINT_PATH": checkpoint_path}

    server = Server()
    server.append_worker(ChatGLMWorker,
                         num=NUM_DEVICE,
                         max_batch_size=64,
                         max_wait_time=1000,
                         env=[_get_worker_config(x) for x in range(NUM_DEVICE)])
    server.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
3.流式输出

LLM时代,问答时间肯定是不能保证的,3S以下那是肯定不可能的。因此不怎么被提及的流式输出(Server Sent Event实现)变得一下子流行起来了。我需要流式输出,Mosec能做吗?Mosec做的复杂吗?能做,还简单。

  • 继承SSEWorker类,该类实现了send_stream_event方法:参数1是文本内容,参数2是batch索引号
  • forward方法中获取模型推理结果后,循环调用send_stream_event即可
  • forward方法直接返回data,官方示例让返回None,但是实际放回None会抛异常
from mosec import Server, SSEWorker

class ChatGLMWorker(SSEWorker):
    def __init__(self):
        super().__init__()     
        self.device = os.environ["CUDA_DEVICES"]
        self.checkpoint = os.environ["CHECKPOINT_PATH"]   

    def forward(self, data: str) -> Returns:
        # 验证动态batch是否生效,len>1说明生效
        print(len(data))
        for i in data:
            self.send_stream_event(i, index=i)
        return data

if __name__ == "__main__":
    # gpu nums
    NUM_DEVICE = 4
    # model weight
    checkpoint_path = "chatglm2-6b"

    def _get_worker_config(cid: int) -> dict:
        return {"CUDA_DEVICES": str(cid), "CHECKPOINT_PATH": checkpoint_path}

    server = Server()
    server.append_worker(ChatGLMWorker,
                         num=NUM_DEVICE,
                         max_batch_size=64,
                         max_wait_time=1000,
                         env=[_get_worker_config(x) for x in range(NUM_DEVICE)])
    server.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

2.Mosec部署ChatGLM的问题

既然我的需要,Mosec都支持,那还等什么,开始撸代码吧。Mosec这么简单,集成起来真的是不费九牛二虎之力。很快我就可以在我的A800上测试了,很快出现了一些意料之外的问题。

1.直接调用chatglm.stream_generate实现批量推理
  • chatglm提供的chat接口、stream_chat接口是不支持批量推理的
def chat(self, tokenizer, query: str, history: List[Tuple[str, str]] = None, max_length: int = 2048, num_beams=1,
             do_sample=True, top_p=0.7, temperature=0.95, logits_processor=None, **kwargs):
    pass
def stream_chat(self, tokenizer, query: str, history: List[Tuple[str, str]] = None, max_length: int = 2048,
                    do_sample=True, top_p=0.7, temperature=0.95, logits_processor=None, **kwarg):
    pass
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
def test_batch_generation(self):
        model, tokenizer = get_model_and_tokenizer()
        sentences = [
            "你好",
            "介绍一下清华大学"
        ]
        parameters = [(False, 2048, 1),
                      (False, 64, 1),
                      (True, 2048, 1),
                      (True, 64, 1),
                      (True, 2048, 4)]
        expected_out_sentences = [
            ['你好 你好
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/746115
推荐阅读
相关标签