雷达智富

首页 > 内容 > 程序笔记 > 正文

程序笔记

Python定时任务调度框架APScheduler详解

2024-08-24 25

APScheduler 是 Python 中一个功能强大且灵活的定时任务调度库,允许你以多种方式(如日期、时间间隔等)调度执行 Python 函数或代码块。它支持多种调度器(Schedulers),包括基于日期、间隔、CRON 表达式等多种方式。

APScheduler 官网地址https://apscheduler.readthedocs.io/en/3.x/

APScheduler 使用方法

安装 APScheduler:

你可以使用 pip 安装 APScheduler:

pip install apscheduler

APScheduler 代码示例:

以下示例演示了如何使用 APScheduler 创建一个简单的定时任务:

from apscheduler.schedulers.background import BackgroundScheduler

def my_job():
    print("Executing my job!")

# 创建一个后台调度器
scheduler = BackgroundScheduler()

# 添加一个定时任务,每隔5秒钟执行一次 my_job 函数
scheduler.add_job(my_job, 'interval', seconds=5)

# 开始调度器
scheduler.start()

# 让程序保持运行,触发定时任务
try:
    # 这里用一个循环来保持主线程的运行
    while True:
        pass
except (KeyboardInterrupt, SystemExit):
    # 捕获 Ctrl+C 或者其他退出信号来优雅地关闭调度器
    scheduler.shutdown()

这段代码创建了一个后台调度器(BackgroundScheduler),然后使用 add_job() 方法添加了一个定时任务,该任务每隔5秒钟执行一次 my_job() 函数。最后,使用 scheduler.start() 启动调度器,并用一个无限循环来保持主线程的运行,以便触发定时任务。

APScheduler 还支持更多高级功能,比如持久化存储、多种不同类型的触发器、多种调度器选择等等,可以根据实际需求进一步扩展和定制。

持久化存储

APScheduler 支持持久化存储来存储任务和调度程序的状态,以便在应用重启后恢复任务。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

# 使用 SQLAlchemyJobStore 进行持久化存储
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 创建一个线程池执行器
executors = {
    'default': ThreadPoolExecutor(10)
}

# 创建后台调度器,并指定 jobstores 和 executors
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)

# ... 添加和启动定时任务的代码

自定义触发器

使用自定义触发器,你可以创建基于自己需求的定时策略。

from apscheduler.triggers.base import BaseTrigger
from datetime import datetime, timedelta

class MyTrigger(BaseTrigger):
    def get_next_fire_time(self, previous_fire_time, now):
        return now + timedelta(seconds=5)  # 5秒后执行任务

# 创建一个后台调度器
scheduler = BackgroundScheduler()

# 使用自定义触发器创建定时任务
scheduler.add_job(my_job, MyTrigger())

# ... 启动调度器的代码

监听器

你可以添加监听器来监控任务的执行、异常和其他事件。

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.schedulers.background import BackgroundScheduler

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

# 创建一个后台调度器
scheduler = BackgroundScheduler()

# 添加监听器
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# ... 添加和启动定时任务的代码

这些APScheduler的高级功能可以帮助你更好地管理定时任务、监控任务的状态以及根据需求进行灵活的定制化。

更新于:22天前
赞一波!4

文章评论

全部评论