Python中定时任务框架APScheduler的快速入门指南
发布时间 - 2026-01-11 02:12:31 点击率:次前言

大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法。
一、APScheduler介绍
APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。
APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用;同时也提供了不同的存储机制,可以方便与Redis,数据库等第三方的外部持久化机制进行协同工作,总之功能非常强大和易用。
在Python的世界中,另外一个齐名的调度模块是Celery,功能也非常的强大,号称分布式的调度器,感兴趣的读者可以自行进行研究。
官网文档地址:http://apscheduler.readthedocs.io/en/latest/
安装包位置: https://pypi.python.org/pypi/APScheduler/
在系统中,如何进行安装呢?其实非常简单,基于pip直接安装即可:
pip install APScheduler
二、APScheduler的主要的调度类
在APScheduler中有以下几个非常重要的概念,需要大家理解:
1、触发器(trigger)
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,根据trigger中定义的时间点,频率,时间区间等等参数设置。除了他们自己初始配置以外,触发器完全是无状态的。
2、作业存储(job store)
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。job store支持主流的存储机制:redis, mongodb, 关系型数据库, 内存等等
3、执行器(executor)
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。基于池化的操作,可以针对不同类型的作业任务,更为高效地使用cpu的计算资源。
调度器(scheduler)
通常在应用只有一个调度器,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
这里简单列一下常用的若干调度器:
- BlockingScheduler:仅可用在当前你的进程之内,与当前的进行共享计算资源
- BackgroundScheduler: 在后台运行调度,不影响当前的系统计算运行
- AsyncIOScheduler: 如果当前系统中使用了async module,则需要使用异步的调度器
- GeventScheduler: 如果使用了gevent,则需要使用该调度
- TornadoScheduler: 如果使用了Tornado, 则使用当前的调度器
- TwistedScheduler:Twister应用的调度器
- QtScheduler: Qt的调度器
由此可知,在APscheduler的调度器中,是与底层的实现机制紧密相关的,需要依据当前的计算模型来动态选择调度器。
三、APScheduler的job管理
Job是APScheduler中的核心,其承接目前需要执行的工作和任务,其可以在系统运行过程中动态地进行增加/修改/删除/查询等操作。
3.1 Job的新增
共有两种方式进行新增job的操作:
基于add_job来动态增加
代码示例:
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
基于修饰器scheduled_job来动态装饰job的实际函数
代码示例:
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
3.2 移除作业
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
Same, using an explicit job ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
基于job id来动态移除特定的job.
3.3 暂停和恢复作业
暂停作业:
– apscheduler.job.Job.pause()
– apscheduler.schedulers.base.BaseScheduler.pause_job()
恢复作业:
– apscheduler.job.Job.resume()
– apscheduler.schedulers.base.BaseScheduler.resume_job()
3.4. 获得job列表
获得调度作业的列表,可以使用 get_jobs() 来完成,它会返回所有的job实例。或者使用 print_jobs() 来输出所有格式化的作业列表。
3.5. 修改作业 job
可以通过apscheduler.job.Job.modify() or modify_job()来动态修改job的属性信息,除了job id无法修改之外,都是可以修改的。
job.modify(max_instances=6, name='Alternate name')
另外我们也可以通过apscheduler.job.Job.reschedule() or reschedule_job()动态重新设置trigger,示例如下:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
3.6. 关闭调度器
默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。
scheduler.shutdown() scheduler.shutdown(wait=False)
四、 APScheduler的代码示例
这里使用装饰器来展示一个调度的使用:
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=3)
def timed_job():
print('This job is run every three minutes.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour='0-9', minute='30-59', second='*/3')
def scheduled_job():
print('This job is run every weekday at 5pm.')
print('before the start funciton')
sched.start()
print("let us figure out the situation")
代码说明:
在这段代码中,使用了当前进程中共享计算资源的BlockingScheduler,共使用了2个调度器,其中一个是间隔3秒的执行。
另外一个调度器是模仿cron来执行的,在周一到周五其间,每天的0点到9点直接,在30分到59分之间执行,执行频次为3秒。
基于正常代码的示例如下:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time
import logging
def job_function():
print "Hello World" + " " + str(datetime.datetime.now())
if __name__ == '__main__':
log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO) # DEBUG
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)
print('start to do it')
sched = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
sched.start()
五、某个异常问题的思考
在执行以下代码之时候,定时任务一直未能正常生效:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time
def job_function():
print "Hello World" + " " + str(datetime.datetime.now())
if __name__ == '__main__':
print('start to do it')
sched = BlockingScheduler()
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
sched.start()
代码报错的错误信息为:
No handlers could be found for logger “apscheduler.scheduler”
从字面意思来分析,是没有logging模块的logger存在,故需要添加上去即可。
新增对应的logging信息即可:
import logging
log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO) # DEBUG
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)
后来笔者重新做了一次执行,即使移除掉logging的内容,依然可以正常执行,故可以推测为需要动态引入一次依赖包logging即可。
六、总结
APScheduler是一个非常强大易用的类库,为了我们简单快捷的解决问题提供了很多的工具,并且提供了很多灵活的扩展点,只要你添加若干的web页面,就可以创建一个强大的任务调度系统,不是吗?
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。
# python
# 定时任务框架
# apscheduler
# 安装
# python任务调度框架
# Python实现定时任务
# Python3实现定时任务的四种方式
# python BlockingScheduler定时任务及其他方式的实现
# python 实现定时任务的四种方式
# Linux下Python脚本自启动与定时任务详解
# Python3.6 Schedule模块定时任务(实例讲解)
# 对Python定时任务的启动和停止方法详解
# 详解使用python crontab设置linux定时任务
# python Celery定时任务的示例
# Python中实现定时任务详解
# 使用了
# 移除
# 自己的
# 可以通过
# 执行器
# 另外一个
# 易用
# 要使
# 则需
# 都是
# 器中
# 几个
# 如果你
# 序列化
# 好了
# 由此可知
# 将会
# 两种
# 中有
# 其他的
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
如何在 Telegram Web View(iOS)中防止键盘遮挡底部输入框
谷歌浏览器如何更改浏览器主题 Google Chrome主题设置教程
b2c电商网站制作流程,b2c水平综合的电商平台?
齐河建站公司:营销型网站建设与SEO优化双核驱动策略
linux写shell需要注意的问题(必看)
Win11搜索不到蓝牙耳机怎么办 Win11蓝牙驱动更新修复【详解】
如何快速使用云服务器搭建个人网站?
如何在服务器上配置二级域名建站?
Python结构化数据采集_字段抽取解析【教程】
logo在线制作免费网站在线制作好吗,DW网页制作时,如何在网页标题前加上logo?
百度浏览器ai对话怎么关 百度浏览器ai聊天窗口隐藏
Laravel表单请求验证类怎么用_Laravel Form Request分离验证逻辑教程
如何用狗爹虚拟主机快速搭建网站?
Laravel策略(Policy)如何控制权限_Laravel Gates与Policies实现用户授权
JavaScript如何实现倒计时_时间函数如何精确控制
android nfc常用标签读取总结
Windows10电脑怎么查看硬盘通电时间_Win10使用工具检测磁盘健康
Laravel PHP版本要求一览_Laravel各版本环境要求对照
Laravel如何使用缓存系统提升性能_Laravel缓存驱动和应用优化方案
Laravel怎么在Blade中安全地输出原始HTML内容
Laravel的契約(Contracts)是什么_深入理解Laravel Contracts与依赖倒置
如何确认建站备案号应放置的具体位置?
html5如何实现懒加载图片_ intersectionobserver api用法【教程】
Laravel如何使用软删除(Soft Deletes)功能_Eloquent软删除与数据恢复方法
Laravel如何配置和使用队列处理异步任务_Laravel队列驱动与任务分发实例
西安专业网站制作公司有哪些,陕西省建行官方网站?
惠州网站建设制作推广,惠州市华视达文化传媒有限公司怎么样?
悟空识字怎么关闭自动续费_悟空识字取消会员自动扣费步骤
Windows10电脑怎么设置虚拟光驱_Win10右键装载ISO镜像文件
Laravel如何设置定时任务(Cron Job)_Laravel调度器与任务计划配置
微信小程序 HTTPS报错整理常见问题及解决方案
如何用JavaScript实现文本编辑器_光标和选区怎么处理
网站制作怎么样才能赚钱,用自己的电脑做服务器架设网站有什么利弊,能赚钱吗?
绝密ChatGPT指令:手把手教你生成HR无法拒绝的求职信
长沙企业网站制作哪家好,长沙水业集团官方网站?
Python高阶函数应用_函数作为参数说明【指导】
Laravel如何发送邮件_Laravel Mailables构建与发送邮件的简明教程
网站制作软件免费下载安装,有哪些免费下载的软件网站?
WEB开发之注册页面验证码倒计时代码的实现
Laravel怎么使用Intervention Image库处理图片上传和缩放
Laravel如何使用Service Provider注册服务_Laravel服务提供者配置与加载
如何快速配置高效服务器建站软件?
如何在IIS中新建站点并配置端口与IP地址?
Laravel怎么使用Markdown渲染文档_Laravel将Markdown内容转HTML页面展示【实战】
如何在沈阳梯子盘古建站优化SEO排名与功能模块?
Python面向对象测试方法_mock解析【教程】
如何快速生成ASP一键建站模板并优化安全性?
Laravel用户密码怎么加密_Laravel Hash门面使用教程
Laravel Asset编译怎么配置_Laravel Vite前端构建工具使用
大连企业网站制作公司,大连2025企业社保缴费网上缴费流程?

