我是靠谱客的博主 欢喜蜗牛,最近开发中收集的这篇文章主要介绍金融量化分析策略展示,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在聚宽网实现回测:https://www.joinquant.com/algorithm/index/edit?algorithmId=a5d6decffd9805f8f7896e431ca619c4

还有米筐、优矿来进行回测。

一、双均线策略

import jqdata

p1 = 5
p2 = 60


def initialize(context):
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    g.security = ['601318.XSHG']
    
def handle_data(context, data):
    for stock in g.security:
        hist = attribute_history(stock, count=p2)
        ma5 = hist['close'][-5:].mean()
        ma60 = hist['close'].mean()
        
        if ma5 > ma60 and stock not in context.portfolio.positions:
            order = order_value(stock, context.portfolio.available_cash)
            if order:
                print('Buying: %dat Price: %.2f' % (order.amount, order.price))
        elif ma5 < ma60 and stock in context.portfolio.positions:
            order = order_target_value(stock, 0)
            if order:
                print('Selling: %d at Price: %.2f' % (order.amount, order.price))
    record(ma5=ma5, ma60=ma60)
View Code

二、因子选股

# 导入函数库
import jqdata

# 初始化函数,设定基准等等
def initialize(context):
    set_benchmark('000300.XSHG')
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    g.security = get_index_stocks('000300.XSHG')
    g.q = query(valuation).filter(valuation.code.in_(g.security))
    g.N = 10
    run_monthly(handle, 1)

def handle(context):
    df = get_fundamentals(g.q)
    df = df.sort(columns='market_cap')
    df = df[:g.N]
    tohold = df['code'].values
    
    for stock in context.portfolio.positions:
        if stock not in tohold:
            #卖出
            order_target(stock, 0)
            
    tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
    
    cash = context.portfolio.available_cash
    n = len(tobuy)
    #买入
    for stock in tobuy:
        order_value(stock, int(cash/n))
View Code

三、均值回归

import jqdata
import math
import numpy as np
import pandas as pd

def initialize(context):
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
    g.benchmark = '000300.XSHG'
    set_benchmark(g.benchmark)
    
    g.ma_days = 30
    g.stock_num = 10
    
    run_monthly(handle, 1)
    #run_monthly(handle, 11)
    
def handle(context):
    tohold = get_hold_list(context)
    for stock in context.portfolio.positions:
        if stock not in tohold:
            order_target_value(stock, 0)
    
    tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
    
    if len(tobuy)>0:
        cash = context.portfolio.available_cash
        cash_every_stock = cash / len(tobuy)
        
        for stock in tobuy:
            order_value(stock, cash_every_stock)

def get_hold_list(context):
    stock_pool = get_index_stocks(g.benchmark)
    stock_score = pd.Series(index=stock_pool)
    for stock in stock_pool:
        df = attribute_history(stock, g.ma_days, '1d', ['close'])
        ma = df.mean()[0]
        current_price = get_current_data()[stock].day_open
        ratio = (ma - current_price) / ma
        stock_score[stock] = ratio
    return stock_score.nlargest(g.stock_num).index.values
View Code

四、布林带策略

# 导入函数库
import jqdata

def initialize(context):
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    set_benchmark('000300.XSHG')
    
    g.security = ['600036.XSHG']
    
    g.N = 2
    g.ma_days = 20
    
def handle_data(context, data):
    for stock in g.security:
        df = attribute_history(stock, g.ma_days)
        middle = df['close'].mean()
        upper = middle + g.N * df['close'].std()
        lower = middle - g.N * df['close'].std()
        
        p = get_current_data()[stock].day_open
        # 如果价格突破阻力线
        if p >= upper and stock in context.portfolio.positions:
            order_target(stock, 0)
    
    cash = context.portfolio.available_cash / len(g.security)
    
    for stock in g.security:
        df = attribute_history(stock, g.ma_days)
        middle = df['close'].mean()
        upper = middle + g.N * df['close'].std()
        lower = middle - g.N * df['close'].std()
        
        p = get_current_data()[stock].day_open
        # 如果价格跌破支撑线
        
        if p <= lower and stock not in context.portfolio.positions:
            order_target(stock, cash)
