概述
在聚宽网实现回测:https://www.joinquant.com/algorithm/index/edit?algorithmId=a5d6decffd9805f8f7896e431ca619c4
除聚宽外,也可以使用米筐(RiceQuant)、优矿(Uqer)等平台进行回测。
一、双均线策略
import jqdata

# Look-back windows (in bars) of the short and long moving averages.
p1 = 5
p2 = 60


def initialize(context):
    """Set the benchmark, pricing mode, trading costs and the stock universe."""
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    g.security = ['601318.XSHG']


def handle_data(context, data):
    """Dual moving-average crossover.

    For every stock in ``g.security``: buy with all available cash when the
    short average (last ``p1`` closes) is above the long average (last ``p2``
    closes) and the stock is not held; sell the whole position when the short
    average is below the long one and the stock is held.
    """
    for stock in g.security:
        closes = attribute_history(stock, count=p2)['close']
        # The short window is driven by p1 instead of a duplicated literal 5.
        ma5 = closes[-p1:].mean()
        ma60 = closes.mean()
        held = stock in context.portfolio.positions

        if ma5 > ma60 and not held:
            order = order_value(stock, context.portfolio.available_cash)
            if order:
                print('Buying: %d at Price: %.2f' % (order.amount, order.price))
        elif ma5 < ma60 and held:
            order = order_target_value(stock, 0)
            if order:
                print('Selling: %d at Price: %.2f' % (order.amount, order.price))

        # Recorded inside the loop so an empty universe cannot hit an
        # undefined ma5/ma60.
        record(ma5=ma5, ma60=ma60)
二、因子选股
# Import the platform library
import jqdata


def _sort_by_column(df, column, ascending=True):
    """Return ``df`` sorted by ``column`` on both old and new pandas.

    ``DataFrame.sort`` was removed from pandas; ``sort_values`` is used when
    available and the legacy call is kept as a fallback for old runtimes.
    """
    if hasattr(df, 'sort_values'):
        return df.sort_values(by=column, ascending=ascending)
    return df.sort(columns=column, ascending=ascending)


# Initialisation: benchmark, costs, universe and the monthly schedule
def initialize(context):
    """Prepare the fundamentals query over the CSI 300 and schedule ``handle``."""
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    g.security = get_index_stocks('000300.XSHG')
    g.q = query(valuation).filter(valuation.code.in_(g.security))
    g.N = 10
    run_monthly(handle, 1)


def handle(context):
    """Hold the ``g.N`` stocks with the smallest ``market_cap``.

    Positions that dropped out of the target list are closed, then the
    available cash is split equally between the newly selected stocks.
    """
    df = get_fundamentals(g.q)
    df = _sort_by_column(df, 'market_cap')[:g.N]
    tohold = df['code'].values

    # Sell: iterate over a snapshot in case closing a position mutates the
    # positions mapping while it is being walked.
    for stock in list(context.portfolio.positions):
        if stock not in tohold:
            order_target(stock, 0)

    tobuy = [stock for stock in tohold
             if stock not in context.portfolio.positions]
    if not tobuy:
        return

    # Buy: equal cash per new stock, truncated to an integer as before.
    cash = context.portfolio.available_cash
    per_stock = int(cash / len(tobuy))
    for stock in tobuy:
        order_value(stock, per_stock)
三、均值回归
import jqdata
import math
import numpy as np
import pandas as pd


