PandasTA 源码解析(三)

# 设置文件编码为 UTF-8
# -*- coding: utf-8 -*-

# 导入必要的模块
import importlib  # 动态导入模块的工具
import os  # 提供与操作系统交互的功能
import sys  # 提供与 Python 解释器交互的功能
import types  # 提供对 Python 类型和类的支持

# 从 os.path 模块中导入指定函数,避免命名冲突
from os.path import abspath, join, exists, basename, splitext
# 从 glob 模块中导入指定函数,用于文件匹配
from glob import glob

# 导入 pandas_ta 模块,并指定 AnalysisIndicators 类
import pandas_ta
from pandas_ta import AnalysisIndicators

def bind(function_name, function, method):
    辅助函数,将自定义指标模块中定义的函数和类方法绑定到活动的 pandas_ta 实例。

        function_name (str): 在 pandas_ta 中指标的名称
        function (fcn): 指标函数
        method (fcn): 与传递的函数对应的类方法
    # 将指标函数绑定到 pandas_ta 实例
    setattr(pandas_ta, function_name, function)
    # 将类方法绑定到 AnalysisIndicators 类
    setattr(AnalysisIndicators, function_name, method)

def create_dir(path, create_categories=True, verbose=True):

        path (str): 指标树的完整路径
        create_categories (bool): 如果为 True,则创建类别子文件夹
        verbose (bool): 如果为 True,则打印结果的详细输出

    # 确保传递的目录存在/可读
    if not exists(path):
        # 如果 verbose 为 True,则打印已创建主目录的消息
        if verbose:
            print(f"[i] Created main directory '{path}'.")

    # 列出目录的内容
    # dirs = glob(abspath(join(path, '*')))

    # 可选地添加任何缺少的类别子目录
    if create_categories:
        for sd in [*pandas_ta.Category]:
            d = abspath(join(path, sd))
            if not exists(d):
                # 如果 verbose 为 True,则打印已创建空子目录的消息
                if verbose:
                    dirname = basename(d)
                    print(f"[i] Created an empty sub-directory '{dirname}'.")

def get_module_functions(module):

        module: Python 模块

        dict: 模块函数映射
            "func1_name": func1,
            "func2_name": func2,...
    module_functions = {}

    for name, item in vars(module).items():
        if isinstance(item, types.FunctionType):
            module_functions[name] = item

    return module_functions

def import_dir(path, verbose=True):
    # 确保传递的目录存在/可读
    if not exists(path):
        print(f"[X] Unable to read the directory '{path}'.")

    # 列出目录的内容
    dirs = glob(abspath(join(path, "*")))

    # 遍历整个目录,导入找到的所有模块
    # 对每个目录进行遍历
    for d in dirs:
        # 获取目录的基本名称
        dirname = basename(d)

        # 仅在目录是有效的 pandas_ta 类别时才进行处理
        if dirname not in [*pandas_ta.Category]:
            # 如果启用了详细输出,则打印消息跳过非有效 pandas_ta 类别的子目录
            if verbose:
                print(f"[i] Skipping the sub-directory '{dirname}' since it's not a valid pandas_ta category.")

        # 对该类别(目录)中找到的每个模块进行处理
        for module in glob(abspath(join(path, dirname, "*.py"))):
            # 获取模块的名称(不带扩展名)
            module_name = splitext(basename(module))[0]

            # 确保提供的路径被包含在我们的 Python 路径中
            if d not in sys.path:

            # (重新)加载指标模块
            module_functions = load_indicator_module(module_name)

            # 确定要绑定到 pandas_ta 的哪些模块函数
            fcn_callable = module_functions.get(module_name, None)
            fcn_method_callable = module_functions.get(f"{module_name}_method", None)

            # 如果找不到可调用的函数,则打印错误消息并继续下一个模块
            if fcn_callable == None:
                print(f"[X] Unable to find a function named '{module_name}' in the module '{module_name}.py'.")
            # 如果找不到可调用的方法函数,则打印错误消息并继续下一个模块
            if fcn_method_callable == None:
                missing_method = f"{module_name}_method"
                print(f"[X] Unable to find a method function named '{missing_method}' in the module '{module_name}.py'.")

            # 如果模块名称尚未在相应类别中,则将其添加到类别中
            if module_name not in pandas_ta.Category[dirname]:

            # 将函数绑定到 pandas_ta
            bind(module_name, fcn_callable, fcn_method_callable)
            # 如果启用了详细输出,则打印成功导入自定义指标的消息
            if verbose:
                print(f"[i] Successfully imported the custom indicator '{module}' into category '{dirname}'.")
