APScheduler(Python化的Cron)使用总结定时任务
APScheduler(Python化的Cron)使⽤总结定时任务APScheduler(Python化的Cron)使⽤总结
简介
APScheduler全程为Advanced Python Scheduler,是⼀款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执⾏的任务,并且⽀持Python函数或任意可调⽤的对象。官⽅⽂档:
APScheduler安装
电脑死机怎么办⽅法⼀:使⽤pip安装
$ pip install apscheduler
⽅法⼆:如果pip不起作⽤,可以从pypi上下载最新的源码包()进⾏安装:
$ python setup.py install
APScheduler组件
triggers(触发器):触发器中包含调度逻辑,每个作业都由⾃⼰的触发器来决定下次运⾏时间。除了他们⾃⼰初始配置意外,触发器完全是⽆状态的。
job stores(作业存储器):存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到⼀个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。
executors(执⾏器):处理作业的运⾏,他们通常通过在作业中提交指定的可调⽤对象到⼀个线程或者进城池来进⾏。当作业完成时,执⾏器将会通知调度器。
schedulers(调度器):配置作业存储器和执⾏器可以在调度器中完成,例如添加、修改和移除作业。根据不同的应⽤场景可以选⽤不同的调度器,可选的有
BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。调度器
BlockingScheduler : 当调度器是你应⽤中唯⼀要运⾏的东西时。
BackgroundScheduler : 当你没有运⾏任何其他框架并希望调度器在你应⽤的后台执⾏时使⽤(充电桩即使⽤此种⽅式)。AsyncIOScheduler : 当你的程序使⽤了asyncio(⼀个异步框架)的时候使⽤。
GeventScheduler : 当你的程序使⽤了gevent(⾼性能的Python并发框架)的时候使⽤。
TornadoScheduler : 当你的程序基于Tornado(⼀个web框架)的时候使⽤。
TwistedScheduler : 当你的程序使⽤了Twisted(⼀个异步框架)的时候使⽤
QtScheduler : 如果你的应⽤是⼀个Qt应⽤的时候可以使⽤。
作业存储器
如果你的应⽤在每次启动的时候都会重新创建作业,那么使⽤默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应⽤程序奔溃的情况下任然保留作业,你应该根据你的应⽤环境来选择具体的作业存储器。例如:使⽤Mongo或者SQLAlchemy JobStore (⽤于⽀持⼤多数RDBMS)
执⾏器
对执⾏器的选择取决于你使⽤上⾯哪些框架,⼤多数情况下,使⽤默认的ThreadPoolExecutor已经能够满⾜需求。如果你的应⽤涉及到CPU密集型操作,你可以考虑使⽤ProcessPoolExecutor来使⽤更多的CPU核⼼。你也可以同时使⽤两者,将ProcessPoolExecutor作为第⼆执⾏器。
触发器
当你调度作业的时候,你需要为这个作业选择⼀个触发器,⽤来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:
date ⼀次性指定⽇期
interval 在某个时间范围内间隔多长时间执⾏⼀次
cron 和Linux crontab格式兼容,最为强⼤
date
最基本的⼀种调度,作业只会执⾏⼀次。它的参数如下:
run_date (datetime|str) – 作业的运⾏⽇期或时间
timezone (info|str) – 指定时区
举个栗⼦:
# 2016-12-12运⾏⼀次job_function
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# 2016-12-12 12:00:00运⾏⼀次job_function
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
interval
间隔调度,参数如下:
weeks (int) – 间隔⼏周
days (int) – 间隔⼏天
hours (int) – 间隔⼏⼩时
minutes (int) – 间隔⼏分钟
seconds (int) – 间隔多少秒
start_date (datetime|str) – 开始⽇期
end_date (datetime|str) – 结束⽇期
timezone (info|str) – 时区
举个栗⼦:
# 每两个⼩时调⼀下job_function
sched.add_job(job_function, 'interval', hours=2)
cron
参数如下:
year (int|str) – 年,4位数字
month (int|str) – ⽉ (范围1-12)
day (int|str) – ⽇ (范围1-31)
week (int|str) – 周 (范围1-53)
day_of_week (int|str) – 周内第⼏天或者星期⼏ (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 时 (范围0-23)
minute (int|str) – 分 (范围0-59)
second (int|str) – 秒 (范围0-59)
start_date (datetime|str) – 最早开始⽇期(包含)
end_date (datetime|str) – 最晚结束时间(包含)
timezone (info|str) – 指定时区
举个栗⼦:
# job_function将会在6,7,8,11,12⽉的第3个周五的1,2,3点运⾏
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截⽌到2016-12-30 00:00:00,每周⼀到周五早上五点半运⾏job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')
配置调度程序
在应⽤程序中使⽤默认作业存储和默认执⾏程序运⾏BackgroundScheduler的例⼦:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization
波斯菊种植这将⽣成⼀个名为“default”的MemoryJobStore和名为“default”的ThreadPoolExecutor的BackgroundScheduler,默认最⼤线程数为10。如果不满⾜于当前配置,如希望使⽤两个执⾏器有两个作业存储器,并且还想要调整新作业的默认值并设置不同的时区,可按如下配置:
from pytz import utc
醋泡熟鸡蛋from apscheduler.schedulers.background import BackgroundScheduler
城堡名from db import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
utors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 配置作业存储器
什么时候父亲节2020
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 配置执⾏器,并设置线程数
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,    # 默认情况下关闭新的作业
'max_instances': 3    # 设置调度程序将同时运⾏的特定作业的最⼤实例数3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
启动调度器
启动调度器只需要调⽤start()⽅法,对于除BlockingScheduler以外的调度程序,此调⽤将⽴即返回,您可以继续应⽤程序的初始化过程,可能会将作业添加到调度程序。
对于BlockingScheduler,只需在完成初始化步骤后调⽤start()
scheduler.start()
添加作业
⽅法⼀:调⽤add_job()⽅法
最常见的⽅法,add_job()⽅法返回⼀个apscheduler.job.Job实例,您可以稍后使⽤它来修改或删除该作业。
⽅法⼆:使⽤装饰器scheduled_job()
此⽅法主要是⽅便的声明在应⽤程序运⾏时不会改变的作业
删除作业
⽅法⼀:通过作业ID或别名调⽤remove_job()删除作业
⽅法⼆:通过add_job()返回的job实例调⽤remove()⽅法删除作业
举个栗⼦:
# 实例删除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
# id删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
暂停和恢复作业
可以通过Job实例或调度程序本⾝轻松暂停和恢复作业。当作业暂停时,下⼀个运⾏时间将被清除,直到作业恢复,不会再计算运⾏时间。要暂停作业,请使⽤以下任⼀⽅法:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢复作业:
apscheduler.sume()
apscheduler.schedulers.sume_job()
获取作业列表
要获得计划作业的机器可处理列表,可以使⽤get_jobs()⽅法。它将返回⼀个Job实例列表。如果您只对特定作业存储中包含的作业感兴趣,则将作业存储别名作为第⼆个参数。
为了⽅便起见,可以使⽤print_jobs()⽅法,它将打印格式化的作业列表,触发器和下次运⾏时间。
修改作业属性
您可以通过调⽤apscheduler.dify()或modify_job()来修改除id以外的任何作业属性。
关闭调度程序
默认情况下,调度程序关闭其作业存储和执⾏程序,并等待所有当前正在执⾏的作业完成,wait=False参数可选,代表⽴即停⽌,不⽤等待。
scheduler.shutdown(wait=False)
附:1、定时任务运⾏脚本⼩例⼦:
import datetime
gmail免费邮箱注册from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging
scheduler = BlockingScheduler()  # 后台运⾏
# 设置为每⽇凌晨00:30:30时执⾏⼀次调度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
print"schedule execute"
sys_logging.debug("statistic scheduler execute success" + w().strftime("%Y-%m-%d %H:%M:%S"))
if__name__ == '__main__':
try:
scheduler.start()
sys_logging.debug("statistic scheduler start success")
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
sys_logging.debug("statistic scheduler start-up fail")
2、第⼀次使⽤此定时器时总会执⾏两次,⼀直不知道为什么,后来发现,python 的flask框架在debug模式下会多开⼀个线程监测项⽬变化,所以每次会跑两遍,可以将debug选项改为False

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。