def initialize(context):
    """Set costs and benchmark, then schedule the monthly rebalance."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    g.benchmark = '000300.XSHG'
    set_benchmark(g.benchmark)
    g.ma_days = 30      # look-back of the moving average
    g.stock_num = 10    # number of stocks to hold
    run_monthly(handle, 1)


def handle(context):
    """Rebalance into the list returned by ``get_hold_list``.

    Positions not in the list are closed; the available cash is then split
    equally between the stocks that are in the list but not yet held.
    """
    tohold = get_hold_list(context)
    for stock in context.portfolio.positions:
        if stock not in tohold:
            order_target_value(stock, 0)

    tobuy = [stock for stock in tohold
             if stock not in context.portfolio.positions]
    if tobuy:
        cash_every_stock = context.portfolio.available_cash / len(tobuy)
        for stock in tobuy:
            order_value(stock, cash_every_stock)


def get_hold_list(context):
    """Return the ``g.stock_num`` index members trading furthest below their mean.

    The score of a stock is ``(ma - open) / ma`` where ``ma`` is the mean of
    its last ``g.ma_days`` daily closes and ``open`` is today's ``day_open``.
    """
    stock_pool = get_index_stocks(g.benchmark)
    # Explicit float dtype: without it newer pandas may build an object
    # Series, which nlargest() does not accept.
    stock_score = pd.Series(index=stock_pool, dtype=float)
    # Loop-invariant, so fetched once instead of once per stock.
    current_data = get_current_data()
    for stock in stock_pool:
        df = attribute_history(stock, g.ma_days, '1d', ['close'])
        # Label-based access replaces the positional df.mean()[0].
        ma = df['close'].mean()
        current_price = current_data[stock].day_open
        stock_score[stock] = (ma - current_price) / ma
    return stock_score.nlargest(g.stock_num).index.values
四、布林带策略
# Import the platform library
import jqdata


def initialize(context):
    """Set costs, benchmark, the traded stock and the band parameters."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    set_benchmark('000300.XSHG')
    g.security = ['600036.XSHG']
    g.N = 2            # band width in standard deviations
    g.ma_days = 20     # look-back of the middle band


def _bollinger_bands(stock):
    """Return ``(upper, lower)`` bands from the last ``g.ma_days`` closes."""
    closes = attribute_history(stock, g.ma_days)['close']
    middle = closes.mean()
    width = g.N * closes.std()
    return middle + width, middle - width


def handle_data(context, data):
    """Bollinger-band reversal: sell at the upper band, buy at the lower band.

    All sells are issued first so that the cash they release is included in
    the per-stock budget used for the buys.
    """
    current_data = get_current_data()

    # Price broke through the resistance line: close the position.
    for stock in g.security:
        upper, _ = _bollinger_bands(stock)
        p = current_data[stock].day_open
        if p >= upper and stock in context.portfolio.positions:
            order_target(stock, 0)

    cash = context.portfolio.available_cash / len(g.security)

    # Price fell through the support line: open a position.
    for stock in g.security:
        _, lower = _bollinger_bands(stock)
        p = current_data[stock].day_open
        if p <= lower and stock not in context.portfolio.positions:
            # `cash` is a money amount, so order by value; the previous
            # order_target(stock, cash) used it as a share count.
            order_value(stock, cash)
五、PEG策略
def _sort_by_column(df, column, ascending=True):
    """Return ``df`` sorted by ``column`` on both old and new pandas.

    ``DataFrame.sort`` was removed from pandas; ``sort_values`` is used when
    available and the legacy call is kept as a fallback for old runtimes.
    """
    if hasattr(df, 'sort_values'):
        return df.sort_values(by=column, ascending=ascending)
    return df.sort(columns=column, ascending=ascending)