View Code

五、PEG策略

def initialize(context):
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    set_benchmark('000300.XSHG')
    
    g.security = get_index_stocks('000300.XSHG')
    g.N = 20
    run_monthly(handle, 1)
    
def handle(context):
    df = get_fundamentals(query(valuation.code, valuation.pe_ratio, indicator.inc_net_profit_year_on_year).filter(valuation.code.in_(g.security)))
    df = df[(df['pe_ratio']>0) & (df['inc_net_profit_year_on_year']>0)]
    df['PEG'] = df['pe_ratio'] / df['inc_net_profit_year_on_year'] / 100
    df = df.sort(columns='PEG')[:g.N]
    tohold = df['code'].values
    
    for stock in context.portfolio.positions:
        if stock not in tohold:
            order_target_value(stock, 0)
    
    tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
    
    if len(tobuy)>0:
        print('Buying')
        cash = context.portfolio.available_cash
        cash_every_stock = cash / len(tobuy)
        
        for stock in tobuy:
            order_value(stock, cash_every_stock)
View Code

六、羊驼交易法则-表现最优入池

def initialize(context):
    
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
    set_benchmark('000300.XSHG')
    
    g.security = get_index_stocks('000300.XSHG')
    g.period = 3
    g.N = 10
    g.M = 2
    print("init")
    run_monthly(handle, 10)
    
    stocks = get_sorted_stocks(context, g.security)[:g.N]
    cash = context.portfolio.available_cash / len(stocks)
    for stock in stocks:
        order_value(stock, cash)
    
    
def get_sorted_stocks(context, stocks):
    df = history(g.period, field='close', security_list=stocks).T
    df['ret'] = (df.iloc[:,-1] - df.iloc[:,0]) / df.iloc[:,0]
    df = df.sort(columns='ret', ascending=False)
    return df.index.values
    
def handle(context):
    stocks = get_sorted_stocks(context, context.portfolio.positions.keys())
    
    for stock in stocks[-g.M:]:
        order_target(stock, 0)
        
    stocks = get_sorted_stocks(context, g.security)
    
    cash = context.portfolio.available_cash / g.M
    
    for stock in stocks:
        if len(context.portfolio.positions) >= g.N:
            break
        if stock not in context.portfolio.positions:
            order_target(stock, cash)
View Code

七、海龟交易法则

可参考:https://www.joinquant.com/post/1401?f=study&m=algorithm

import jqdata
import math
import numpy as np
import pandas as pd
from collections import deque

def initialize(context):
    
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')

    g.security = '000060.XSHE'
    set_benchmark(g.security)
    g.in_day = 20
    g.out_day = 10
    g.today_units = 0
    g.current_units = 0
    g.N=deque(maxlen=19)
    g.current_N = 0
    g.last_buy_price = 0
    
    price = attribute_history(g.security, g.N.maxlen*2+1, '1d', ('high', 'low', 'close'))
    
    for i in range(g.N.maxlen+1, g.N.maxlen*2+1):
        li = []
        for j in range(i-19,i+1):
            a = price['high'][j]-price['low'][j]
            b = abs(price['high'][j]-price['close'][j-1])
            c = abs(price['low'][j]-price['close'][j-1])
            li.append(max(a,b,c))
        current_N = np.array(li).mean()
        g.N.append(current_N)
        
    
def before_trading_start(context):
    g.current_N = cal_N()
    g.today_units = 0

    
def handle_data(context, data):
    dt = context.current_dt
    current_price = data[g.security].price #上一分钟价格
    value = context.portfolio.total_value
    cash = context.portfolio.available_cash
    
    unit = math.floor(value * 0.01 / g.current_N)
    
        
    if g.current_units == 0:
        buy(current_price, cash, unit)
    else:
        if stop_loss(current_price):
            return
        if sell(current_price):
            return
        addin(current_price, cash, unit)
    