# 将 import_dir 函数的文档字符串赋值给 import_dir.__doc__,用于说明该函数的作用和用法
import_dir.__doc__ = \
Import a directory of custom indicators into pandas_ta

    path (str): Full path to your indicator tree  # 参数:指定自定义指标所在目录的完整路径
    verbose (bool): If True verbose output of results  # 参数:如果为 True,则输出详细的结果信息

This method allows you to experiment and develop your own technical analysis
indicators in a separate local directory of your choice but use them seamlessly
together with the existing pandas_ta functions just like if they were part of

If you at some late point would like to push them into the pandas_ta library
you can do so very easily by following the step by step instruction here

A brief example of usage:

1. Loading the 'ta' module:
>>> import pandas as pd
>>> import pandas_ta as ta

2. Create an empty directory on your machine where you want to work with your
indicators. Invoke pandas_ta.custom.import_dir once to pre-populate it with
sub-folders for all available indicator categories, e.g.:

>>> import os
>>> from os.path import abspath, join, expanduser
>>> from pandas_ta.custom import create_dir, import_dir
>>> ta_dir = abspath(join(expanduser("~"), "my_indicators"))
>>> create_dir(ta_dir)

3. You can now create your own custom indicator e.g. by copying existing
ones from pandas_ta core module and modifying them.

IMPORTANT: Each custom indicator should have a unique name and have both
a) a function named exactly as the module, e.g. 'ni' if the module is ni.py
b) a matching method used by AnalysisIndicators named as the module but
   ending with '_method'. E.g. 'ni_method'

In essence these modules should look exactly like the standard indicators
available in categories under the pandas_ta-folder. The only difference will
be an addition of a matching class method.

For an example of the correct structure, look at the example ni.py in the
examples folder.

The ni.py indicator is a trend indicator so therefore we drop it into the
sub-folder named trend. Thus we have a folder structure like this:

├── candles/
└── trend/
.      └── ni.py
└── volume/

4. We can now dynamically load all our custom indicators located in our
designated indicators directory like this:

>>> import_dir(ta_dir)

If your custom indicator(s) loaded succesfully then it should behave exactly
like all other native indicators in pandas_ta, including help functions.

def load_indicator_module(name):
     Helper function to (re)load an indicator module.

        dict: module functions mapping
            "func1_name": func1,
            "func2_name": func2,...

    # 加载指标模块
        module = importlib.import_module(name)
    except Exception as ex:  # 捕获异常,如果加载模块出错则打印错误信息并退出程序
        print(f"[X] An error occurred when attempting to load module {name}: {ex}")

    # 刷新之前加载的模块,以便重新加载
    module = importlib.reload(module)
    # 返回模块函数的字典映射,包括模块中定义的所有函数
    return get_module_functions(module)
# `.\pandas-ta\pandas_ta\cycles\ebsw.py`

