当前位置:   article > 正文

智谱zhipuai python SDK,使用fastapi实现流式传输_fastapi流式传输

fastapi流式传输

演示代码为调用tools的功能
这边只做演示使用

Fastapi接口构造

# fastapi接口
@app.post("/v1/chat/completions", status_code=status.HTTP_200_OK)
async def create_chat_completion(request: ChatCompletionRequest):
    """
    生成聊天
    :param request:
    :return:
    """
    query = request.messages[-1].content
    print(query)
    prev_messages = request.messages[:-1]
    if len(prev_messages) and prev_messages[0].role == Role.SYSTEM:
        system = prev_messages.pop(0).content
    else:
        system = None

    system = system.replace('\n', '')
    messages = []
    messages.append(
        {
            "role": "system",
            "content": system
        },
    )
    messages.append(
        {
            "role": "user",
            "content": query
        }
    )
    return EventSourceResponse(tools_select(messages))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

功能实现

调用的tools_select

该函数的功能主要是开一个新的线程来fetch_data(),用于获取返回的信息,因为zhipu的sdk是同步操作,所以我们要同步执行,异步获取返回信息

async def tools_select(messages):
    """
    数据获取(同步操作)在一个单独的线程中进行,不会阻塞主事件循环。
    数据的发送(异步操作)通过异步生成器逐个进行,每当有数据可用时就立即处理和发送。
    :param messages:
    :return:
    """
    # Queue 提供了阻塞(如 get()、put())和非阻塞(如 get_nowait()、put_nowait())的方法来处理队列元素。
    # 阻塞操作会等待直到队列中有可用的数据或者有空间来放置新的数据。
    q = queue.Queue()  # queue.Queue() 是 Python 中的一个标准库,用于在多个线程之间安全地交换信息或数据

    # 创建并启动数据获取线程
    thread = threading.Thread(target=fetch_data, args=(q, messages))
    thread.start()

    # 异步处理队列中的数据
    while True:
        content = await asyncio.to_thread(q.get) # 从队列中获取数据
        if content is None:  # 使用 None 作为结束信号
            break

        content = content.replace('\n', '').strip()
        content = re.sub(r'\s+', ' ', content)
        yield content

    thread.join()  # 确保线程完成
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
'
运行

fetch_data()函数的实现

使用Queue来发送消息,由于需要判断是否调用工具这边又封装了一个函数process_async_generator()

def fetch_data(q, messages):
    response = client.chat.completions.create(
        model="GLM-3-Turbo",
        messages=messages,
        tools=tools,
        stream=True,
    )
    for chunk in response:
        print(chunk.choices[0].delta)
        print(chunk.choices[0].delta.tool_calls)
        if chunk.choices[0].delta.tool_calls:
            messages.append(chunk.choices[0].delta.model_dump())
            # parse_function_call(chunk, messages)
            asyncio.run(process_async_generator(q, chunk, messages))
        else:
            q.put(chunk.choices[0].delta.content)
    q.put(None)  # 发送结束信号
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
'
运行

process_async_generator()函数的实现

该函数主要是用来处理parse_function_call工具调用的功能,同样通过异步操作实现了流式的返回

async def parse_function_call(model_response: ChatCompletionChunk, messages):
    # 处理函数调用结果,根据模型返回参数,调用对应的函数。
    # 调用函数返回结果后构造tool message,再次调用模型,将函数结果输入模型
    # 模型会将函数调用结果以自然语言格式返回给用户。
    if model_response.choices[0].delta.tool_calls:
        tool_call = model_response.choices[0].delta.tool_calls[0]
        args = tool_call.function.arguments
        print("args", args)
        # str转dict
        # args = json.loads(args)
        function_result = {}
        if tool_call.function.name == "search_score":
            function_result = search_score(**json.loads(args))
        messages.append({
            "role": "system",
            "content": f"请把查询出来的所有内容以Markdown的格式都返回给用户,包括学校的联系方式地址等,如果没有返回内容就正常回答",
        })
        messages.append({
            "role": "tool",
            "content": f"{json.dumps(function_result)}",
            "tool_call_id": tool_call.id
        })
        response = client.chat.completions.create(
            model="GLM-3-Turbo",
            # model='glm-4',
            messages=messages,
            tools=tools,
            stream=True,
            temperature=0.8
        )
        for chunk in response:
            content = chunk.choices[0].delta.content
            yield content


async def process_async_generator(q, chunk, messages):
    async for result in parse_function_call(chunk, messages):
        q.put(result)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/825200
推荐阅读
相关标签
  

闽ICP备14008679号