def initialize(context):
    """Set costs, benchmark and universe, then schedule the monthly rebalance."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    set_benchmark('000300.XSHG')
    g.security = get_index_stocks('000300.XSHG')
    g.N = 20
    run_monthly(handle, 1)


def handle(context):
    """Hold the ``g.N`` stocks with the lowest PEG.

    Only stocks with a positive ``pe_ratio`` and a positive
    ``inc_net_profit_year_on_year`` are ranked. Positions outside the target
    list are closed and the available cash is split equally between the
    newly selected stocks.
    """
    df = get_fundamentals(
        query(valuation.code, valuation.pe_ratio,
              indicator.inc_net_profit_year_on_year)
        .filter(valuation.code.in_(g.security)))
    positive = (df['pe_ratio'] > 0) & (df['inc_net_profit_year_on_year'] > 0)
    # Copy so that adding the PEG column does not write into a slice of the
    # query result.
    df = df[positive].copy()
    # The constant /100 rescales every value equally, so it does not affect
    # the ranking below.
    df['PEG'] = df['pe_ratio'] / df['inc_net_profit_year_on_year'] / 100
    df = _sort_by_column(df, 'PEG')[:g.N]
    tohold = df['code'].values

    for stock in context.portfolio.positions:
        if stock not in tohold:
            order_target_value(stock, 0)

    tobuy = [stock for stock in tohold
             if stock not in context.portfolio.positions]
    if tobuy:
        print('Buying')
        cash_every_stock = context.portfolio.available_cash / len(tobuy)
        for stock in tobuy:
            order_value(stock, cash_every_stock)
六、羊驼交易法则-表现最优入池
def _sort_by_column(df, column, ascending=True):
    """Return ``df`` sorted by ``column`` on both old and new pandas.

    ``DataFrame.sort`` was removed from pandas; ``sort_values`` is used when
    available and the legacy call is kept as a fallback for old runtimes.
    """
    if hasattr(df, 'sort_values'):
        return df.sort_values(by=column, ascending=ascending)
    return df.sort(columns=column, ascending=ascending)


def initialize(context):
    """Configure the strategy and buy the initial ``g.N`` best performers."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    set_benchmark('000300.XSHG')
    g.security = get_index_stocks('000300.XSHG')
    g.period = 3    # number of bars used to measure the return
    g.N = 10        # size of the pool
    g.M = 2         # number of stocks replaced on every rebalance
    print("init")
    run_monthly(handle, 10)

    stocks = get_sorted_stocks(context, g.security)[:g.N]
    # Guard against an empty selection to avoid dividing by zero.
    if len(stocks) == 0:
        return
    cash = context.portfolio.available_cash / len(stocks)
    for stock in stocks:
        order_value(stock, cash)


def get_sorted_stocks(context, stocks):
    """Return ``stocks`` ordered by their ``g.period``-bar return, best first."""
    df = history(g.period, field='close', security_list=stocks).T
    first, last = df.iloc[:, 0], df.iloc[:, -1]
    df['ret'] = (last - first) / first
    df = _sort_by_column(df, 'ret', ascending=False)
    return df.index.values


def handle(context):
    """Drop the ``g.M`` worst holdings and refill the pool with top performers."""
    # Materialise the keys: a dict view is not a list on Python 3.
    held = list(context.portfolio.positions.keys())
    if held:
        for stock in get_sorted_stocks(context, held)[-g.M:]:
            order_target(stock, 0)

    stocks = get_sorted_stocks(context, g.security)
    cash = context.portfolio.available_cash / g.M
    for stock in stocks:
        if len(context.portfolio.positions) >= g.N:
            break
        if stock not in context.portfolio.positions:
            # `cash` is a money amount, so order by value; the previous
            # order_target(stock, cash) used it as a share count.
            order_value(stock, cash)
七、海龟交易法则
可参考:https://www.joinquant.com/post/1401?f=study&m=algorithm
import jqdata
import math
import numpy as np
import pandas as pd
from collections import deque