def cal_N():
    # if len(g.N) < g.N.maxlen:
    #     price = attribute_history(g.security, g.N.maxlen+2, '1d', ('high', 'low', 'close'))
    #     li = []
    #     for i in range(1, g.N.maxlen+2):
    #         a = price['high'][i]-price['low'][i]
    #         b = abs(price['high'][i]-price['close'][i-1])
    #         c = abs(price['low'][i]-price['close'][i-1])
    #         li.append(max(a,b,c))
    #     current_N = np.array(li).mean()
    # else:
    price = attribute_history(g.security, 2, '1d', ('high', 'low', 'close'))
    a = price['high'][1]-price['low'][1]
    b = abs(price['high'][1]-price['close'][0])
    c = abs(price['low'][1]-price['close'][0])
    current_N = (max(a,b,c) + np.array(g.N).sum())/(g.N.maxlen+1)
    g.N.append(current_N)
    return current_N
    
def buy(current_price, cash, unit):
    price = attribute_history(g.security, g.in_day, '1d', ('close',))
    if current_price > max(price['close']):
        shares = cash / current_price
        if shares >= unit:
            print("buying %d" % unit)
            o = order(g.security, unit)
            g.last_buy_price = o.price
            g.current_units += 1
            g.today_units += 1
            return True
    return False
            

def addin(current_price, cash, unit):
    if current_price >= g.last_buy_price + 0.5 * g.current_N:
        shares = cash / current_price
        if shares >= unit:
            print("adding %d" % unit)
            o = order(g.security, unit)
            g.last_buy_price = o.price
            g.current_units += 1
            g.today_units += 1
            return True
    return False
            
def sell(current_price):
    price = attribute_history(g.security, g.out_day, '1d', ('close',))
    if current_price < min(price['close']):
        print("selling")
        order_target(g.security, 0)
        g.current_units = g.today_units
        return True
    return False
        
def stop_loss(current_price):
    if current_price < g.last_buy_price - 2 * g.current_N:
        print("stop loss")
        order_target(g.security, 0)
        g.current_units = g.today_units
        return True
    return False
View Code

八、鳄鱼交易法则

# 导入函数库
import jqdata
import numpy as np

# 初始化函数,设定基准等等
def initialize(context):
    set_option('use_real_price', True)
    set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    set_benchmark('000300.XSHG')
    
    g.up_price = {} #向上碎形最高价
    g.low_price = {} #向下碎形最低价
    g.up_fractal_exists = {} #判断有效向上碎形
    g.down_fractal_exists = {} #判断有效向下碎形
    g.AO_index = {} #存放连续的AO指标数据
    g.cal_AC_index = {} #计算AC指标中转存储
    g.AC_index = {} #存放连续的AC指标数据
    g.amount = {} #满仓仓位
    g.stock = get_index_stocks('000300.XSHG')
    g.buy_stock = []
    g.month = context.current_dt.month
    run_monthly(select_universe,1,'open')
    
#重置全局变量
def reset_global():
    g.up_price = {} #向上碎形最高价
    g.low_price = {} #向下碎形最低价
    g.up_fractal_exists = {} #判断有效向上碎形
    g.down_fractal_exists = {} #判断有效向下碎形
    g.AO_index = {} #存放连续的AO指标数据
    g.cal_AC_index = {} #计算AC指标中转存储
    g.AC_index = {} #存放连续的AC指标数据
    g.amount = {} #满仓仓位
    g.buy_stock = []

def initial_stock_global(stock):
    g.up_price[stock] = 0
    g.low_price[stock] = 0
    g.up_fractal_exists[stock] = False
    g.down_fractal_exists[stock] = False #判断有效向下碎形
    g.AO_index[stock] = [0] #存放连续的AO指标数据
    g.cal_AC_index[stock] = [0]  #计算AC指标中转存储
    g.AC_index[stock] = [0] #存放连续的AC指标数据
    g.amount[stock] = 0 #满仓仓位

#轮换选股后清空持仓
def reset_position(context):
    for stock in g.buy_stock:
        order_target(stock,0)
        log.info("sell %s for reset position"%stock)

