当前位置:   article > 正文

fastapi+mongo+qlib:体系化构建AI量化投研平台_ai技术栈中 fastapi

ai技术栈中 fastapi

百天计划之第34篇,关于“AI量化投资研究平台”建设。

从今天开始要开始一条主线——就是开始搭建整个投研平台

如果说8月开始是知识点的梳理,一些基础技术的准备(以qlib和机器学习为核心),9月开始重点是“以解决真正的投资决策问题”为目标

切入点是可转债

当下A股市场持续下行,在市场总体下行的时候,讲真的策略不是最重要的。

对于新手而言,可转债和优秀指数基金,永远是最好的标的,会让你感受到,投资其实未必有那么大的风险(提示:投资有风险)。

01 架构

如下是一个极简的AI投研架构,极简意味着非核心部件都采用最简单的组件。

定时任务没有使用dagster或者airflow,因为这意味着需要引入一套全新的技术栈,需要人力云独立运维,我们使用fastapi内置的异步task。

持久化的中间件选mongo为主,并兼容qlib数据库,便于查询与并行计算。

后端使用fastapi——由于要深度使用mongo,则弃用django,而在flask与fastapi之间,考虑到我们使用api的场景偏多,考虑对api更加友好的fastapi框架,性能也会更好。

02 价值点

重点之核心在于“为谁解决什么问题”。

“半个成品远胜于一个半成品”。

目标远大很好,但饭要一口口吃,月亮虽好,六便士也很重要。

投研平台价值点就是辅助用户投资决策效率,提升胜率

面对可转债400多支,如何快速纵览数据历史,有什么规律,方便我们去分析。内置量化好的指标,对全市场进行透视,挖掘,聚类,排序。在此基础上定制一些策略,回测并归因等等。

总之,面对实用编程。

03 后端基础框架fastapi

fastapi是一个现代,异步python框架,官网如下:

https://fastapi.tiangolo.com/zh/

安装非常轻量:

pip install fastapi

pip install "uvicorn[standard]" ——安装运行的服务器。

hello FastApi:

from typing import Union
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():
    return {"Hello": "基于Fastapi的AI量化投研平台!"}

是不是非常简单,大家可能会想起Flask,是的。

Fastapi还内置精美的swagger UI,

让我们调试接口非常方便,这也是我优选Fastapi框架的原因之一。

04 基于FastApi的定时任务

传统的定时任务一般是apscheduler,它功能强大,但说实话,并不好用。

我们的定时任务其实比较简单,就是定时去做一些事情,不需要特别精确,一般就是收盘后要去更新数据,然后做一些指标的预计算,或者数据同步之类的操作。

fastapi本身是异步框架,内置了定时任务的功能。

import asyncio
from loguru import logger
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
    *,
    seconds: float,
    wait_first: bool = False,
    raise_exceptions: bool = False,
    max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    '''
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    '''
    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        '''
        将修饰函数转换为自身重复且定期调用的版本.
        '''
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False
        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0
            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)
            ensure_future(loop())
        return wrapped
    return decorator

我们基于异步功能来实现一个装饰器repeat_task。

一次封装,再使用就无比简洁了:

@app.on_event('startup')
@repeat_task(seconds=6, wait_first=True)
def repeat_task_aggregate_request_records() -> None:
    logger.info('触发重复任务: 聚合请求记录')
 

05 可转债数据

我们需要规划一下数据库表名字:

债券:bond_, 股票:stock_, 基金:fund_,不带复数s。

基础信息:_basic, 日线:_daily,财务指标 _fin

一般而言,任何一个交易品种,我们都需要先入库两种数据:

一是基础信息列表,二是交易日频数据。

基本信息只需要构建一次,若有变动不定期手动刷新即可:

def build_cb_basic():
    df = get_cb_basic()
    df['_id'] = df['ts_code']
    write_df('bond_basic', df, drop_tb_if_exist=True)

可以看出,一共A股历史上发行了799支可转债,目前可交易的400多支。

交易数据,这是需要每天收盘后去增量更新的,我们的逻辑是如果没有数据,从19900101开始读,如果有数据,则从当前日期增量更新即可。

mongo的优点可以自动忽略_id相同的行,所以特别方便。

def update_all_bond_daily():
    # 获取所有列表,有日期,从最近的日期开始读。
    items = list(get_db()['bond_basic'].find({}, {'ts_code': 1, '_id': 0}))
    if items and len(items) == 0:
        logger.error("读可转债列表为空")
        return
    for i, item in enumerate(items):
        code = item['ts_code']
        logger.debug("{}-{}-{}".format(i, code, i / len(items)))
        date = get_daily_last_date(code)
        df = get_bond_daily(code, date)
        df['_id'] = df['ts_code'] + '_' + df['trade_date']
        print(df.tail())
        write_df('bond_daily', df)
        break

小结一下:

我们开始体系化构建整体平台。

后端框架是fastapi,从fastapi内置的定时任务开始,构建可转债的基本数据和日频数据。

qlib因子分析之alphalens源码解读

基于alphalens对qlib的alpha158做单因子分析

人生B计划,不确定时代的应对之道

飞狐,科技公司CTO,用AI技术做量化投资;以投资视角观历史,解时事;专注个人成长与财富自由。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/498342
推荐阅读
相关标签
  

闽ICP备14008679号