Overview
The script below pulls daily bars for a single stock from tushare, builds eleven features from the previous three days' price and volume behaviour (plus the 5- and 20-day moving averages), and trains a small hand-written back-propagation (BP) neural network whose target is each day's close-to-close return squashed into (0, 1).
import math
import random
import tushare as ts
import pandas as pd

random.seed(0)


def getData(id, start, end):
    # Fetch daily bars and build the feature matrix: three days of open-to-close
    # returns (rate1-3), the close's position within each day's high-low range
    # (pos1-3), each day's volume relative to the trailing 3-day average (amt1-3),
    # plus the 20- and 5-day moving averages.
    df = ts.get_hist_data(id, start, end)
    DATA = pd.DataFrame(columns=['rate1', 'rate2', 'rate3', 'pos1', 'pos2', 'pos3',
                                 'amt1', 'amt2', 'amt3', 'MA20', 'MA5', 'r'])
    P1 = pd.DataFrame(columns=['high', 'low', 'close', 'open', 'volume'])
    DATA2 = pd.DataFrame(columns=['R'])
    DATA['MA20'] = df['ma20']
    DATA['MA5'] = df['ma5']
    P = df['close']
    P1['high'] = df['high']
    P1['low'] = df['low']
    P1['close'] = df['close']
    P1['open'] = df['open']
    P1['volume'] = df['volume']
    DATA['rate1'] = (P1['close'].shift(1) - P1['open'].shift(1)) / P1['open'].shift(1)
    DATA['rate2'] = (P1['close'].shift(2) - P1['open'].shift(2)) / P1['open'].shift(2)
    DATA['rate3'] = (P1['close'].shift(3) - P1['open'].shift(3)) / P1['open'].shift(3)
    DATA['pos1'] = (P1['close'].shift(1) - P1['low'].shift(1)) / (P1['high'].shift(1) - P1['low'].shift(1))
    DATA['pos2'] = (P1['close'].shift(2) - P1['low'].shift(2)) / (P1['high'].shift(2) - P1['low'].shift(2))
    DATA['pos3'] = (P1['close'].shift(3) - P1['low'].shift(3)) / (P1['high'].shift(3) - P1['low'].shift(3))
    DATA['amt1'] = P1['volume'].shift(1) / ((P1['volume'].shift(1) + P1['volume'].shift(2) + P1['volume'].shift(3)) / 3)
    DATA['amt2'] = P1['volume'].shift(2) / ((P1['volume'].shift(2) + P1['volume'].shift(3) + P1['volume'].shift(4)) / 3)
    DATA['amt3'] = P1['volume'].shift(3) / ((P1['volume'].shift(3) + P1['volume'].shift(4) + P1['volume'].shift(5)) / 3)
    # Label: the day's close-to-close return squashed into (0, 1) by a logistic.
    templist = (P - P.shift(1)) / P.shift(1)
    tempDATA = []
    for indextemp in templist:
        tempDATA.append(1 / (1 + math.exp(-indextemp * 100)))
    DATA['r'] = tempDATA
    DATA = DATA.dropna(axis=0)
    DATA2['R'] = DATA['r']
    del DATA['r']
    DATA = DATA.T
    DATA2 = DATA2.T
    DATAlist = DATA.to_dict("list")
    result = []
    for key in DATAlist:
        result.append(DATAlist[key])
    DATAlist2 = DATA2.to_dict("list")
    result2 = []
    for key in DATAlist2:
        result2.append(DATAlist2[key])
    # Only the feature vectors are returned here; the labels come from getDataR.
    return result
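For intuition: the label r built near the end of getData is the day's close-to-close return passed through the logistic 1 / (1 + math.exp(-100 * r)), so a +1% day maps to roughly 0.73, a flat day to exactly 0.5, and a -1% day to roughly 0.27. That keeps every target inside (0, 1), the range of the network's sigmoid output; getDataR below returns exactly these squashed values as the training labels.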

def getDataR(id, start, end):
    # Same pipeline as getData, except that MA20/MA5 are shifted by one day and
    # the function returns the squashed-return labels (result2) instead of the
    # feature vectors.
    df = ts.get_hist_data(id, start, end)
    DATA = pd.DataFrame(columns=['rate1', 'rate2', 'rate3', 'pos1', 'pos2', 'pos3',
                                 'amt1', 'amt2', 'amt3', 'MA20', 'MA5', 'r'])
    P1 = pd.DataFrame(columns=['high', 'low', 'close', 'open', 'volume'])
    DATA2 = pd.DataFrame(columns=['R'])
    DATA['MA20'] = df['ma20'].shift(1)
    DATA['MA5'] = df['ma5'].shift(1)
    P = df['close']
    P1['high'] = df['high']
    P1['low'] = df['low']
    P1['close'] = df['close']
    P1['open'] = df['open']
    P1['volume'] = df['volume']
    DATA['rate1'] = (P1['close'].shift(1) - P1['open'].shift(1)) / P1['open'].shift(1)
    DATA['rate2'] = (P1['close'].shift(2) - P1['open'].shift(2)) / P1['open'].shift(2)
    DATA['rate3'] = (P1['close'].shift(3) - P1['open'].shift(3)) / P1['open'].shift(3)
    DATA['pos1'] = (P1['close'].shift(1) - P1['low'].shift(1)) / (P1['high'].shift(1) - P1['low'].shift(1))
    DATA['pos2'] = (P1['close'].shift(2) - P1['low'].shift(2)) / (P1['high'].shift(2) - P1['low'].shift(2))
    DATA['pos3'] = (P1['close'].shift(3) - P1['low'].shift(3)) / (P1['high'].shift(3) - P1['low'].shift(3))
    DATA['amt1'] = P1['volume'].shift(1) / ((P1['volume'].shift(1) + P1['volume'].shift(2) + P1['volume'].shift(3)) / 3)
    DATA['amt2'] = P1['volume'].shift(2) / ((P1['volume'].shift(2) + P1['volume'].shift(3) + P1['volume'].shift(4)) / 3)
    DATA['amt3'] = P1['volume'].shift(3) / ((P1['volume'].shift(3) + P1['volume'].shift(4) + P1['volume'].shift(5)) / 3)
    templist = (P - P.shift(1)) / P.shift(1)
    tempDATA = []
    for indextemp in templist:
        tempDATA.append(1 / (1 + math.exp(-indextemp * 100)))
    DATA['r'] = tempDATA
    DATA = DATA.dropna(axis=0)
    DATA2['R'] = DATA['r']
    del DATA['r']
    DATA = DATA.T
    DATA2 = DATA2.T
    DATAlist = DATA.to_dict("list")
    result = []
    for key in DATAlist:
        result.append(DATAlist[key])
    DATAlist2 = DATA2.to_dict("list")
    result2 = []
    for key in DATAlist2:
        result2.append(DATAlist2[key])
    return result2

def rand(a, b):
    # Uniform random number in [a, b), used for weight initialisation.
    return (b - a) * random.random() + a


def make_matrix(m, n, fill=0.0):
    mat = []
    for i in range(m):
        mat.append([fill] * n)
    return mat


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def sigmod_derivate(x):
    # Derivative of the sigmoid, expressed in terms of the already-activated value x.
    return x * (1 - x)
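Because sigmod_derivate takes the already-activated value, the class below only ever calls it on cells that have been through sigmoid, using the identity s'(x) = s(x) * (1 - s(x)). A quick standalone check of that identity (run it separately; x here is just an arbitrary test point, not something from the original script):

x = 0.3
s = sigmoid(x)
print(sigmod_derivate(s))                               # about 0.2445
print((sigmoid(x + 1e-6) - sigmoid(x - 1e-6)) / 2e-6)   # finite-difference estimate, also about 0.2445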

class BPNeuralNetwork:
    def __init__(self):
        self.input_n = 0
        self.hidden_n = 0
        self.output_n = 0
        self.input_cells = []
        self.hidden_cells = []
        self.output_cells = []
        self.input_weights = []
        self.output_weights = []
        self.input_correction = []
        self.output_correction = []

    def setup(self, ni, nh, no):
        self.input_n = ni + 1
        self.hidden_n = nh
        self.output_n = no
        # init cells
        self.input_cells = [1.0] * self.input_n
        self.hidden_cells = [1.0] * self.hidden_n
        self.output_cells = [1.0] * self.output_n
        # init weights
        self.input_weights = make_matrix(self.input_n, self.hidden_n)
        self.output_weights = make_matrix(self.hidden_n, self.output_n)
        # random activate
        for i in range(self.input_n):
            for h in range(self.hidden_n):
                self.input_weights[i][h] = rand(-0.2, 0.2)
        for h in range(self.hidden_n):
            for o in range(self.output_n):
                self.output_weights[h][o] = rand(-2.0, 2.0)
        # init correction matrix
        self.input_correction = make_matrix(self.input_n, self.hidden_n)
        self.output_correction = make_matrix(self.hidden_n, self.output_n)

    def predict(self, inputs):
        # activate input layer
        for i in range(self.input_n - 1):
            self.input_cells[i] = inputs[i]
        # activate hidden layer
        for j in range(self.hidden_n):
            total = 0.0
            for i in range(self.input_n):
                total += self.input_cells[i] * self.input_weights[i][j]
            self.hidden_cells[j] = sigmoid(total)
        # activate output layer
        for k in range(self.output_n):
            total = 0.0
            for j in range(self.hidden_n):
                total += self.hidden_cells[j] * self.output_weights[j][k]
            self.output_cells[k] = sigmoid(total)
        return self.output_cells[:]
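
    # Note: setup() allocates ni + 1 input cells but predict() only fills the
    # first ni of them, so the last input cell stays at 1.0 and acts as a bias
    # input feeding every hidden cell.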

    def back_propagate(self, case, label, learn, correct):
        # feed forward
        self.predict(case)
        # get output layer error
        output_deltas = [0.0] * self.output_n
        for o in range(self.output_n):
            error = label[o] - self.output_cells[o]
            output_deltas[o] = sigmod_derivate(self.output_cells[o]) * error
        # get hidden layer error
        hidden_deltas = [0.0] * self.hidden_n
        for h in range(self.hidden_n):
            error = 0.0
            for o in range(self.output_n):
                error += output_deltas[o] * self.output_weights[h][o]
            hidden_deltas[h] = sigmod_derivate(self.hidden_cells[h]) * error
        # update output weights
        for h in range(self.hidden_n):
            for o in range(self.output_n):
                change = output_deltas[o] * self.hidden_cells[h]
                self.output_weights[h][o] += learn * change + correct * self.output_correction[h][o]
                self.output_correction[h][o] = change
        # update input weights
        for i in range(self.input_n):
            for h in range(self.hidden_n):
                change = hidden_deltas[h] * self.input_cells[i]
                self.input_weights[i][h] += learn * change + correct * self.input_correction[i][h]
                self.input_correction[i][h] = change
        # get global error
        error = 0.0
        for o in range(len(label)):
            error += 0.5 * (label[o] - self.output_cells[o]) ** 2
        return error
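
    # The weight updates above implement gradient descent on the squared error
    # with a momentum term: each weight moves by
    #     learn * delta * cell_value + correct * previous_change
    # so "learn" is the learning rate and "correct" plays the role of the
    # momentum coefficient, with the previous change held in the correction matrices.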

    def train(self, cases, labels, limit=10000, learn=0.05, correct=0.1):
        for i in range(limit):
            error = 0.0
            for j in range(len(cases)):
                label = labels[j]
                case = cases[j]
                error += self.back_propagate(case, label, learn, correct)

    def test(self, id):
        # The id argument is unused; the ticker and date range are hard-coded.
        result = getData("000001", "2015-01-05", "2015-01-09")
        result2 = getDataR("000001", "2015-01-05", "2015-01-09")
        self.setup(11, 5, 1)
        self.train(result, result2, 10000, 0.05, 0.1)
        for t in result:
            print(self.predict(t))
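
To close the loop, here is a minimal sketch of how the pieces are meant to be driven. The ticker and dates are the ones hard-coded in test() above; whether ts.get_hist_data still serves them depends on your tushare version, and a range of only a few trading days is almost certainly too short once the shifted features, the 20-day moving average, and dropna() remove incomplete rows, so treat this as an illustration rather than a guaranteed run:

if __name__ == '__main__':
    # Same ticker/date range as in test(); a longer window is likely needed in practice.
    cases = getData("000001", "2015-01-05", "2015-01-09")     # 11 feature values per trading day
    labels = getDataR("000001", "2015-01-05", "2015-01-09")   # one squashed close-to-close return per day
    nn = BPNeuralNetwork()
    nn.setup(11, 5, 1)                                        # 11 inputs, 5 hidden cells, 1 output
    nn.train(cases, labels, 10000, 0.05, 0.1)
    for case in cases:
        print(nn.predict(case))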
Finally
That is the complete content of "python必备源代码-神经网络(python源代码)" as collected and organized by 靓丽小土豆; hopefully it helps you solve the development problem that brought you here.