#选股
def select_universe(context):
    #每三个月操作一次
    month = context.current_dt.month
    if month%6 != g.month%6:
        return
    #清空全局变量
    reset_position(context)
    reset_global()
    hist = history(30,'1d','close',g.stock,df = False)
    for stock in g.stock:
        if is_sleeping_alligator(stock,hist,20):
            g.buy_stock.append(stock)
            #初始化该股票全局变量
            initial_stock_global(stock)
    print g.buy_stock
    return None
    
#睡着的鳄鱼
def is_sleeping_alligator(stock,hist,nday):
    for i in range(nday):
        if is_struggle(stock,hist,i) == False:
            return False
    return True

#均线纠缠,BRG三线非常接近
def is_struggle(stock,hist,delta):
    blue_line = hist[stock][-21-delta:-8-delta].mean()
    red_line = hist[stock][-13-delta:-5-delta].mean()
    green_line = hist[stock][-8-delta:-3-delta].mean()
    if abs(blue_line/red_line-1)<0.02 and abs(red_line/green_line-1)<0.02:
        return True
    else:
        return False
        
#判断 向上 或 向下 碎形
def is_fractal(stock,direction):
    hist = attribute_history(stock, 5, fields=[direction])
    if direction == 'high':
        if np.all(hist.iloc[:2] < hist.iloc[2]) and np.all(hist.iloc[3:] < hist.iloc[2]):
            g.up_price[stock] = hist.iloc[2].values
            return True
    elif direction == 'low':
        if np.all(hist.iloc[:2] > hist.iloc[2]) and np.all(hist.iloc[3:] > hist.iloc[2]):
            g.low_price[stock] = hist.iloc[2].values
            return True
    return False
    
#通过比较碎形与红线位置,判断碎形是否有效
def is_effective_fractal(stock, direction):
    if is_fractal(stock,direction):
        hist = attribute_history(stock, 11)
        red_line = hist['close'][:-3].mean()
        close_price = hist['close'][-1]
        if direction == 'high':
            if close_price > red_line:
                g.up_fractal_exists[stock] = True
            else:
                g.up_fractal_exists[stock] = False
        elif direction == 'low':
            if close_price < red_line:
                g.down_fractal_exists[stock] = True
            else:
                g.down_fractal_exists[stock] = False
    

#N日内最高价格的N日线
def nday_high_point(stock,n):
    hist = history(2*n,'1d','high',[stock],df = False)[stock]
    high_point = []
    for i in range(n):
        high_point.append(max(hist[-5-i:-1-i]))
    return np.array(high_point).mean()

#N日内最低价格的N日线
def nday_low_point(stock,n):
    hist = history(2*n,'1d','low',[stock],df = False)[stock]
    low_point = []
    for i in range(n):
        low_point.append(max(hist[-5-i:-1-i]))
    return np.array(low_point).mean()

#AO=5日内(最高-最低)/2的5日移动平均-34日内(最高-最低)/2的34日移动平均
def AO_index(stock):
    g.AO_index[stock].append(nday_high_point(stock,5)/2 + nday_low_point(stock,5)/2
                      - nday_high_point(stock,34)/2 - nday_low_point(stock,34)/2)
    return None

#AO-AO的5日平均值的5日平均
def AC_index(stock):
    AO_index(stock)
    if len(g.AO_index[stock]) >= 5:
        g.cal_AC_index[stock].append(g.AO_index[stock][-1] - np.array(g.AO_index[stock][-5:]).mean())
        if len(g.cal_AC_index[stock]) >=5:
            g.AC_index[stock].append(np.array(g.cal_AC_index[stock][-5:]).mean())

#判断序列n日上行
def is_up_going(alist,n):
    if len(alist) < n:
        return False
    for i in range(n-1):
        if alist[-(1+i)] <= alist[-(2+i)]:
            return False
    return True

#判断序列n日下行
def is_down_going(alist,n):
    if len(alist) < n:
        return False
    for i in range(n-1):
        if alist[-(1+i)] >= alist[-(2+i)]:
            return False
    return True

