Operations 10 min read

APScheduler Tutorial: Installation, Basic Usage, Triggers, and Advanced Features

This article introduces the Python APScheduler library, covering installation, a basic interval example, various trigger types such as date and cron, parameterized and timezone-aware jobs, retry handling, dynamic job management, SQL job stores, multithreading, parallel execution, external event triggers, priority settings, and how to run the examples.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
APScheduler Tutorial: Installation, Basic Usage, Triggers, and Advanced Features

APScheduler is a powerful Python library for scheduling timed tasks, supporting multiple trigger types (date, interval, cron) and capable of running in single‑process or multi‑threaded environments.

Installation

pip install apscheduler

Basic Example

Define a simple task that prints a message every 5 seconds using BlockingScheduler . This blocks the main thread until the scheduler is stopped.

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 每隔 5 秒钟执行一次 print_message 函数
scheduler.add_job(print_message, 'interval', seconds=5)
# 启动调度器
scheduler.start()

If you prefer a non‑blocking scheduler, use BackgroundScheduler instead.

Using Different Triggers

1. Date Trigger (run once)

from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler

def print_message():
    print("Hello, world! Current time:", datetime.now())

scheduler = BlockingScheduler()
# 在指定的时间点执行任务
scheduler.add_job(print_message, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()

2. Cron Trigger (schedule like Linux cron)

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 每天的 10:30 执行一次任务
scheduler.add_job(print_message, 'cron', hour=10, minute=30)
scheduler.start()

Note: The cron example assumes the script runs before 10:30 each day; otherwise it will fire the next day.

Parameterized Tasks

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message(message):
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 每隔 5 秒执行一次并传递参数
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()

Complex Cron Expressions

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 每天 10:30 到 11:30 之间每 5 分钟执行一次
scheduler.add_job(print_message, 'cron', hour='10-11', minute='*/5')
scheduler.start()

Cron with Timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime, timezone, timedelta
import pytz

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
job_defaults = {'coalesce': False, 'max_instances': 3}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
# 指定时区为美国东部时间
scheduler.add_job(print_message, 'cron', hour='10-11', minute='*/5', timezone=pytz.timezone('US/Eastern'))
scheduler.start()

Job Retry Mechanism

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import time

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

def handle_job_error(event):
    print("Error occurred in job:", event.exception)
    print("Traceback:", event.traceback)

scheduler = BlockingScheduler()
# 添加错误监听器
scheduler.add_listener(handle_job_error, EVENT_JOB_ERROR)
# 设置任务重试次数
scheduler.add_job(print_message, 'interval', seconds=5, max_instances=1, misfire_grace_time=10, coalesce=True)
scheduler.start()

Dynamic Add/Remove Tasks

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message(message):
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 动态添加任务
scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
# 运行一段时间后移除任务
time.sleep(20)
scheduler.remove_job('job1')
scheduler.start()

Using SQL Database to Store Jobs

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import time

jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = BlockingScheduler(jobstores=jobstores)

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler.add_job(print_message, 'interval', seconds=5)
scheduler.start()

Multithread/Multiprocess Execution

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import time

executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
scheduler = BlockingScheduler(executors=executors)

def print_message(message):
    print("Message:", message, "Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler.add_job(print_message, 'interval', seconds=5, args=['Hello, world!'])
scheduler.start()

Parallel Execution of Multiple Tasks

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message1():
    print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

def print_message2():
    print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
scheduler.add_job(print_message1, 'interval', seconds=5)
scheduler.add_job(print_message2, 'interval', seconds=10)
scheduler.start()

External Event Triggered Task

from apscheduler.schedulers.blocking import BlockingScheduler
import threading, time

def print_message():
    print("Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

def trigger_event():
    event.set()

event = threading.Event()
scheduler = BlockingScheduler()
# 当外部事件被触发时执行任务
scheduler.add_job(print_message, 'interval', seconds=5, trigger='event', args=[event])
# 模拟外部事件触发
threading.Timer(10, trigger_event).start()
scheduler.start()

Task Priority

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def print_message1():
    print("Message 1: Hello, world! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

def print_message2():
    print("Message 2: Hello again! Current time:", time.strftime("%Y-%m-%d %H:%M:%S"))

scheduler = BlockingScheduler()
# 设置任务优先级
scheduler.add_job(print_message1, 'interval', seconds=5, id='job1', replace_existing=True, misfire_grace_time=60, priority=10)
scheduler.add_job(print_message2, 'interval', seconds=10, id='job2', replace_existing=True, misfire_grace_time=60, priority=5)
scheduler.start()

Running the Examples

Save any of the above snippets to a Python file (e.g., scheduler_example.py ) and execute it from the command line:

python scheduler_example.py
concurrencytask schedulingCronapschedulerJob Store
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.