day04-Max and Min-Median-Statistics-Convolution-Moving and Exponential Moving Averages-Bollinger Bands-Linear Models
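4. Maximum and minimum
1. max/min:
numpy.max(array): returns the largest value in the array
numpy.min(array): returns the smallest value in the array
2. argmax/argmin
numpy.argmax(array): returns the index of the largest value in the array
numpy.argmin(array): returns the index of the smallest value in the array
3. maximum/minimum
numpy.maximum(array1, array2): compares two arrays of the same shape element by element and returns the larger value at each position
numpy.minimum(array1, array2): compares two arrays of the same shape element by element and returns the smaller value at each position
4. ptp: returns the difference between the largest and smallest values in an array
Code: maxmin.py
Example: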
import numpy as np
a = np.random.randint(10,100,9).reshape(3,3)  # 9 random integers drawn from [10, 100)
print(a)
print(np.max(a),np.argmax(a))
print(np.min(a),np.argmin(a))
print(np.ptp(a))
b = np.random.randint(10,100,9).reshape(3,3)
print(b)
print(np.ptp(b))
print(np.maximum(a,b))
print(np.minimum(a,b))
print(np.ptp(np.minimum(a,b)))
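Because the example above uses random data, its output changes on every run. A minimal sketch with fixed, made-up values makes the results easy to check by hand; it also shows that argmax returns an index into the flattened array, which np.unravel_index converts back to a (row, column) pair.

```python
import numpy as np

# Fixed values chosen only for illustration
a = np.array([[10, 55, 30],
              [70, 20, 90],
              [40, 80, 60]])
b = np.array([[15, 50, 35],
              [65, 25, 85],
              [45, 75, 95]])

flat_index = np.argmax(a)                          # index into the flattened array
row, col = np.unravel_index(flat_index, a.shape)   # convert to (row, column)
print(flat_index, row, col)                        # 5 1 2

elementwise_max = np.maximum(a, b)   # larger value at each position
elementwise_min = np.minimum(a, b)   # smaller value at each position
print(elementwise_max)
print(elementwise_min)
print(np.ptp(a))                     # 90 - 10 = 80
```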
Exercise: compute a stock's price range, that is, the highest high minus the lowest low over a given period.
Code: range.py
Example:
import numpy as np
dates,highest_prices,lowest_prices = np.loadtxt(
    './aapl.csv',delimiter=',',
    usecols=(1,4,5),dtype='U10,f8,f8',unpack=True
)
max_price = np.max(highest_prices)
min_price = np.min(lowest_prices)
print(min_price,'--',max_price)  # lowest low and highest high
print(max_price - min_price)  # price range over the period
max_index = np.argmax(highest_prices)
min_index = np.argmin(lowest_prices)
print(dates[min_index],dates[max_index])  # dates of the lowest low and the highest high
highest_ptp = np.ptp(highest_prices)  # range of the daily highs
lowest_ptp = np.ptp(lowest_prices)  # range of the daily lows
print(lowest_ptp,highest_ptp)
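5. Median
Sort the samples by value; the element in the middle is the median. With an even number of samples, the median is the mean of the two middle elements.
median = np.median(unsorted samples)
Example: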
import numpy as np
closeing_prices = np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(6),unpack=True
)
# Median computed by hand
size = closeing_prices.size
sorted_prices = np.sort(closeing_prices)  # sort ascending (np.msort was removed in NumPy 2.0)
median = (sorted_prices[int((size-1)/2)]+
          sorted_prices[int(size /2)]) /2
print(median)
# Median computed by NumPy
median = np.median(closeing_prices)
print(median)
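The index formula in the manual calculation works for both odd and even sample counts: for an odd count both indexes point at the same middle element, and for an even count they point at the two middle elements. A small sketch with made-up samples (manual_median is a helper name introduced here for illustration):

```python
import numpy as np

def manual_median(samples):
    """Median by sorting, using the same index formula as the example above."""
    ordered = np.sort(samples)
    size = ordered.size
    return (ordered[(size - 1) // 2] + ordered[size // 2]) / 2

odd = np.array([7.0, 1.0, 5.0])          # sorted: 1, 5, 7
even = np.array([7.0, 1.0, 5.0, 3.0])    # sorted: 1, 3, 5, 7

print(manual_median(odd), np.median(odd))      # 5.0 5.0
print(manual_median(even), np.median(even))    # 4.0 4.0
```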
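6. Standard deviation and related statistics
Samples: S = [s1, s2, ..., sn]
Mean: m = (s1+s2+...+sn)/n, mean = numpy.mean(S)
Deviations: D = [d1, d2, ..., dn], di = si-m
Squared deviations: Q = [q1, q2, ..., qn], qi = di^2
Population variance: p = (q1+q2+...+qn)/n
Population standard deviation: std = sqrt(p), the root mean square of the deviations
Sample variance: p = (q1+q2+...+qn)/(n-1)
Sample standard deviation: std = sqrt(p), with p taken as the sample variance
numpy.std(S): population standard deviation
numpy.std(S,ddof=1): sample standard deviation
numpy.var(S): population variance
numpy.var(S,ddof=1): sample variance
Example: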
import numpy as np
closeing_prices = np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(6),unpack=True
)
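# Standard deviation, method 1: from the definitions
mean = np.mean(closeing_prices)  # mean
devs = closeing_prices - mean  # deviations
dsqs = devs ** 2  # squared deviations
pvar = np.mean(dsqs)  # population variance
pstd = np.sqrt(pvar)  # population standard deviation
svar = np.sum(dsqs) / (dsqs.size - 1)  # sample variance
sstd = np.sqrt(svar)  # sample standard deviation
print(pstd,sstd)
# Standard deviation, method 2: NumPy functions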
pvar = np.var(closeing_prices)
svar = np.var(closeing_prices,ddof=1)
pstd = np.std(closeing_prices)
sstd = np.std(closeing_prices,ddof=1)
print(pvar,svar)
print(pstd,sstd)
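The effect of ddof is easier to see on a tiny data set that can be checked by hand. The sketch below uses made-up samples whose mean is 5 and whose squared deviations sum to 32, so the population variance is 32/8 and the sample variance is 32/7.

```python
import numpy as np

# Made-up samples: mean is 5, squared deviations sum to 32
samples = np.array([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
n = samples.size

population_var = np.var(samples)          # divides by n
sample_var = np.var(samples, ddof=1)      # divides by n - 1
population_std = np.std(samples)
sample_std = np.std(samples, ddof=1)

print(population_var, population_std)     # 4.0 2.0
print(sample_var, sample_std)
# The two variances differ only by the factor n / (n - 1)
print(np.isclose(sample_var, population_var * n / (n - 1)))   # True
```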
Stock returns:
closing_prices: [c1, c2, c3, ..., cn]
diff:           [d1, d2, ..., dn-1], di = c(i+1) - ci
returns:        [r1, r2, ..., rn-1], ri = di/ci
numpy.std(returns)
Code: returns.py
Log returns:
closing_prices: [c1, c2, c3, ..., cn]
log:            [l1, l2, l3, ..., ln], li = log(ci)
log returns:    [t1, t2, ..., tn-1], ti = l(i+1) - li
log(a) - log(b) = log(a/b), so ti = log(c(i+1)/ci)
Stock volatility:
std(T)/mean(T)/sqrt(1/trading_days)
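The notes refer to returns.py without listing it. The following is a minimal sketch of what such a script might compute, not the original file: the prices are made up, and trading_days = 252 is an assumed value for the number of trading days per year.

```python
import numpy as np

# Hypothetical closing prices, used only for illustration
closing_prices = np.array([100.0, 102.0, 101.0, 105.0, 107.0])

diffs = np.diff(closing_prices)                  # d_i = c_(i+1) - c_i
returns = diffs / closing_prices[:-1]            # r_i = d_i / c_i
log_returns = np.diff(np.log(closing_prices))    # t_i = log(c_(i+1) / c_i)

trading_days = 252   # assumption: trading days per year
volatility = (np.std(log_returns) / np.mean(log_returns)
              / np.sqrt(1 / trading_days))

print(returns)
print(np.std(returns))
print(log_returns)
print(volatility)
```

For small price changes the simple and log returns are close to each other, which is why either is commonly used.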
7. Working with time data
1. Averages by weekday
Example:
import numpy as np
import datetime as dt
def dmy2wday(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()
    wday = date.weekday()  # 0-6 stand for Monday through Sunday
    return wday
wdays,closeing_prices = np.loadtxt(
    './aapl.csv',delimiter=',',
    usecols=(1,6),unpack=True,
    converters={1:dmy2wday}
)
ave_closing_prices = np.zeros(5)
for wday in range(ave_closing_prices.size):
    # Method 1:
    # ave_closing_prices[wday] = \
    #     np.take(closeing_prices,
    #             np.where(wdays == wday)).mean()
    # Method 2:
    # ave_closing_prices[wday] = \
    #     closeing_prices[np.where(wdays==wday)].mean()
    # Method 3: boolean mask
    ave_closing_prices[wday] = \
        closeing_prices[wdays==wday].mean()
for wday,ave_price in zip([
        'MON','TUE','WED','THU','FRI'],
        ave_closing_prices):
    print(wday,np.round(ave_price,2))
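2. Summarizing data by week
Applying a function along an array axis
np.apply_along_axis(function, axis, array)
Calls the function on each 1-D slice of the array along the given axis
and returns the results of the calls as an array
Example 1: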
import numpy as np
import datetime as dt
def foo(arg):
    print('foo: ', arg)
    return arg.sum()
a = np.arange(1,10).reshape(3,3)
print(a)
b = np.apply_along_axis(foo,0,a)  # axis 0: foo receives each column
print(b)
c = np.apply_along_axis(foo,1,a)  # axis 1: foo receives each row
print(c)
Example 2:
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2wday(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()  # parse the string into a date
    wday = date.weekday()
    return wday
wdays,opening_prices,highest_prices,lowest_prices,closeing_prices \
    = np.loadtxt(
        './aapl.csv',
        delimiter=',',
        usecols=(1,3,4,5,6),
        unpack=True,
        converters={1:dmy2wday}
    )
first_monday = np.where(wdays == 0)[0][0]
last_friday = np.where(wdays == 4)[0][-1]
# Note: arange excludes last_friday itself, and np.split requires the
# number of indexes to be divisible by 4
indexes = np.arange(first_monday,last_friday)
indexes = np.split(indexes,4)
def week_summary(indexes):
    opening_price = opening_prices[indexes[0]]
    highest_price = highest_prices[indexes].max()
    lowest_price = lowest_prices[indexes].min()
    closeing_price = closeing_prices[indexes[-1]]
    return opening_price,highest_price,\
        lowest_price,closeing_price
summaries = np.apply_along_axis(week_summary,1,indexes)
print(summaries)
# Export to a file; '%g' prints numbers compactly, whereas '%f' would pad them with trailing zeros
np.savetxt('./sumarry.csv',summaries,delimiter=',',fmt='%g')
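8. Convolution
Convolution integral
Excitation function: g(t)
Response to a unit excitation: f(t)
Response function: (g*f)(t) = ∫g(τ)f(t-τ)dτ
C = numpy.convolve(sequence a, sequence b, mode)
Sequence a: the excitation, i.e. the sequence being convolved
Sequence b: the convolution kernel
Modes (when a is at least as long as b):
valid: only positions where the two sequences overlap completely; output length is len(a)-len(b)+1
same: output length equals len(a)
full: every position with any overlap; output length is len(a)+len(b)-1
Example: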
import numpy as np
a = np.array([1,2,3,4,5])  # sequence being convolved
b = np.array([6,7,8])  # convolution kernel
c = np.convolve(a,b,'full')
print("full:",c)
d = np.convolve(a,b,'same')
print("same:",d)
e = np.convolve(a,b,'valid')
print("valid:",e)
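To see what np.convolve computes, the discrete sum c[n] = sum over k of a[k]*b[n-k] can be written out directly. The sketch below uses a helper named full_convolution (introduced here for illustration, not part of NumPy) and compares it with the three modes on the same input as the example above.

```python
import numpy as np

def full_convolution(a, b):
    """Direct sum c[n] = sum_k a[k] * b[n - k] (illustrative, not optimized)."""
    result = np.zeros(len(a) + len(b) - 1, dtype=np.result_type(a, b))
    for i, a_value in enumerate(a):
        for j, b_value in enumerate(b):
            result[i + j] += a_value * b_value
    return result

a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8])

manual = full_convolution(a, b)
print(manual)                          # [ 6 19 40 61 82 67 40]
print(np.convolve(a, b, 'full'))       # same 7 values: 5 + 3 - 1
print(np.convolve(a, b, 'same'))       # middle 5 values: [19 40 61 82 67]
print(np.convolve(a, b, 'valid'))      # 5 - 3 + 1 = 3 values: [40 61 82]
```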
9. Moving average
Example:
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
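def dmy2ymd(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()  # parse the string into a date
    ymd = date.strftime('%Y-%m-%d')  # format the date as YYYY-MM-DD
    return ymd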
dates,closeing_prices \
= np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(1,6),unpack=True,
dtype='M8[D],f8',converters={1:dmy2ymd}
)
sma51 = np.zeros(closeing_prices.size - 4)  # one value per 5-day window
# 5-day moving average computed with a loop
for i in range(sma51.size):
    sma51[i] = closeing_prices[i:i+5].mean()
# 5-day moving average computed by convolution
sma52 = np.convolve(closeing_prices,
                    np.ones(5)/5,'valid')
# 10-day moving average computed by convolution
sma10 = np.convolve(closeing_prices,
                    np.ones(10)/10,'valid')
mp.figure('sma',facecolor='lightgray')
mp.title('sma',fontsize=20)
mp.xlabel('Date',fontsize=14)
mp.ylabel('Price',fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(
md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%Y-%m-%d'))
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
dates = dates.astype(md.datetime.datetime)  # convert NumPy datetime64 values to Python date objects for matplotlib
mp.plot(dates,closeing_prices,c='lightgray',
        label='closeing_prices')
mp.plot(dates[4:],sma51,c='orangered',
        label='SMA5(1)')  # 5-day moving average (loop)
mp.plot(dates[4:],sma52,c='limegreen',alpha=0.3,
        linewidth=6,label='SMA5(2)')
mp.plot(dates[9:],sma10,c='dodgerblue',alpha=0.3,
        linewidth=2,label='SMA10')
mp.legend()
mp.gcf().autofmt_xdate()  # rotate the date labels on the x-axis automatically
mp.show()
10. Exponential moving average
Example:
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
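def dmy2ymd(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()  # parse the string into a date
    ymd = date.strftime('%Y-%m-%d')  # format the date as YYYY-MM-DD
    return ymd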
dates,closeing_prices \
= np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(1,6),unpack=True,
dtype='M8[D],f8',converters={1:dmy2ymd}
)
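# Build exponentially increasing weights
# 5-day exponential moving average computed by convolution
weights = np.exp(np.linspace(-1,0,5))  # choose the range to suit the data
weights /= weights.sum()
ema05 = np.convolve(closeing_prices,
                    weights[::-1],'valid')
# 10-day exponential moving average computed by convolution
weights = np.exp(np.linspace(-1,0,10))
weights /= weights.sum()
ema10 = np.convolve(closeing_prices,
                    weights[::-1],'valid')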
mp.figure('ema',facecolor='lightgray')
mp.title('ema',fontsize=20)
mp.xlabel('Date',fontsize=14)
mp.ylabel('Price',fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(
md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%Y-%m-%d'))
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
dates = dates.astype(md.datetime.datetime)  # convert NumPy datetime64 values to Python date objects for matplotlib
mp.plot(dates,closeing_prices,c='lightgray',
        label='closeing_prices')
mp.plot(dates[4:],ema05,c='orangered',
        label='EMA5')  # 5-day exponential moving average
mp.plot(dates[9:],ema10,c='limegreen',alpha=0.3,
        linewidth=3,label='EMA10')
mp.legend()
mp.gcf().autofmt_xdate()  # rotate the date labels on the x-axis automatically
mp.show()
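The weights are reversed before the call because np.convolve flips its kernel: with weights[::-1], the largest weight lands on the most recent price of each window. A minimal sketch on made-up prices checks this against an explicit weighted sum per window.

```python
import numpy as np

# Hypothetical, steadily rising prices
prices = np.array([10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0])

weights = np.exp(np.linspace(-1, 0, 5))
weights /= weights.sum()            # increasing weights that sum to 1

# np.convolve flips the kernel, so the reversed weights put the
# largest weight on the newest price in each 5-day window
ema5 = np.convolve(prices, weights[::-1], 'valid')

# The same values as an explicit weighted sum over each window
ema5_loop = np.array([(prices[i:i + 5] * weights).sum()
                      for i in range(prices.size - 4)])

print(weights)
print(ema5)
print(np.allclose(ema5, ema5_loop))   # True
```

Since these prices rise steadily, each weighted average lies above the plain 5-day mean of the same window, which reflects the extra weight on recent values.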
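11. Bollinger Bands
Bollinger Bands consist of an upper, a middle, and a lower band:
Middle band: a moving average (simple or exponential)
Upper band: middle band + 2 x standard deviation
Lower band: middle band - 2 x standard deviation
Code: sbb.py
Example: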
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()
    ymd = date.strftime('%Y-%m-%d')
    return ymd
dates,closeing_prices \
= np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(1,6),unpack=True,
dtype='M8[D],f8',converters={1:dmy2ymd}
)
weights = np.exp(np.linspace(-1,0,5))
weights /= weights.sum()
# Use the 5-day exponential moving average as the middle band
medios5 = np.convolve(closeing_prices,
                      weights[::-1],'valid')
stds = np.zeros(medios5.size)
for i in range(stds.size):
    stds[i] = closeing_prices[i:i+5].std()
stds *= 2
lowers = medios5 - stds  # lower band
uppers = medios5 + stds  # upper band
mp.figure('Mpp',facecolor='lightgray')
mp.title('mpp',fontsize=20)
mp.xlabel('Date',fontsize=14)
mp.ylabel('Price',fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(
md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%Y-%m-%d'))
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
dates = dates.astype(md.datetime.datetime)
mp.plot(dates,closeing_prices,c='lightgray',label='Closeing_prices')
mp.plot(dates[4:],uppers,c='dodgerblue',label='Upper')
mp.plot(dates[4:],medios5,c='orangered',label='Medio')
mp.plot(dates[4:],lowers,c='limegreen',label='Lower')
mp.legend()
mp.gcf().autofmt_xdate()
mp.show()
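The bands can also be computed without the data file or a Python loop. The sketch below is a variant under stated assumptions, not the example above: it uses a made-up random-walk price series, a simple moving average as the middle band, and sliding_window_view (available in NumPy 1.20 and later) to build all 5-day windows at once.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

rng = np.random.default_rng(0)                    # fixed seed so the run is repeatable
prices = 100 + np.cumsum(rng.normal(0, 1, 30))    # hypothetical price series

window = 5
windows = sliding_window_view(prices, window)     # one row per 5-day window
middle = windows.mean(axis=1)                     # simple moving average
stds = windows.std(axis=1)                        # population std of each window
upper = middle + 2 * stds
lower = middle - 2 * stds

print(windows.shape)     # (26, 5)
print(upper[:3])
print(middle[:3])
print(lower[:3])
```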
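12. Linear models
1. Concept
y = f(x)
If Δy/Δx is constant, then y = f(x) is a linear function
2. Linear prediction
Given the constants a, b, c, d, e, f (consecutive values of a series) and the unknowns X, Y, Z, assume the following system of equations:
aX+bY+cZ = d
bX+cY+dZ = e
cX+dY+eZ = f
In matrix form:
|a b c|   |X|   |d|
|b c d| * |Y| = |e|
|c d e|   |Z|   |f|
matrix a  matrix x  matrix b
b is also called a vector
Solve for x with NumPy:
x = np.linalg.lstsq(a,b)[0]
This gives the values of X, Y, and Z, and therefore
dX+eY+fZ = g
g is the linear prediction of the next value
For a stock, for example, the data of the past month, year, or several years can be used to predict the price on the next trading day and to check the prediction.
Example: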
import numpy as np
import datetime as dt
import pandas as pd
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
    # Depending on the NumPy version, converters receive bytes or str
    if isinstance(dmy,bytes):
        dmy = dmy.decode('utf-8')
    date = dt.datetime.strptime(dmy,'%d-%m-%Y').date()
    ymd = date.strftime('%Y-%m-%d')
    return ymd
dates,closeing_prices \
= np.loadtxt(
'./aapl.csv',delimiter=',',
usecols=(1,6),unpack=True,
dtype='M8[D],f8',converters={1:dmy2ymd} )
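N = 5  # use five days of data to predict the sixth
# Each prediction needs 2N days of history
pred_prices = np.zeros(closeing_prices.size - 2*N +1)  # holds the predictions
# Fill the array one prediction at a time
for i in range(pred_prices.size):
    # Build matrix a
    a = np.zeros((N,N))  # N = 5, so 5 rows and 5 columns
    for j in range(N):
        a[j,] = closeing_prices[i + j : i + j + N]
    b = closeing_prices[i + N : i + N * 2]  # vector b
    x = np.linalg.lstsq(a,b,rcond=None)[0]  # least-squares solution of a*x = b
    pred_prices[i] = b.dot(x)  # predicted price for the day after the 2N-day window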
mp.figure('Linear Prediction',facecolor='lightgray')  # figure window for the linear prediction
mp.title('Linear Prediction',fontsize=20)
mp.xlabel('Date',fontsize=14)
mp.ylabel('Price',fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(
md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%Y-%m-%d'))
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
dates = dates.astype(md.datetime.datetime)
mp.plot(dates,closeing_prices,'o-',c='lightgray',label='Closing Price')
last_date = dates[-1] + pd.tseries.offsets.BDay()  # next business day, skipping the weekend
dates = np.append(dates,last_date)  # append the prediction date
mp.plot(dates[2 * N :],pred_prices,'o-',c='orangered',
        linewidth=3,label='Pred_prices')
mp.legend()
mp.gcf().autofmt_xdate()
mp.show()
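The same procedure can be checked on a series whose rule is known. The sketch below is an illustration on assumed data, not stock prices: Fibonacci numbers follow an exact linear rule (each value is the sum of the previous two), so a window of N = 2 reproduces the known values and then predicts the next one.

```python
import numpy as np

# Fibonacci numbers: each value is the sum of the previous two
values = np.array([1.0, 1.0, 2.0, 3.0, 5.0, 8.0])
N = 2

predictions = np.zeros(values.size - 2 * N + 1)
for i in range(predictions.size):
    # Rows of a are N consecutive windows; b holds the value after each window
    a = np.array([values[i + j:i + j + N] for j in range(N)])
    b = values[i + N:i + 2 * N]
    x = np.linalg.lstsq(a, b, rcond=None)[0]
    predictions[i] = b.dot(x)       # value predicted for index i + 2N

print(np.round(predictions, 6))     # [ 5.  8. 13.]
```

The first two predictions match values[4] and values[5]; the last one, 13, is the next Fibonacci number, which is not in the input. Real prices do not follow an exact linear rule, so their predictions are only approximate.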
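3. Linear fitting
1. Concept:
Suppose the equation kX + b = Y holds for n samples, that is:
kX1 + b = Y1
kX2 + b = Y2
...
kXn + b = Yn
In matrix form:
|x1 1|         |y1|
|x2 1|   |k|   |y2|
| ...| * |b| = |..|
|xn 1|         |yn|
matrix a  matrix x  matrix b (the right-hand side, not the intercept b)
Solve for x with NumPy:
x = np.linalg.lstsq(a,b)[0]
x, i.e. the pair (k, b), is the linear fit of the samples in a and b
The samples can come from measurements or from historical data; the solution gives the values of k and b that come closest to satisfying all the equations
These values are the linear fit of the samples
For example, by measuring current and voltage several times, the resistance can be estimated from the formula IR = U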
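A minimal sketch of the resistance example, assuming made-up current and voltage readings (the numbers are not from the source). The slope k of the fitted line U = k*I + b estimates the resistance, and the intercept should come out close to zero.

```python
import numpy as np

# Hypothetical measurements: current in amperes, voltage in volts
current = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
voltage = np.array([1.02, 1.98, 3.05, 3.96, 5.01])

# Each row of a is [x_i, 1], matching k * x_i + b = y_i
a = np.column_stack((current, np.ones_like(current)))
k, b = np.linalg.lstsq(a, voltage, rcond=None)[0]

print(k)    # slope: estimated resistance in ohms, about 9.96
print(b)    # intercept: about 0.016, close to 0 as expected

# np.polyfit with degree 1 solves the same least-squares problem
print(np.allclose(np.polyfit(current, voltage, 1), [k, b]))   # True
```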