#碎形被突破
def active_fractal(stock,direction):
    close_price = history(1,'1d','close',[stock],df=False)[stock][0]
    if direction == 'up' and close_price > g.up_price[stock]:
        return True
    elif direction == 'down' and close_price < g.low_price[stock]:
        return True
    return False

#进场,初始仓位
def set_initial_position(stock,context):
    close_price = history(1,'1d','close',[stock],df=False)[stock][0]
    g.amount[stock] = context.portfolio.cash/close_price/len(g.buy_stock)*3
    order(stock, g.amount[stock])
    log.info("buying %s 股数为 %s"%(stock,g.amount[stock]))
    g.down_fractal_exists[stock] = False

#卖出
def sell_all_stock(stock,context):
    order_target(stock,0)
    log.info("selling %s"%stock)
    g.up_fractal_exists[stock] = False

#加仓
def adjust_position(stock,context,position):
    order(stock,g.amount[stock]*position)
    log.info("adjust position buying %s 股数为 %s"%(stock,g.amount[stock]*position))

# 计算股票前n日收益率
def security_return(days,security_code):
    hist1 = attribute_history(security_code, days + 1, '1d', 'close',df=False)
    security_returns = (hist1['close'][-1]-hist1['close'][0])/hist1['close'][0]
    return security_returns

# 止损,根据前n日收益率
def conduct_nday_stoploss(context,security_code,days,bench):
    if  security_return(days,security_code)<= bench:
        for stock in g.buy_stock:
            order_target_value(stock,0)
            log.info("Sell %s for stoploss" %stock)
        return True
    else:
        return False

# 计算股票累计收益率(从建仓至今)
def security_accumulate_return(context,data,stock):
    current_price = data[stock].price
    cost = context.portfolio.positions[stock].avg_cost
    if cost != 0:
        return (current_price-cost)/cost
    else:
        return None

# 个股止损,根据累计收益
def conduct_accumulate_stoploss(context,data,stock,bench):
    if security_accumulate_return(context,data,stock) != None
    and security_accumulate_return(context,data,stock) < bench:
        order_target_value(stock,0)
        log.info("Sell %s for stoploss" %stock)
        return True
    else:
        return False

# 个股止盈,根据累计收益
def conduct_accumulate_stopwin(context,data,stock,bench):
    if security_accumulate_return(context,data,stock) != None
    and security_accumulate_return(context,data,stock) > bench:
        order_target_value(stock,0)
        log.info("Sell %s for stopwin" %stock)
        return True
    else:
        return False

def handle_data(context,data):
    #大盘止损
    if conduct_nday_stoploss(context,'000300.XSHG',3,-0.03):
        return
    for stock in g.buy_stock:
        #个股止损
        if conduct_accumulate_stopwin(context,data,stock,0.3)
        or conduct_accumulate_stoploss(context,data,stock,-0.1):
            return
        #计算AO,AC指标
        AC_index(stock)
        #空仓时,寻找机会入场
        if context.portfolio.positions[stock].amount == 0:
            #计算向上碎形
            is_effective_fractal(stock,'high')
            #有效向上碎形存在,并被突破,买入
            if g.up_fractal_exists and active_fractal(stock,'up'):
                close_price = history(5, '1d', 'close', [stock],df = False)
                if is_up_going(g.AO_index[stock],5)
                and is_up_going(g.AC_index[stock],3)
                and is_up_going(close_price[stock],2):
                    set_initial_position(stock,context)
        #有持仓时,加仓或离场
        else:
            #计算向下碎形
            is_effective_fractal(stock,'low')
            #出场条件1:有效向下碎形存在,并被突破,卖出
            if g.down_fractal_exists and active_fractal(stock,'down'):
                sell_all_stock(stock,context)
                return
View Code

 

转载于:https://www.cnblogs.com/lxyoung/p/7421988.html

最后

以上就是欢喜蜗牛为你收集整理的金融量化分析策略展示的全部内容,希望文章能够帮你解决金融量化分析策略展示所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部