def _true_range(high, low, prev_close):
    """Return the true range of one bar given the previous bar's close."""
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def initialize(context):
    """Configure the strategy and seed ``g.N`` with historical volatility values."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    g.security = '000060.XSHE'
    set_benchmark(g.security)
    g.in_day = 20           # breakout look-back for entries
    g.out_day = 10          # breakout look-back for exits
    g.today_units = 0       # units bought today
    g.current_units = 0     # units currently counted as held
    g.N = deque(maxlen=19)  # rolling history of N values
    g.current_N = 0
    g.last_buy_price = 0

    span = g.N.maxlen
    price = attribute_history(g.security, span * 2 + 1, '1d',
                              ('high', 'low', 'close'))
    # Plain arrays so that integer indices are unambiguously positional.
    highs = price['high'].values
    lows = price['low'].values
    closes = price['close'].values
    for i in range(span + 1, span * 2 + 1):
        # Mean true range over the span + 1 bars ending at bar i.
        ranges = [_true_range(highs[j], lows[j], closes[j - 1])
                  for j in range(i - span, i + 1)]
        g.N.append(np.array(ranges).mean())


def before_trading_start(context):
    """Refresh N for the new day and reset the daily unit counter."""
    g.current_N = cal_N()
    g.today_units = 0


def handle_data(context, data):
    """Run the turtle rules: enter on breakout, otherwise stop, exit or add."""
    current_price = data[g.security].price  # price of the previous bar
    value = context.portfolio.total_value
    cash = context.portfolio.available_cash
    # Without a positive N the unit size is undefined (division by zero).
    if not g.current_N > 0:
        return
    unit = math.floor(value * 0.01 / g.current_N)

    if g.current_units == 0:
        buy(current_price, cash, unit)
    else:
        if stop_loss(current_price):
            return
        if sell(current_price):
            return
        addin(current_price, cash, unit)


def cal_N():
    """Append and return today's N.

    N is the latest true range averaged together with the values stored in
    ``g.N``, dividing by ``g.N.maxlen + 1``.
    """
    price = attribute_history(g.security, 2, '1d', ('high', 'low', 'close'))
    latest_range = _true_range(price['high'].values[1],
                               price['low'].values[1],
                               price['close'].values[0])
    current_N = (latest_range + np.array(g.N).sum()) / (g.N.maxlen + 1)
    g.N.append(current_N)
    return current_N


def _buy_one_unit(unit, message):
    """Order ``unit`` shares and update the unit counters if an order is created.

    Returns True when an order object came back, False otherwise.
    """
    print(message % unit)
    o = order(g.security, unit)
    # NOTE(review): assumes order() returns None when no order is created;
    # confirm against the platform docs. Either way o.price must not be read
    # from a missing order.
    if o is None:
        return False
    g.last_buy_price = o.price
    g.current_units += 1
    g.today_units += 1
    return True


def buy(current_price, cash, unit):
    """Open the first unit when price exceeds the max close of ``g.in_day`` bars."""
    price = attribute_history(g.security, g.in_day, '1d', ('close',))
    if current_price > max(price['close']):
        shares = cash / current_price
        if shares >= unit:
            return _buy_one_unit(unit, "buying %d")
    return False


def addin(current_price, cash, unit):
    """Add one unit once price is 0.5 N above the last buy price."""
    if current_price >= g.last_buy_price + 0.5 * g.current_N:
        shares = cash / current_price
        if shares >= unit:
            return _buy_one_unit(unit, "adding %d")
    return False


def sell(current_price):
    """Exit when price falls below the min close of ``g.out_day`` bars."""
    price = attribute_history(g.security, g.out_day, '1d', ('close',))
    if current_price < min(price['close']):
        print("selling")
        order_target(g.security, 0)
        # Units bought today stay counted as held.
        # NOTE(review): presumably because of T+1 settlement; confirm.
        g.current_units = g.today_units
        return True
    return False


def stop_loss(current_price):
    """Exit when price is more than 2 N below the last buy price."""
    if current_price < g.last_buy_price - 2 * g.current_N:
        print("stop loss")
        order_target(g.security, 0)
        g.current_units = g.today_units
        return True
    return False
八、鳄鱼交易法则
# Import the platform library
import jqdata
import numpy as np


# Initialisation: costs, benchmark, per-stock state and the monthly schedule
def initialize(context):
    """Configure the strategy and schedule the monthly universe selection."""
    set_option('use_real_price', True)
    set_order_cost(
        OrderCost(close_tax=0.001, open_commission=0.0003,
                  close_commission=0.0003, min_commission=5),
        type='stock')
    set_benchmark('000300.XSHG')
    # Same empty state as after every re-selection.
    reset_global()
    g.stock = get_index_stocks('000300.XSHG')
    g.month = context.current_dt.month
    run_monthly(select_universe, 1, 'open')


# Reset the global per-stock state
def reset_global():
    """Clear every per-stock dictionary and the list of selected stocks."""
    g.up_price = {}             # high of the latest up fractal
    g.low_price = {}            # low of the latest down fractal
    g.up_fractal_exists = {}    # whether a valid up fractal exists
    g.down_fractal_exists = {}  # whether a valid down fractal exists
    g.AO_index = {}             # consecutive AO values
    g.cal_AC_index = {}         # intermediate values of the AC computation
    g.AC_index = {}             # consecutive AC values
    g.amount = {}               # full-position size
    g.buy_stock = []


def initial_stock_global(stock):
    """Create the initial per-stock state for a newly selected stock."""
    g.up_price[stock] = 0
    g.low_price[stock] = 0
    g.up_fractal_exists[stock] = False
    g.down_fractal_exists[stock] = False
    g.AO_index[stock] = [0]
    g.cal_AC_index[stock] = [0]
    g.AC_index[stock] = [0]
    g.amount[stock] = 0


# Close every position before the universe is re-selected
def reset_position(context):
    """Sell every stock currently listed in ``g.buy_stock``."""
    for stock in g.buy_stock:
        order_target(stock, 0)
        log.info("sell %s for reset position" % stock)


# Stock selection
def select_universe(context):
    """Re-select the universe: keep index members whose alligator is asleep."""
    # Only acts in months congruent (mod 6) to the start month, i.e. every
    # six months.
    month = context.current_dt.month
    if month % 6 != g.month % 6:
        return
    # Drop old positions and state before selecting again.
    reset_position(context)
    reset_global()
    hist = history(30, '1d', 'close', g.stock, df=False)
    for stock in g.stock:
        if is_sleeping_alligator(stock, hist, 20):
            g.buy_stock.append(stock)
            # Initialise the per-stock state of the selected stock.
            initial_stock_global(stock)
    # Function-call form works on both Python 2 and Python 3.
    print(g.buy_stock)
    return None


# Sleeping alligator
def is_sleeping_alligator(stock, hist, nday):
    """Return True when the three lines were entangled on each of ``nday`` offsets."""
    for i in range(nday):
        if not is_struggle(stock, hist, i):
            return False
    return True


# Entangled averages: the blue, red and green lines are very close
def is_struggle(stock, hist, delta):
    """Return True when blue/red and red/green differ by less than 2 percent."""
    blue_line = hist[stock][-21 - delta:-8 - delta].mean()
    red_line = hist[stock][-13 - delta:-5 - delta].mean()
    green_line = hist[stock][-8 - delta:-3 - delta].mean()
    return (abs(blue_line / red_line - 1) < 0.02
            and abs(red_line / green_line - 1) < 0.02)


# Detect an up or a down fractal
def is_fractal(stock, direction):
    """Detect a five-bar fractal on the ``direction`` field ('high' or 'low').

    When the middle bar is the extreme of the five, its value is stored in
    ``g.up_price`` / ``g.low_price`` and True is returned.
    """
    hist = attribute_history(stock, 5, fields=[direction])
    if direction == 'high':
        if np.all(hist.iloc[:2] < hist.iloc[2]) and np.all(hist.iloc[3:] < hist.iloc[2]):
            g.up_price[stock] = hist.iloc[2].values
            return True
    elif direction == 'low':
        if np.all(hist.iloc[:2] > hist.iloc[2]) and np.all(hist.iloc[3:] > hist.iloc[2]):
            g.low_price[stock] = hist.iloc[2].values
            return True
    return False


# Validate a fractal by comparing the close with the red line
def is_effective_fractal(stock, direction):
    """Update the fractal-exists flag of ``stock`` when a fractal is found."""
    if is_fractal(stock, direction):
        hist = attribute_history(stock, 11)
        red_line = hist['close'][:-3].mean()
        close_price = hist['close'][-1]
        if direction == 'high':
            g.up_fractal_exists[stock] = bool(close_price > red_line)
        elif direction == 'low':
            g.down_fractal_exists[stock] = bool(close_price < red_line)


# N-day line of the recent highest prices
def nday_high_point(stock, n):
    """Mean over ``n`` offsets of the max high in a sliding window."""
    hist = history(2 * n, '1d', 'high', [stock], df=False)[stock]
    # NOTE(review): the window [-5-i:-1-i] is four bars wide and does not
    # depend on n; kept as is.
    high_point = [max(hist[-5 - i:-1 - i]) for i in range(n)]
    return np.array(high_point).mean()


# N-day line of the recent lowest prices
def nday_low_point(stock, n):
    """Mean over ``n`` offsets of the min low in a sliding window."""
    hist = history(2 * n, '1d', 'low', [stock], df=False)[stock]
    # min, not max: this line tracks the lowest prices; the previous max()
    # looks copied from nday_high_point.
    low_point = [min(hist[-5 - i:-1 - i]) for i in range(n)]
    return np.array(low_point).mean()


# AO = 5-day average of (high+low)/2 minus 34-day average of (high+low)/2
def AO_index(stock):
    """Append the current AO value of ``stock`` to ``g.AO_index``."""
    fast = nday_high_point(stock, 5) / 2 + nday_low_point(stock, 5) / 2
    slow = nday_high_point(stock, 34) / 2 + nday_low_point(stock, 34) / 2
    g.AO_index[stock].append(fast - slow)
    return None


# AC = 5-value average of (AO minus its own 5-value average)
def AC_index(stock):
    """Update AO and, once enough values exist, the AC series of ``stock``."""
    AO_index(stock)
    ao = g.AO_index[stock]
    if len(ao) >= 5:
        g.cal_AC_index[stock].append(ao[-1] - np.array(ao[-5:]).mean())
        if len(g.cal_AC_index[stock]) >= 5:
            g.AC_index[stock].append(np.array(g.cal_AC_index[stock][-5:]).mean())


# Whether the last n values are strictly rising
def is_up_going(alist, n):
    """Return True when the last ``n`` items of ``alist`` strictly increase."""
    if len(alist) < n:
        return False
    for i in range(n - 1):
        if alist[-(1 + i)] <= alist[-(2 + i)]:
            return False
    return True


# Whether the last n values are strictly falling
def is_down_going(alist, n):
    """Return True when the last ``n`` items of ``alist`` strictly decrease."""
    if len(alist) < n:
        return False
    for i in range(n - 1):
        if alist[-(1 + i)] >= alist[-(2 + i)]:
            return False
    return True


# Fractal broken through
def active_fractal(stock, direction):
    """Return True when the last close broke the stored fractal price."""
    close_price = history(1, '1d', 'close', [stock], df=False)[stock][0]
    if direction == 'up' and close_price > g.up_price[stock]:
        return True
    elif direction == 'down' and close_price < g.low_price[stock]:
        return True
    return False


# Entry: initial position
def set_initial_position(stock, context):
    """Open the initial position and clear the down-fractal flag."""
    close_price = history(1, '1d', 'close', [stock], df=False)[stock][0]
    g.amount[stock] = context.portfolio.cash / close_price / len(g.buy_stock) * 3
    order(stock, g.amount[stock])
    log.info("buying %s 股数为 %s" % (stock, g.amount[stock]))
    g.down_fractal_exists[stock] = False


# Exit
def sell_all_stock(stock, context):
    """Close the position and clear the up-fractal flag."""
    order_target(stock, 0)
    log.info("selling %s" % stock)
    g.up_fractal_exists[stock] = False


# Add to a position
def adjust_position(stock, context, position):
    """Buy ``position`` times the stored full-position size."""
    order(stock, g.amount[stock] * position)
    log.info("adjust position buying %s 股数为 %s" % (stock, g.amount[stock] * position))


# Return of a security over the previous n days
def security_return(days, security_code):
    """Return the close-to-close return over the last ``days`` bars."""
    hist1 = attribute_history(security_code, days + 1, '1d', 'close', df=False)
    security_returns = (hist1['close'][-1] - hist1['close'][0]) / hist1['close'][0]
    return security_returns


# Stop loss based on the n-day return of a reference security
def conduct_nday_stoploss(context, security_code, days, bench):
    """Sell every selected stock when the n-day return is at or below ``bench``."""
    if security_return(days, security_code) <= bench:
        for stock in g.buy_stock:
            order_target_value(stock, 0)
            log.info("Sell %s for stoploss" % stock)
        return True
    return False


# Accumulated return of a position since it was opened
def security_accumulate_return(context, data, stock):
    """Return (price - avg_cost) / avg_cost, or None when avg_cost is 0."""
    current_price = data[stock].price
    cost = context.portfolio.positions[stock].avg_cost
    if cost == 0:
        return None
    return (current_price - cost) / cost


# Per-stock stop loss based on the accumulated return
def conduct_accumulate_stoploss(context, data, stock, bench):
    """Sell ``stock`` when its accumulated return is below ``bench``."""
    # Computed once; compared with `is not None` instead of `!= None`.
    accumulated = security_accumulate_return(context, data, stock)
    if accumulated is not None and accumulated < bench:
        order_target_value(stock, 0)
        log.info("Sell %s for stoploss" % stock)
        return True
    return False


# Per-stock take profit based on the accumulated return
def conduct_accumulate_stopwin(context, data, stock, bench):
    """Sell ``stock`` when its accumulated return is above ``bench``."""
    accumulated = security_accumulate_return(context, data, stock)
    if accumulated is not None and accumulated > bench:
        order_target_value(stock, 0)
        log.info("Sell %s for stopwin" % stock)
        return True
    return False


def handle_data(context, data):
    """Apply the stops, update the indicators and trade on fractal breakouts."""
    # Market-wide stop loss
    if conduct_nday_stoploss(context, '000300.XSHG', 3, -0.03):
        return
    for stock in g.buy_stock:
        # Per-stock take profit / stop loss.
        # NOTE(review): `return` also skips the remaining stocks for this
        # bar; confirm whether `continue` was intended.
        if (conduct_accumulate_stopwin(context, data, stock, 0.3)
                or conduct_accumulate_stoploss(context, data, stock, -0.1)):
            return
        # Update the AO and AC indicators
        AC_index(stock)
        # No position: look for an entry
        if context.portfolio.positions[stock].amount == 0:
            # Evaluate the up fractal
            is_effective_fractal(stock, 'high')
            # A valid up fractal exists and was broken: buy. The flag is
            # read per stock; testing the whole dict was true whenever it
            # was non-empty.
            if g.up_fractal_exists[stock] and active_fractal(stock, 'up'):
                close_price = history(5, '1d', 'close', [stock], df=False)
                if (is_up_going(g.AO_index[stock], 5)
                        and is_up_going(g.AC_index[stock], 3)
                        and is_up_going(close_price[stock], 2)):
                    set_initial_position(stock, context)
        # Holding a position: add or exit
        else:
            # Evaluate the down fractal
            is_effective_fractal(stock, 'low')
            # Exit condition 1: a valid down fractal exists and was broken
            if g.down_fractal_exists[stock] and active_fractal(stock, 'down'):
                sell_all_stock(stock, context)
                return
转载于:https://www.cnblogs.com/lxyoung/p/7421988.html
最后
以上就是欢喜蜗牛为你收集整理的金融量化分析策略展示的全部内容,希望文章能够帮你解决金融量化分析策略展示所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复