# -*- coding: utf-8 -*-
# 从 numpy 模块导入 cos 函数并命名为 npCos
from numpy import cos as npCos
# 从 numpy 模块导入 exp 函数并命名为 npExp
from numpy import exp as npExp
# 从 numpy 模块导入 nan 常量并命名为 npNaN
from numpy import nan as npNaN
# 从 numpy 模块导入 pi 常量并命名为 npPi
from numpy import pi as npPi
# 从 numpy 模块导入 sin 函数并命名为 npSin
from numpy import sin as npSin
# 从 numpy 模块导入 sqrt 函数并命名为 npSqrt
from numpy import sqrt as npSqrt
# 从 pandas 模块中导入 Series 类
from pandas import Series
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义函数 ebsw,用于计算 Even Better SineWave (EBSW) 指标
def ebsw(close, length=None, bars=None, offset=None, **kwargs):
    """Indicator: Even Better SineWave (EBSW)"""
    # 校验参数
    # 如果 length 存在且大于38,则转换为整数,否则默认为40
    length = int(length) if length and length > 38 else 40
    # 如果 bars 存在且大于0,则转换为整数,否则默认为10
    bars = int(bars) if bars and bars > 0 else 10
    # 验证 close 是否为有效 Series 对象
    close = verify_series(close, length)
    # 获取偏移量
    offset = get_offset(offset)

    # 如果 close 为空,则返回 None
    if close is None: return

    # 初始化变量
    alpha1 = HP = 0  # alpha 和 HighPass
    a1 = b1 = c1 = c2 = c3 = 0
    Filt = Pwr = Wave = 0

    lastClose = lastHP = 0
    FilterHist = [0, 0]  # 过滤器历史记录

    # 计算结果
    m = close.size
    result = [npNaN for _ in range(0, length - 1)] + [0]
    for i in range(length, m):
        # 使用周期为 length 的高通滤波器过滤短于 Duration 输入的周期成分
        alpha1 = (1 - npSin(360 / length)) / npCos(360 / length)
        HP = 0.5 * (1 + alpha1) * (close[i] - lastClose) + alpha1 * lastHP

        # 使用超级平滑滤波器平滑数据(方程 3-3)
        a1 = npExp(-npSqrt(2) * npPi / bars)
        b1 = 2 * a1 * npCos(npSqrt(2) * 180 / bars)
        c2 = b1
        c3 = -1 * a1 * a1
        c1 = 1 - c2 - c3
        Filt = c1 * (HP + lastHP) / 2 + c2 * FilterHist[1] + c3 * FilterHist[0]

        # 计算波动和功率的3根均线
        Wave = (Filt + FilterHist[1] + FilterHist[0]) / 3
        Pwr = (Filt * Filt + FilterHist[1] * FilterHist[1] + FilterHist[0] * FilterHist[0]) / 3

        # 将平均波动归一化到平均功率的平方根
        Wave = Wave / npSqrt(Pwr)

        # 更新存储和结果
        FilterHist.append(Filt)  # 添加新的 Filt 值
        FilterHist.pop(0)  # 移除列表中的第一个元素(最早的)-> 更新/修剪
        lastHP = HP
        lastClose = close[i]

    # 创建结果 Series 对象
    ebsw = Series(result, index=close.index)

    # 如果有偏移量,则进行偏移
    if offset != 0:
        ebsw = ebsw.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        ebsw.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        ebsw.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    ebsw.name = f"EBSW_{length}_{bars}"
    ebsw.category = "cycles"

    return ebsw

