赞
踩
跳转到根目录:知行合一:投资篇
已完成:
1、投资&技术
1.1.1 投资-编程基础-numpy
1.1.2 投资-编程基础-pandas
1.2 金融数据处理
1.3 金融数据可视化
2、投资方法论
2.1.1 预期年化收益率
2.1.2 一个关于y=ax+b的故事
2.1.3-数据标准化
2.1.4-相关性分析
2.2.1.1-一个关于定投的故(姿)事(势)
2.2.1.2-网格交易
2.2.1.3-移动平均线
3、投资实证
[3.1 2023这一年] 被鸽
网格交易,说最简单的,就是跌了买,涨了卖。
为了少说废话,这里就举1个例子:
name | open | close | date | 操作 | 操作价格 |
---|---|---|---|---|---|
券商ETF | 0.98 | 0.98 | 2016/9/14 | 买 | 0.98 |
券商ETF | 0.983 | 0.962 | 2016/9/26 | 买 | 0.97 |
券商ETF | 0.982 | 1.005 | 2016/10/18 | 卖 | 1 |
券商ETF | 1.001 | 1.025 | 2016/10/24 | 卖 | 1.01 |
券商ETF | 1.001 | 1.025 | 2016/10/24 | 卖 | 1.02 |
券商ETF | 1.021 | 1.031 | 2016/11/1 | 卖 | 1.03 |
券商ETF | 1.025 | 1.05 | 2016/11/3 | 卖 | 1.04 |
券商ETF | 1.025 | 1.05 | 2016/11/3 | 卖 | 1.05 |
券商ETF | 1.055 | 1.088 | 2016/11/11 | 卖 | 1.06 |
券商ETF | 1.055 | 1.088 | 2016/11/11 | 卖 | 1.07 |
券商ETF | 1.055 | 1.088 | 2016/11/11 | 卖 | 1.08 |
券商ETF | 1.09 | 1.097 | 2016/11/14 | 卖 | 1.09 |
券商ETF | 1.064 | 1.034 | 2016/12/5 | 买 | 1.06 |
券商ETF | 1.064 | 1.034 | 2016/12/5 | 买 | 1.05 |
券商ETF | 1.064 | 1.034 | 2016/12/5 | 买 | 1.04 |
券商ETF | 1.038 | 1.026 | 2016/12/8 | 买 | 1.03 |
券商ETF | 1.029 | 1.001 | 2016/12/12 | 买 | 1.02 |
券商ETF | 1.029 | 1.001 | 2016/12/12 | 买 | 1.01 |
券商ETF | 1.001 | 0.988 | 2016/12/14 | 买 | 1 |
券商ETF | 1.001 | 0.988 | 2016/12/14 | 买 | 0.99 |
券商ETF | 0.981 | 0.976 | 2016/12/19 | 买 | 0.98 |
券商ETF | 0.975 | 0.963 | 2016/12/23 | 买 | 0.97 |
券商ETF | 0.995 | 1.002 | 2017/2/20 | 卖 | 1 |
券商ETF | 0.974 | 0.962 | 2017/3/22 | 买 | 0.97 |
券商ETF | 0.973 | 0.96 | 2017/3/29 | 买 | 0.96 |
券商ETF | 0.95 | 0.944 | 2017/4/19 | 买 | 0.95 |
券商ETF | 0.942 | 0.937 | 2017/5/2 | 买 | 0.94 |
网格的大小,可以自己定义,我定义的是0.01作为一个网格
买入一笔后,如果后面价格涨了x*0.01(这里x可以自己定,我定的是3),那么就卖出
买入一笔后,如果后面价格跌了y*0.01(y一般就是1,也就是1网),那么就继续买入
卖出一笔后,如果后面价格跌了x*0.01(这里x可以自己定,我定的是3),那么就买入
卖出一笔后,如果后面价格涨了y*0.01(y一般就是1,也就是1网),那么就继续卖出
第一天,默认就是买入
后续是按照收盘价和之前买入或卖出的价格进行比较,看是否进行买或卖的操作
2016/9/14买入价0.98,那么,我们预期要么是在1.01卖出,要么是在0.97买入
到了2016/9/26,收盘价是0.962,是>0.97的,那我们就直接认为我们挂单了0.97买入
在2016/9/26以0.97买入之后,我们预期要么是在1.00卖出,要么是在0.96买入
到了2016/10/18,收盘价是1.005,是>1.00的,我们可以成交1笔卖出。
def __init__(self, security, start_date=None, end_date=None) -> None:
super().__init__()
# security:是这次执行的代码,一般类似512000,510300之类
# start_date、end_date:会在所有的数据行中,截取对应的时间片段。不传值,那默认就是None,就会用全量数据进行测算。
# step_price:每个网格的大小,比如我们测算的是512000,是1左右净值的,1%作为网格,是合适的。
# steps:买入一笔后,如果后面价格涨了x*0.01(这里x可以自己定,我定的是3),那么就卖出。这里的x,就是我们这里的steps
self.args = {'security': security, 'start_date': start_date, 'end_date': end_date, 'step_price': 0.01, 'steps': 3}
# 加载数据,后面会具体解释加载过程
self.daily_df = self.load_data_2_df()
# 保留本次交易的行数据,方便后面的使用(比如判断上一笔,是买入还是卖出操作)
self.last_transaction = None
# 交易历史,最后输出到csv,方便查看
self.transactions = []
def load_data_2_df(self): # 这个案例,是通过既有数据来跑的,512000,包含了从 2016-09-14 ~ 2024-03-29 的日线数据。 # 如果想要自己获取数据,可以参考之前的文章:https://blog.csdn.net/sdfiiiiii/article/details/135289226,包含了从qstock获取、处理、存储数据等 df = pd.read_csv("https://gitee.com/kelvin11/public-resources/raw/master/512000.csv") # 如果指定了start_date、end_date,就进行数据的切割 if self.args['start_date']: df = df[df["date"] >= self.args['start_date']] if self.args['end_date']: df = df[df["date"] <= self.args['end_date']] # 转换为日期类型 df['date'] = df['date'].apply(pd.to_datetime, format='%Y-%m-%d') # 按照日期的正序排序(防止数据错位) df.sort_values(by="date", ascending=True) # 设置dataframe的索引,后面取数比较简便一些. df = df.set_index("date") # 设置了date为索引之后,dataframe里面就没有date这一列了,有时候为了方便处理,还是把date给加上 df['date'] = df.index.tolist() return df
主代码,意味着,这里是执行流程的核心。
其实也比较简单,就是遍历dataframe,逐行处理数据即可。
def process(self): # dataframe的遍历,逐行处理数据 for index, row in self.daily_df.iterrows(): # index是索引,就是日期;row是Series类型,一行完整的数据 if index != self.daily_df.iloc[0]['date']: # 非第一天 # 1.1 今日收盘价如果 < 上次操作的价格,那么可能要买。 if row['close'] < self.last_transaction['价格']: # want_buy,为什么可能要买,因为在网格中,如果之前是买入,当天价格下跌没有达到下一网,是不买的。这个逻辑在want_buy内部实现 self.want_buy(row=row) # 1.2 今日收盘价如果 > 上次操作的价格,那么可能要卖,want_sell elif row['close'] > self.last_transaction['价格']: # want_sell其实和want_buy是同样的解释。因为在网格中,如果收盘价大于上次交易价格,但没有达到实际要卖出的价格,那也是不卖的。这个逻辑在want_sell内部实现 self.want_sell(row=row) else: # hold方法,其实什么都没做,return None。意思就是持有不操作。 self.hold(row=row) print() else: # 是第一天,默认就是买入 # want_buy方法,是指可能要买,为什么可能要买,因为在网格中,如果之前是买入,当天价格下跌没有达到下一网,是不买的。这个逻辑在want_buy内部实现 self.want_buy(row=row) # 遍历完所有的数据之后,将所有的交易记录通过pandas存储到csv文件中 df = pd.DataFrame(self.transactions) df.to_csv('%s交易记录.csv' % self.args['security'], index=None)
def want_buy(self, row): if row['date'] == self.daily_df.iloc[0]['date']: # 这个是第一天的逻辑,直接执行买入。today_buy就真的是买入操作了,主要是记录买入价格和当前行数据 this_trans = self.today_buy(row, row['close']) # 将买入行相关的信息,存储到self.transactions操作历史中,方便整体输出 self.transactions.append(this_trans) else: # 这里就不是第一天了,要去判断是否能买的到(比如价格下跌没到下一网位置,就不买的。) if self.last_transaction['操作'] == '买': # 如果上一次的操作是'买',那么要构造1个买入价格的阶梯 [上次买入价格, 今日收盘价],按照'网格大小'构建一个阶梯price_stairs。 # 用到的是numpy的arange方法,举例:print(np.arange(10,1,-2)) # 输出[10 8 6 4 2] start_buy_price = self.last_transaction['价格'] price_stairs = np.arange(start_buy_price, row['close'], -self.args['step_price']) print('前一天是买入,价格%s。收盘价下跌, 构造的买入阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if len(price_stairs) >= 1: # 因为,第一个价格是上次买入价格,所以不要包含在本次的买入阶梯里面,做一些切割[1:] price_stairs = price_stairs[1:] for price in price_stairs: # 一般保留3位小数即可 price = round(price, 3) # today_buy,是真正的做买入动作了。 this_trans = self.today_buy(row, price) # 将买入信息,集中存储到transactions列表中,后续输出到文件 self.transactions.append(this_trans) else: print('未达到买入阶梯价,今日不执行买入') elif self.last_transaction['操作'] == '卖': # 构造买入阶梯价,2. 如果"上一次"是卖出,那么要从"-3*网格"开始买 start_buy_price = self.last_transaction['价格'] - self.args['steps'] * self.args['step_price'] # 构造价格区间的方法,跟上面是一样的。 price_stairs = np.arange(start_buy_price, row['close'], -self.args['step_price']) print('前一天是卖出,价格%s。收盘价下跌,构造的买入阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if start_buy_price == row['close']: # 如果期望开始买的价格,正好是当天收盘价,其实就是此次要买入的价格 price_stairs = np.array([start_buy_price]) if len(price_stairs) >= 1: # 下面的逻辑,跟上面是一样的。 for price in price_stairs: price = round(price, 3) this_trans = self.today_buy(row, price) self.transactions.append(this_trans) else: print('未达到买入阶梯价,今日不执行买入')
上面方法,用到的today_buy方法,其实就几行代码:
def today_buy(self, row, price):
# 扩充行数据元素,增加2列:操作 = 买,价格 = price
self.daily_df.loc[row['date'], '操作'] = '买'
self.daily_df.loc[row['date'], '价格'] = price
# 将买入这个操作的信息,保存在临时变量 self.last_transaction 中,方便后面处理,能快速定位到上次交易是买入还是卖出,以及其价格
self.last_transaction = self.daily_df.loc[row['date']]
return self.last_transaction
want_sell和want_buy方法及其相似,就不逐行解释了。
def want_sell(self, row): # 构造卖出阶梯价,1. 如果"前一天"是卖出,那么构造卖出就是按照下一个"网格"卖 if self.last_transaction['操作'] == '卖': start_sell_price = self.last_transaction['价格'] price_stairs = np.arange(start_sell_price, row['close'], self.args['step_price']) print('前一天是卖出,价格%s。收盘价上涨, 构造的卖出阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if len(price_stairs) >= 1: price_stairs = price_stairs[1:] for price in price_stairs: price = round(price, 3) this_trans = self.today_sell(row, price) self.transactions.append(this_trans) else: print('未达到卖出阶梯价,今日不执行卖出') # 构造卖出阶梯价,2. 如果"前一天"是买入出,那么要从"+3*网格"开始卖 elif self.last_transaction['操作'] == '买': start_sell_price = self.last_transaction['价格'] + self.args['steps'] * self.args['step_price'] price_stairs = np.arange(start_sell_price, row['close'], self.args['step_price']) print('前一天是买入,价格%s。收盘价上涨,构造的买卖出阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) print(price_stairs) if start_sell_price == row['close']: price_stairs = np.array([start_sell_price]) if len(price_stairs) >= 1: for price in price_stairs: price = round(price, 3) this_trans = self.today_sell(row, price) self.transactions.append(this_trans) else: print('未达到卖出阶梯价,今日不执行卖出')
也是跟today_buy大同小异,没什么要特别说明的地方。
def today_sell(self, row, price):
# 扩充行数据元素,增加2列:操作 = 卖,价格 = price
self.daily_df.loc[row['date'], '操作'] = '卖'
self.daily_df.loc[row['date'], '价格'] = price
# 将卖出这个操作的信息,保存在临时变量 self.last_transaction 中,方便后面处理,能快速定位到上次交易是买入还是卖出,以及其价格
self.last_transaction = self.daily_df.loc[row['date']]
return self.last_transaction
完整的代码已经全部都解释完了,就是上面的几个方法组合起来,就结束,整体代码量,加上充分的注释,163行。
废话少说,上干货,直接就能跑~
import pandas as pd import numpy as np class FixedGrid: def __init__(self, security, start_date=None, end_date=None) -> None: super().__init__() # security:是这次执行的代码,一般类似512000,510300之类 # start_date、end_date:会在所有的数据行中,截取对应的时间片段。不传值,那默认就是None,就会用全量数据进行测算。 # step_price:每个网格的大小,比如我们测算的是512000,是1左右净值的,1%作为网格,是合适的。 # steps:买入一笔后,如果后面价格涨了x*0.01(这里x可以自己定,我定的是3),那么就卖出。这里的x,就是我们这里的steps self.args = {'security': security, 'start_date': start_date, 'end_date': end_date, 'step_price': 0.01, 'steps': 3} # 加载数据,后面会具体解释加载过程 self.daily_df = self.load_data_2_df() # 保留本次交易的行数据,方便后面的使用(比如判断上一笔,是买入还是卖出操作) self.last_transaction = None # 交易历史,最后输出到csv,方便查看 self.transactions = [] def load_data_2_df(self): # 这个案例,是通过既有数据来跑的,512000,包含了从 2016-09-14 ~ 2024-03-29 的日线数据。 # 如果想要自己获取数据,可以参考之前的文章:https://blog.csdn.net/sdfiiiiii/article/details/135289226,包含了从qstock获取、处理、存储数据等 df = pd.read_csv("https://gitee.com/kelvin11/public-resources/raw/master/512000.csv") # 如果指定了start_date、end_date,就进行数据的切割 if self.args['start_date']: df = df[df["date"] >= self.args['start_date']] if self.args['end_date']: df = df[df["date"] <= self.args['end_date']] # 转换为日期类型 df['date'] = df['date'].apply(pd.to_datetime, format='%Y-%m-%d') # 按照日期的正序排序(防止数据错位) df.sort_values(by="date", ascending=True) # 设置dataframe的索引,后面取数比较简便一些. df = df.set_index("date") # 设置了date为索引之后,dataframe里面就没有date这一列了,有时候为了方便处理,还是把date给加上 df['date'] = df.index.tolist() return df def today_buy(self, row, price): # 扩充行数据元素,增加2列:操作 = 买,价格 = price self.daily_df.loc[row['date'], '操作'] = '买' self.daily_df.loc[row['date'], '价格'] = price # 将买入这个操作的信息,保存在临时变量 self.last_transaction 中,方便后面处理,能快速定位到上次交易是买入还是卖出,以及其价格 self.last_transaction = self.daily_df.loc[row['date']] return self.last_transaction def today_sell(self, row, price): # 扩充行数据元素,增加2列:操作 = 卖,价格 = price self.daily_df.loc[row['date'], '操作'] = '卖' self.daily_df.loc[row['date'], '价格'] = price # 将卖出这个操作的信息,保存在临时变量 self.last_transaction 中,方便后面处理,能快速定位到上次交易是买入还是卖出,以及其价格 self.last_transaction = self.daily_df.loc[row['date']] return self.last_transaction def want_buy(self, row): if row['date'] == self.daily_df.iloc[0]['date']: # 这个是第一天的逻辑,直接执行买入。today_buy就真的是买入操作了,主要是记录买入价格和当前行数据 this_trans = self.today_buy(row, row['close']) # 将买入行相关的信息,存储到self.transactions操作历史中,方便整体输出 self.transactions.append(this_trans) else: # 这里就不是第一天了,要去判断是否能买的到(比如价格下跌没到下一网位置,就不买的。) if self.last_transaction['操作'] == '买': # 如果上一次的操作是'买',那么要构造1个买入价格的阶梯 [上次买入价格, 今日收盘价],按照'网格大小'构建一个阶梯price_stairs。 # 用到的是numpy的arange方法,举例:print(np.arange(10,1,-2)) # 输出[10 8 6 4 2] start_buy_price = self.last_transaction['价格'] price_stairs = np.arange(start_buy_price, row['close'], -self.args['step_price']) print('前一天是买入,价格%s。收盘价下跌, 构造的买入阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if len(price_stairs) >= 1: # 因为,第一个价格是上次买入价格,所以不要包含在本次的买入阶梯里面,做一些切割[1:] price_stairs = price_stairs[1:] for price in price_stairs: # 一般保留3位小数即可 price = round(price, 3) # today_buy,是真正的做买入动作了。 this_trans = self.today_buy(row, price) # 将买入信息,集中存储到transactions列表中,后续输出到文件 self.transactions.append(this_trans) else: print('未达到买入阶梯价,今日不执行买入') elif self.last_transaction['操作'] == '卖': # 构造买入阶梯价,2. 如果"上一次"是卖出,那么要从"-3*网格"开始买 start_buy_price = self.last_transaction['价格'] - self.args['steps'] * self.args['step_price'] # 构造价格区间的方法,跟上面是一样的。 price_stairs = np.arange(start_buy_price, row['close'], -self.args['step_price']) print('前一天是卖出,价格%s。收盘价下跌,构造的买入阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if start_buy_price == row['close']: # 如果期望开始买的价格,正好是当天收盘价,其实就是此次要买入的价格 price_stairs = np.array([start_buy_price]) if len(price_stairs) >= 1: # 下面的逻辑,跟上面是一样的。 for price in price_stairs: price = round(price, 3) this_trans = self.today_buy(row, price) self.transactions.append(this_trans) else: print('未达到买入阶梯价,今日不执行买入') def want_sell(self, row): # 构造卖出阶梯价,1. 如果"前一天"是卖出,那么构造卖出就是按照下一个"网格"卖 if self.last_transaction['操作'] == '卖': start_sell_price = self.last_transaction['价格'] price_stairs = np.arange(start_sell_price, row['close'], self.args['step_price']) print('前一天是卖出,价格%s。收盘价上涨, 构造的卖出阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) if len(price_stairs) >= 1: price_stairs = price_stairs[1:] for price in price_stairs: price = round(price, 3) this_trans = self.today_sell(row, price) self.transactions.append(this_trans) else: print('未达到卖出阶梯价,今日不执行卖出') # 构造卖出阶梯价,2. 如果"前一天"是买入出,那么要从"+3*网格"开始卖 elif self.last_transaction['操作'] == '买': start_sell_price = self.last_transaction['价格'] + self.args['steps'] * self.args['step_price'] price_stairs = np.arange(start_sell_price, row['close'], self.args['step_price']) print('前一天是买入,价格%s。收盘价上涨,构造的买卖出阶梯是:%s' % (self.last_transaction['价格'], price_stairs)) print(price_stairs) if start_sell_price == row['close']: price_stairs = np.array([start_sell_price]) if len(price_stairs) >= 1: for price in price_stairs: price = round(price, 3) this_trans = self.today_sell(row, price) self.transactions.append(this_trans) else: print('未达到卖出阶梯价,今日不执行卖出') def hold(self, row): return None def process(self): # dataframe的遍历,逐行处理数据 for index, row in self.daily_df.iterrows(): # index是索引,就是日期;row是Series类型,一行完整的数据 if index != self.daily_df.iloc[0]['date']: # 非第一天 # 1.1 今日收盘价如果 < 上次操作的价格,那么可能要买。 if row['close'] < self.last_transaction['价格']: # want_buy,为什么可能要买,因为在网格中,如果之前是买入,当天价格下跌没有达到下一网,是不买的。这个逻辑在want_buy内部实现 self.want_buy(row=row) # 1.2 今日收盘价如果 > 上次操作的价格,那么可能要卖,want_sell elif row['close'] > self.last_transaction['价格']: # want_sell其实和want_buy是同样的解释。因为在网格中,如果收盘价大于上次交易价格,但没有达到实际要卖出的价格,那也是不卖的。这个逻辑在want_sell内部实现 self.want_sell(row=row) else: # hold方法,其实什么都没做,return None。意思就是持有不操作。 self.hold(row=row) print() else: # 是第一天,默认就是买入 # want_buy方法,是指可能要买,为什么可能要买,因为在网格中,如果之前是买入,当天价格下跌没有达到下一网,是不买的。这个逻辑在want_buy内部实现 self.want_buy(row=row) # 遍历完所有的数据之后,将所有的交易记录通过pandas存储到csv文件中 df = pd.DataFrame(self.transactions) df.to_csv('%s交易记录.csv' % self.args['security'], index=None) if __name__ == '__main__': # this = FixedGrid(security='512000', start_date='2020-09-14', end_date='2023-09-14') this = FixedGrid(security='512000', start_date=None, end_date=None) this.process()
其实这里还是要说的,网格这个东西,我们还缺了一个前置条件:
所以,这个策略,是为了告诉我们,如果我都ok,那理论上,一年下来,我能卖出多少回?也就是做了多少次的T,能赚多少钱?
我把这个统计的结果文件,放在了:512000交易记录
为了对这个策略有信心,我只看,每年,卖出了多少回?
插个题外话,这个策略有很多改进的地方,比如:
我自己已经非常忠实的执行了网格交易有一段时间,对结果还是比较满意的,贴一下记录,有兴趣的可以交流。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。