概述
1. 调度对象封装
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os,logging
import asyncio
import datetime
from pytz import timezone
from api_monitor.utils import scheduler_config
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.events import EVENT_ALL
def default_func():
    """Do nothing; placeholder callable used as the body of the default job."""
    return None
class APIScheduler():
    """Convenience wrapper around an APScheduler ``AsyncIOScheduler``.

    The wrapper creates its own asyncio event loop, builds the scheduler from
    the settings in ``scheduler_config`` and exposes add/remove/pause/resume
    helpers plus a blocking ``star()`` entry point.
    """

    def __init__(self, executors=scheduler_config.executors,
                 jobstores=scheduler_config.jobstores,
                 job_defaults=scheduler_config.job_defaults):
        """Create the event loop and the underlying scheduler.

        :param executors: mapping of executor alias -> executor instance
        :param jobstores: mapping of job store alias -> job store instance
        :param job_defaults: default job options passed to the scheduler
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Kept on the instance so helper methods can build timezone-aware
        # datetimes that agree with the scheduler's own timezone.
        self._timezone = timezone('Asia/Shanghai')
        self._sched = AsyncIOScheduler(executors=executors,
                                       jobstores=jobstores,
                                       job_defaults=job_defaults,
                                       timezone=self._timezone)
        self.default_id = 'default'
        # Mark the scheduler as running by default (see add_job for why).
        self._sched.state = 1
        # self._sched.add_listener(self.listener,EVENT_ALL)
        self._sched._logger = scheduler_config.logger()
        # Work around jobs fetched from redis having no scheduler attribute.
        for store in jobstores.values():
            store._scheduler = self._sched

    def listener(self, event):
        """Print a short marker telling whether *event* reports an exception.

        :param event: APScheduler event object
        """
        # NOTE(review): when registered for EVENT_ALL, some event types may not
        # define ``exception``; a missing attribute is treated as "no error"
        # instead of raising AttributeError.
        if getattr(event, 'exception', None):
            print('CHUCUO')
        else:
            print('正常。。。')

    def add_job(self, **kwargs):
        """
        Add a job to the job store (database).

        :param kwargs: forwarded unchanged to ``AsyncIOScheduler.add_job``:
            func – callable (or a textual reference to one) to run at the given time
            trigger (str|apscheduler.triggers.base.BaseTrigger) – trigger that determines when func is called
            args (list|tuple) – list of positional arguments to call func with
            kwargs (dict) – dict of keyword arguments to call func with
            id (str|unicode) – explicit identifier for the job (for modifying it later)
            name (str|unicode) – textual description of the job
            misfire_grace_time (int) – seconds after the designated runtime that the job is still allowed to be run
            coalesce (bool) – run once instead of many times if the scheduler determines that the job should be run more than once in succession
            max_instances (int) – maximum number of concurrently running instances allowed for this job
            next_run_time (datetime) – when to first run the job, regardless of the trigger (pass None to add the job as paused)
            jobstore (str|unicode) – alias of the job store to store the job in
            executor (str|unicode) – alias of the executor to run the job with
            replace_existing (bool) – True to replace an existing job with the same id (but retain the number of runs from the existing one)
        :return: the job object returned by the underlying scheduler
        """
        # _real_add_job (which writes to the database) is only invoked while
        # the scheduler state is 1, which __init__ sets.
        return self._sched.add_job(**kwargs)

    def _add_default_job(self, job_id='default', jobstore='default'):
        """Register the no-op default job that fires every second.

        :param job_id: id given to the default job
        :param jobstore: alias of the job store to put it in
        """
        self.add_job(
            func=default_func, trigger='interval', seconds=1,
            jobstore=jobstore,
            # Timezone-aware "now": a naive local timestamp would be read as
            # scheduler-timezone wall time on hosts in another timezone.
            next_run_time=datetime.datetime.now(self._timezone),
            id=job_id, replace_existing=True, misfire_grace_time=3, coalesce=True,
            max_instances=1
        )

    def remove_job(self, job_id, jobstore='default'):
        """
        Remove the job identified by job_id.

        :param job_id: id of the job to remove
        :param jobstore: alias of the job store holding the job
        :return: None
        """
        self._sched.remove_job(job_id=job_id, jobstore=jobstore)

    def remove_all_jobs(self, jobstore='default', default_jobid='default'):
        """
        Remove every job in jobstore except the one whose id is default_jobid.

        :param jobstore: alias of the job store to clear
        :param default_jobid: id of the job that must be kept
        :return: None
        """
        # Imported locally so this block does not depend on a file-level import.
        import traceback

        valid_job_ids = (job.id for job in self._sched.get_jobs(jobstore=jobstore)
                         if job.id != default_jobid)
        try:
            for job_id in valid_job_ids:
                self.remove_job(job_id, jobstore)
        except AttributeError:
            # Best effort: report the failure and stop removing further jobs.
            print(traceback.format_exc())

    def pause_job(self, job_id, jobstore='default'):
        """
        Pause the job identified by job_id.

        :param job_id: id of the job to pause
        :param jobstore: alias of the job store holding the job
        :return: None
        """
        self._sched.pause_job(job_id=job_id, jobstore=jobstore)

    def resume_job(self, job_id, jobstore='default'):
        """
        Resume the job identified by job_id.

        :param job_id: id of the job to resume
        :param jobstore: alias of the job store holding the job
        :return: None
        """
        self._sched.resume_job(job_id=job_id, jobstore=jobstore)

    def star(self, add_default_job=True, jobstore='default'):
        """
        Start processing jobs and block on the event loop.

        :param add_default_job: when true, register the default job first
        :param jobstore: alias of the job store used for the default job
        :return: None (only returns once the event loop stops)
        """
        try:
            if add_default_job:
                self._add_default_job(jobstore=jobstore)
            # NOTE(review): presumably reset so start() does not consider the
            # scheduler already running after __init__ forced state to 1 --
            # confirm against APScheduler's BaseScheduler.start.
            self._sched.state = 0
            self._sched.start()
            loop = asyncio.get_event_loop()
            loop.run_forever()
        finally:
            self.shutdown()

    def shutdown(self, wait=True):
        """Shut the underlying scheduler down.

        :param wait: forwarded to ``AsyncIOScheduler.shutdown``
        """
        self._sched.shutdown(wait=wait)
2. 调度任务存储配置
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
# Executors keyed by alias: a 200-worker thread pool as the default and a
# 5-worker process pool under the 'processpool' alias.
executors = dict(
    default=ThreadPoolExecutor(200),
    processpool=ProcessPoolExecutor(5),
)

# Job stores keyed by alias: jobs are kept in a local redis instance (db 0).
jobstores = dict(
    default=RedisJobStore(
        db=0,
        jobs_key='apscheduler.jobs',
        run_times_key='apscheduler.run_times',
        host='127.0.0.1',
        port='6379',
        password='',
    ),
)

# Defaults applied to every job unless overridden when the job is added.
job_defaults = dict(coalesce=True, max_instances=3)
def logger(log_file='/tmp/scheduler.log'):
    """Configure root logging to append to *log_file* and return ``logging``.

    :param log_file: path of the file the log records are appended to
    :return: the ``logging`` module itself (not a ``Logger`` instance)
    """
    settings = {
        'level': logging.INFO,
        'format': '%(asctime)s %(levelname)s %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
        'filename': log_file,
        'filemode': 'a',
    }
    logging.basicConfig(**settings)
    return logging
3. 任务函数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pycurl
from urllib import parse
from io import StringIO,BytesIO
import json
import os,sys
import pymysql
import time
# MySQL connection settings (placeholder values; replace before use).
db_host = "xxxxxx"
db_user = "xxxx"
db_passwd = "xxxxxx"
db_database = "xxxxxx"
# Keyword arguments are used because newer PyMySQL releases no longer accept
# positional connection parameters.
# NOTE(review): this single connection/cursor pair is shared by every job run;
# confirm that is safe with the thread-pool executor in use.
conn = pymysql.connect(host=db_host, user=db_user,
                       password=db_passwd, database=db_database)
cursor = conn.cursor()
def _save_history(api_info):
    """Insert one probe result into ``api_monitor_apimonitorhistory``.

    Values are bound as query parameters so that quotes in the response body
    or in the error text cannot break (or inject into) the statement.
    Failures are printed and rolled back; nothing is raised for them.

    :param api_info: dict holding every column value except ``create_time``
    """
    row = dict(api_info)
    row['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    sql = (
        "insert into api_monitor_apimonitorhistory "
        "(http_code, totle_response_time, dns_time, connect_time, "
        "redriect_time, ssl_time, size_download, speed_down, content, "
        "err_message, api_monitor_id, api_monitor_node_id, create_time) "
        "values (%(http_code)s, %(totle_response_time)s, %(dns_time)s, "
        "%(connect_time)s, %(redriect_time)s, %(ssl_time)s, "
        "%(size_download)s, %(speed_down)s, %(content)s, %(err_message)s, "
        "%(api_monitor_id)s, %(api_monitor_node_id)s, %(create_time)s)"
    )
    # Debug output: only the statement template is echoed, not the bound values.
    print(sql)
    try:
        cursor.execute(sql, row)
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()


def curl(url, params, method, header=None, save_data=True, time_out=6, **kwargs):
    """Probe an HTTP API with pycurl and optionally store the measurements.

    :param url: target URL
    :param params: request parameters; url-encoded into the query string for
        GET, JSON-encoded into the request body for any other method
    :param method: HTTP method name; only 'get' (case-insensitive) is treated
        specially, everything else sends POSTFIELDS
    :param header: list of raw header lines passed to pycurl (default: none)
    :param save_data: when true, write the result row to the database
    :param time_out: request timeout passed to ``pycurl.TIMEOUT``
    :param kwargs: must contain ``api_id`` and ``node_id``
    :raises KeyError: if ``api_id`` or ``node_id`` is missing from kwargs
    :return: None
    """
    api_info = {
        'api_monitor_id': kwargs['api_id'],
        'api_monitor_node_id': kwargs['node_id'],
    }
    buffer = BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.HTTPHEADER, [] if header is None else header)
        c.setopt(pycurl.WRITEDATA, buffer)
        # Request timeout.
        c.setopt(pycurl.TIMEOUT, time_out)
        if method.lower() == 'get':
            query = parse.urlencode(params)
            if query:
                # Extend an existing query string instead of adding a second '?'.
                url = url + ('&' if '?' in url else '?') + query
        else:
            c.setopt(pycurl.POSTFIELDS, json.dumps(params))
        c.setopt(pycurl.URL, url)
        try:
            c.perform()
            api_info['err_message'] = 0
        except pycurl.error as e:
            # Transfer failures are recorded in the row rather than raised.
            api_info['err_message'] = str(e)
        api_info.update(
            {
                'http_code': c.getinfo(pycurl.HTTP_CODE),
                'totle_response_time': c.getinfo(pycurl.TOTAL_TIME),
                'dns_time': c.getinfo(pycurl.NAMELOOKUP_TIME),
                'connect_time': c.getinfo(pycurl.CONNECT_TIME),
                'redriect_time': c.getinfo(pycurl.REDIRECT_TIME),
                'ssl_time': c.getinfo(pycurl.APPCONNECT_TIME),
                'size_download': c.getinfo(pycurl.SIZE_DOWNLOAD),
                'speed_down': c.getinfo(pycurl.SPEED_DOWNLOAD),
                # Undecodable bytes are replaced so a non-UTF-8 body cannot
                # abort the probe.
                'content': buffer.getvalue().decode('utf-8', errors='replace'),
            }
        )
    finally:
        # Always release the curl handle, whatever happened above.
        c.close()
    if save_data:
        _save_history(api_info)
4. 执行调度任务
server端添加任务到redis
# Scheduler wrapper used to register the monitoring jobs.
job_obj = scheduler.APIScheduler()
for node_id in api_monitor_node_id_list:
    # Keyword arguments handed to tasks.curl each time this node's job runs.
    api_conf = dict(
        url=monitor_url,
        params=params_data,
        method=request_method,
        header=http_header,
        api_id=str(api_obj_id),
        node_id=node_id,
    )
    # Store the job in redis; the scheduler state must be 1 for the job to be
    # written to redis.
    job_obj.add_job(
        id=str(node_id),
        func=tasks.curl,
        kwargs=api_conf,
        trigger='interval',
        seconds=5,
        next_run_time=datetime.datetime.now(),
        replace_existing=True,
        misfire_grace_time=3,
        coalesce=True,
        max_instances=2,
        jobstore='default',
    )
agent端监听任务
# Agent side: build the scheduler wrapper and block while it processes jobs.
from api_monitor.utils import scheduler
job_obj = scheduler.APIScheduler()
job_obj.star() # start listening for jobs
#job_obj.remove_job(job_id="default") # remove a job by its job id
参考链接: https://zhuanlan.zhihu.com/p/44185271
转载于:https://blog.51cto.com/haoyonghui/2374679
最后
以上就是忧心花卷为你收集整理的apscheduler定时调度任务的全部内容,希望文章能够帮你解决apscheduler定时调度任务所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复