# 设置函数 ebsw 的文档字符串
ebsw.__doc__ = \
"""Even Better SineWave (EBSW) *beta*

This indicator measures market cycles and uses a low pass filter to remove noise.
Its output is bound signal between -1 and 1 and the maximum length of a detected
trend is limited by its length input.

Written by rengel8 for Pandas TA based on a publication at 'prorealcode.com' and
a book by J.F.Ehlers.
# 这个实现在逻辑上似乎有限制。最好实现与prorealcode中的版本完全相同,并比较行为。

    J.F.Ehlers的《Cycle Analytics for Traders》,2014


    close (pd.Series): 'close'的系列
    length (int): 最大周期/趋势周期。值在40-48之间的效果如预期,最小值为39。默认值:40。
    bars (int): 低通滤波的周期。默认值:10
    drift (int): 差异周期。默认值:1
    offset (int): 结果的偏移周期数。默认值:0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): 填充方法的类型

    pd.Series: 生成的新特征。
# 设置文件编码格式为 UTF-8
# 从当前目录下的 ebsw 模块中导入 ebsw 函数
from .ebsw import ebsw
# -*- coding: utf-8 -*-
# 从pandas_ta.overlap模块导入简单移动平均函数sma
from pandas_ta.overlap import sma
# 从pandas_ta.utils模块导入get_offset和verify_series函数
from pandas_ta.utils import get_offset, verify_series

def ao(high, low, fast=None, slow=None, offset=None, **kwargs):
    """Indicator: Awesome Oscillator (AO)"""
    # 验证参数
    # 如果fast存在且大于0,则转换为整数,否则默认为5
    fast = int(fast) if fast and fast > 0 else 5
    # 如果slow存在且大于0,则转换为整数,否则默认为34
    slow = int(slow) if slow and slow > 0 else 34
    # 如果slow小于fast,则交换它们的值
    if slow < fast:
        fast, slow = slow, fast
    # 计算_length为fast和slow中的最大值
    _length = max(fast, slow)
    # 验证high和low的Series长度为_length
    high = verify_series(high, _length)
    low = verify_series(low, _length)
    # 获取offset的偏移量
    offset = get_offset(offset)

    # 如果high或low为None,则返回空
    if high is None or low is None: return

    # 计算结果
    # 计算中间价,即(high + low) / 2
    median_price = 0.5 * (high + low)
    # 计算fast期间的简单移动平均
    fast_sma = sma(median_price, fast)
    # 计算slow期间的简单移动平均
    slow_sma = sma(median_price, slow)
    # 计算AO指标,即fast期间的SMA减去slow期间的SMA
    ao = fast_sma - slow_sma

    # 偏移结果
    if offset != 0:
        # 将结果向前偏移offset个周期
        ao = ao.shift(offset)

    # 处理填充
    # 如果kwargs中有"fillna"参数,则用指定值填充空值
    if "fillna" in kwargs:
        ao.fillna(kwargs["fillna"], inplace=True)
    # 如果kwargs中有"fill_method"参数,则使用指定的填充方法
    if "fill_method" in kwargs:
        ao.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名和分类
    # 设置AO指标的名称,格式为"AO_{fast}_{slow}"
    ao.name = f"AO_{fast}_{slow}"
    # 设置AO指标的类别为动量
    ao.category = "momentum"

    # 返回AO指标
    return ao

# 更新函数文档字符串
ao.__doc__ = \
"""Awesome Oscillator (AO)

The Awesome Oscillator is an indicator used to measure a security's momentum.
AO is generally used to affirm trends or to anticipate possible reversals.


    Default Inputs:
        fast=5, slow=34
    SMA = Simple Moving Average
    median = (high + low) / 2
    AO = SMA(median, fast) - SMA(median, slow)

    high (pd.Series): Series of 'high's
    low (pd.Series): Series of 'low's
    fast (int): The short period. Default: 5
    slow (int): The long period. Default: 34
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
# 设置文件编码为 UTF-8
# 导入所需模块
from pandas_ta import Imports
# 导入移动平均函数
from pandas_ta.overlap import ma
# 导入辅助函数
from pandas_ta.utils import get_offset, tal_ma, verify_series

def apo(close, fast=None, slow=None, mamode=None, talib=None, offset=None, **kwargs):
    """Indicator: Absolute Price Oscillator (APO)"""
    # 验证参数有效性,如果未指定则使用默认值
    fast = int(fast) if fast and fast > 0 else 12
    slow = int(slow) if slow and slow > 0 else 26
    # 如果慢周期小于快周期,交换它们
    if slow < fast:
        fast, slow = slow, fast
    # 验证并准备输入序列
    close = verify_series(close, max(fast, slow))
    # 确定移动平均的模式,默认为简单移动平均
    mamode = mamode if isinstance(mamode, str) else "sma"
    # 获取偏移量
    offset = get_offset(offset)
    # 确定是否使用 TA-Lib 库进行计算,默认为 True
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 如果输入序列为空,则返回 None
    if close is None: return

    # 计算结果
    if Imports["talib"] and mode_tal:
        # 如果 TA-Lib 可用且需要使用它,则调用 TA-Lib 库计算 APO
        from talib import APO
        # 使用 TA-Lib 计算 APO
        apo = APO(close, fast, slow, tal_ma(mamode))
        # 否则使用自定义移动平均函数计算 APO
        # 计算快速和慢速移动平均线
        fastma = ma(mamode, close, length=fast)
        slowma = ma(mamode, close, length=slow)
        # 计算 APO
        apo = fastma - slowma

    # 根据偏移量调整结果
    if offset != 0:
        apo = apo.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        apo.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        apo.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和分类
    apo.name = f"APO_{fast}_{slow}"
    apo.category = "momentum"

    # 返回计算结果
    return apo

# 设置 APO 函数的文档字符串
apo.__doc__ = \
"""Absolute Price Oscillator (APO)

The Absolute Price Oscillator is an indicator used to measure a security's
momentum.  It is simply the difference of two Exponential Moving Averages
(EMA) of two different periods. Note: APO and MACD lines are equivalent.


    Default Inputs:
        fast=12, slow=26
    SMA = Simple Moving Average
    APO = SMA(close, fast) - SMA(close, slow)

    close (pd.Series): Series of 'close's
    fast (int): The short period. Default: 12
    slow (int): The long period. Default: 26
    mamode (str): See ```help(ta.ma)```py. Default: 'sma'
    talib (bool): If TA Lib is installed and talib is True, Returns the TA Lib
        version. Default: True
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
# -*- coding: utf-8 -*-
# 导入 pandas_ta 库中的 ma 函数
from pandas_ta.overlap import ma
# 导入 pandas_ta 库中的 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义 Bias 指标函数,接受 close、length、mamode、offset 等参数
def bias(close, length=None, mamode=None, offset=None, **kwargs):
    """Indicator: Bias (BIAS)"""
    # 验证参数合法性
    # 如果未指定 length 或 length 小于等于 0,则默认为 26
    length = int(length) if length and length > 0 else 26
    # 如果未指定 mamode 或 mamode 不是字符串,则默认为 "sma"
    mamode = mamode if isinstance(mamode, str) else "sma"
    # 验证 close 是否为 Series,并指定长度为 length
    close = verify_series(close, length)
    # 获取 offset
    offset = get_offset(offset)

    # 如果 close 为 None,则返回 None
    if close is None: return

    # 计算结果
    # 计算移动平均线,参数为 mamode、close 和 length
    bma = ma(mamode, close, length=length, **kwargs)
    # 计算 Bias,即 (close / bma) - 1
    bias = (close / bma) - 1

    # 偏移
    # 如果 offset 不为 0,则对 Bias 进行偏移
    if offset != 0:
        bias = bias.shift(offset)

    # 处理填充
    # 如果 kwargs 中包含 "fillna",则使用该值填充 NaN
    if "fillna" in kwargs:
        bias.fillna(kwargs["fillna"], inplace=True)
    # 如果 kwargs 中包含 "fill_method",则使用指定的填充方法
    if "fill_method" in kwargs:
        bias.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    # 指标名称为 "BIAS_移动平均线名称",类别为 "momentum"
    bias.name = f"BIAS_{bma.name}"
    bias.category = "momentum"

    # 返回 Bias
    return bias

# 设置 Bias 函数的文档字符串
bias.__doc__ = \
"""Bias (BIAS)

Rate of change between the source and a moving average.

    Few internet resources on definitive definition.
    Request by Github user homily, issue #46

    Default Inputs:
        length=26, MA='sma'

    BIAS = (close - MA(close, length)) / MA(close, length)
         = (close / MA(close, length)) - 1

    close (pd.Series): Series of 'close's
    length (int): The period. Default: 26
    mamode (str): See ```help(ta.ma)```py. Default: 'sma'
    drift (int): The short period. Default: 1
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
# -*- coding: utf-8 -*-
# 从 pandas_ta 库中导入必要的模块
from pandas_ta import Imports
# 从 pandas_ta.utils 模块中导入 get_offset, non_zero_range, verify_series 函数
from pandas_ta.utils import get_offset, non_zero_range, verify_series

def bop(open_, high, low, close, scalar=None, talib=None, offset=None, **kwargs):
    """Indicator: Balance of Power (BOP)"""
    # 验证参数
    open_ = verify_series(open_)
    high = verify_series(high)
    low = verify_series(low)
    close = verify_series(close)
    scalar = float(scalar) if scalar else 1
    offset = get_offset(offset)
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 计算结果
    if Imports["talib"] and mode_tal:
        from talib import BOP
        bop = BOP(open_, high, low, close)
        high_low_range = non_zero_range(high, low)
        close_open_range = non_zero_range(close, open_)
        bop = scalar * close_open_range / high_low_range

    # 偏移
    if offset != 0:
        bop = bop.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        bop.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        bop.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名和分类
    bop.name = f"BOP"
    bop.category = "momentum"

    return bop

# 设置函数文档字符串
bop.__doc__ = \
"""Balance of Power (BOP)

Balance of Power measure the market strength of buyers against sellers.


    BOP = scalar * (close - open) / (high - low)

    open (pd.Series): Series of 'open's
    high (pd.Series): Series of 'high's
    low (pd.Series): Series of 'low's
    close (pd.Series): Series of 'close's
    scalar (float): How much to magnify. Default: 1
    talib (bool): If TA Lib is installed and talib is True, Returns the TA Lib
        version. Default: True
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
# 设置文件编码为 UTF-8
# 导入 DataFrame 类
from pandas import DataFrame
# 从 pandas_ta.utils 中导入函数 get_drift, get_offset, non_zero_range, verify_series
from pandas_ta.utils import get_drift, get_offset, non_zero_range, verify_series

# 定义函数 brar,计算 BRAR 指标
def brar(open_, high, low, close, length=None, scalar=None, drift=None, offset=None, **kwargs):
    """Indicator: BRAR (BRAR)"""
    # 验证参数
    # 如果 length 存在且大于 0,则将其转换为整数,否则默认为 26
    length = int(length) if length and length > 0 else 26
    # 如果 scalar 存在,则将其转换为浮点数,否则默认为 100
    scalar = float(scalar) if scalar else 100
    # 计算 high 与 open 的非零范围
    high_open_range = non_zero_range(high, open_)
    # 计算 open 与 low 的非零范围
    open_low_range = non_zero_range(open_, low)
    # 验证输入的数据列,并截取长度为 length
    open_ = verify_series(open_, length)
    high = verify_series(high, length)
    low = verify_series(low, length)
    close = verify_series(close, length)
    # 获取漂移值
    drift = get_drift(drift)
    # 获取偏移值
    offset = get_offset(offset)

    # 如果任何输入数据为空,则返回空
    if open_ is None or high is None or low is None or close is None: return

    # 计算结果
    # 计算 high_close_yesterday,即 high 与 close 的差值
    hcy = non_zero_range(high, close.shift(drift))
    # 计算 close_yesterday_low,即 close 与 low 的差值
    cyl = non_zero_range(close.shift(drift), low)
    # 将负值替换为零
    hcy[hcy < 0] = 0
    cyl[cyl < 0] = 0

    # 计算 AR 和 BR 指标
    # AR = scalar * HO 的长度为 length 的滚动和 / OL 的长度为 length 的滚动和
    ar = scalar * high_open_range.rolling(length).sum()
    ar /= open_low_range.rolling(length).sum()
    # BR = scalar * HCY 的长度为 length 的滚动和 / CYL 的长度为 length 的滚动和
    br = scalar * hcy.rolling(length).sum()
    br /= cyl.rolling(length).sum()

    # 偏移
    if offset != 0:
        ar = ar.shift(offset)
        br = ar.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        ar.fillna(kwargs["fillna"], inplace=True)
        br.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        ar.fillna(method=kwargs["fill_method"], inplace=True)
        br.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    _props = f"_{length}"
    ar.name = f"AR{_props}"
    br.name = f"BR{_props}"
    ar.category = br.category = "momentum"

    # 准备返回的 DataFrame
    brardf = DataFrame({ar.name: ar, br.name: br})
    brardf.name = f"BRAR{_props}"
    brardf.category = "momentum"

    return brardf

# 设置 brar 函数的文档字符串
brar.__doc__ = \

BR and AR

    No internet resources on definitive definition.
    Request by Github user homily, issue #46

    Default Inputs:
        length=26, scalar=100
    SUM = Sum

    HO_Diff = high - open
    OL_Diff = open - low
    HCY = high - close[-1]
    CYL = close[-1] - low
    HCY[HCY < 0] = 0
    CYL[CYL < 0] = 0
    AR = scalar * SUM(HO, length) / SUM(OL, length)
    BR = scalar * SUM(HCY, length) / SUM(CYL, length)

    open_ (pd.Series): Series of 'open's
    high (pd.Series): Series of 'high's
    low (pd.Series): Series of 'low's
    close (pd.Series): Series of 'close's
    length (int): The period. Default: 26
    scalar (float): How much to magnify. Default: 100
    drift (int): The difference period. Default: 1
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.DataFrame: ar, br columns.
# -*- coding: utf-8 -*-
# 导入必要的库
from pandas_ta import Imports
from pandas_ta.overlap import hlc3, sma
from pandas_ta.statistics.mad import mad
from pandas_ta.utils import get_offset, verify_series

# 定义计算 CCI 指标的函数
def cci(high, low, close, length=None, c=None, talib=None, offset=None, **kwargs):
    """Indicator: Commodity Channel Index (CCI)"""
    # 验证参数
    length = int(length) if length and length > 0 else 14
    c = float(c) if c and c > 0 else 0.015
    high = verify_series(high, length)
    low = verify_series(low, length)
    close = verify_series(close, length)
    offset = get_offset(offset)
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    if high is None or low is None or close is None: return

    # 计算结果
    if Imports["talib"] and mode_tal:
        from talib import CCI
        cci = CCI(high, low, close, length)
        typical_price = hlc3(high=high, low=low, close=close)
        mean_typical_price = sma(typical_price, length=length)
        mad_typical_price = mad(typical_price, length=length)

        cci = typical_price - mean_typical_price
        cci /= c * mad_typical_price

    # 偏移结果
    if offset != 0:
        cci = cci.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        cci.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        cci.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名和分类
    cci.name = f"CCI_{length}_{c}"
    cci.category = "momentum"

    return cci

# 设置函数文档
cci.__doc__ = \
"""Commodity Channel Index (CCI)

Commodity Channel Index is a momentum oscillator used to primarily identify
overbought and oversold levels relative to a mean.


    Default Inputs:
        length=14, c=0.015
    SMA = Simple Moving Average
    MAD = Mean Absolute Deviation
    tp = typical_price = hlc3 = (high + low + close) / 3
    mean_tp = SMA(tp, length)
    mad_tp = MAD(tp, length)
    CCI = (tp - mean_tp) / (c * mad_tp)

    high (pd.Series): Series of 'high's
    low (pd.Series): Series of 'low's
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 14
    c (float): Scaling Constant. Default: 0.015
    talib (bool): If TA Lib is installed and talib is True, Returns the TA Lib
        version. Default: True
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
# -*- coding: utf-8 -*-
# 从 pandas_ta 库中导入 overlap 模块中的 linreg 函数
from pandas_ta.overlap import linreg
# 从 pandas_ta 库中导入 utils 模块中的 get_drift, get_offset, verify_series 函数
from pandas_ta.utils import get_drift, get_offset, verify_series

# 定义 Chande Forcast Oscillator (CFO) 函数
def cfo(close, length=None, scalar=None, drift=None, offset=None, **kwargs):
    """Indicator: Chande Forcast Oscillator (CFO)"""
    # 验证参数
    length = int(length) if length and length > 0 else 9
    scalar = float(scalar) if scalar else 100
    # 验证 close 参数,确保其为有效的 pd.Series 对象,并应用 length 长度验证
    close = verify_series(close, length)
    # 获取 drift 参数的值
    drift = get_drift(drift)
    # 获取 offset 参数的值
    offset = get_offset(offset)

    # 如果 close 为 None,则返回 None
    if close is None: return

    # 计算 Series 的线性回归
    cfo = scalar * (close - linreg(close, length=length, tsf=True))
    cfo /= close

    # 偏移
    if offset != 0:
        cfo = cfo.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        cfo.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        cfo.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名和分类
    cfo.name = f"CFO_{length}"
    cfo.category = "momentum"

    return cfo

# 设置 CFO 函数的文档字符串
cfo.__doc__ = \
"""Chande Forcast Oscillator (CFO)

The Forecast Oscillator calculates the percentage difference between the actual
price and the Time Series Forecast (the endpoint of a linear regression line).


    Default Inputs:
        length=9, drift=1, scalar=100
    LINREG = Linear Regression

    CFO = scalar * (close - LINERREG(length, tdf=True)) / close

    close (pd.Series): Series of 'close's
    length (int): The period. Default: 9
    scalar (float): How much to magnify. Default: 100
    drift (int): The short period. Default: 1
    offset (int): How many periods to offset the result. Default: 0

    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

    pd.Series: New feature generated.
