Task Scheduler¶
The TaskScheduler
module provides an asynchronous, cron-capable task scheduling system for Python. It supports both recurring (cron) and one-time tasks, with thread-safe execution, task management, and flexible logging. This is ideal for background jobs, periodic maintenance, or any workflow requiring scheduled execution.
Why?¶
Scheduling background tasks is a common requirement, but implementing a robust, thread-safe, and flexible scheduler is non-trivial. The TaskScheduler
module abstracts away the complexity of:
- Managing task state and concurrency
- Supporting both synchronous and asynchronous functions
- Handling cron expressions and time zones
- Providing hooks for logging and task removal
Example of scheduling a simple task:
from escudeiro.ds.scheduler import TaskScheduler, CronInfo
import asyncio
async def my_job():
print("Running my job!")
scheduler = TaskScheduler()
cron = CronInfo(cronexpr="*/5 * * * *") # Every 5 minutes
scheduler.schedule_task(my_job, cron_expr=cron, run_at=None)
asyncio.run(scheduler.start())
Features¶
- Asynchronous scheduling loop with support for both sync and async tasks
- Cron-based and one-time scheduling
- Per-task locking for safe concurrent execution
- Automatic rescheduling and removal
- Customizable logging and callbacks
- Task querying and management API
Usage¶
Scheduling a Recurring Task¶
from escudeiro.ds.scheduler import TaskScheduler, CronInfo
def my_task():
print("Hello from scheduled task!")
scheduler = TaskScheduler()
cron = CronInfo(cronexpr="0 * * * *") # Every hour
scheduler.schedule_task(my_task, cron_expr=cron, run_at=None)
Scheduling a One-Time Task¶
from escudeiro.ds.scheduler import TaskScheduler
from datetime import datetime, timedelta
def one_time():
print("This runs once!")
scheduler = TaskScheduler()
run_at = datetime.now() + timedelta(minutes=10)
scheduler.schedule_task(one_time, cron_expr=None, run_at=run_at)
Starting and Stopping the Scheduler¶
import asyncio
async def main():
scheduler = TaskScheduler()
# ... schedule tasks ...
await scheduler.start() # Runs until stopped
# To stop:
# await scheduler.stop()
Removing Tasks¶
async def on_remove(task_id):
print(f"Task {task_id} removed.")
await scheduler.remove_task(task_id, callback=on_remove)
API Reference¶
Classes¶
Task
¶
Represents a scheduled task.
- Attributes:
id_
: Unique task IDfunc
: Callable (sync or async)cron
: Cron expression (or None)next_run
: Next scheduled run timename
: Optional task nameis_running
: Execution stateremove_callback
: Async callback on removal
CronInfo
¶
Container for cron scheduling info.
- Attributes:
cronexpr
: Cron expression stringtimezone
: Timezone (defaults to local)
TaskScheduler
¶
Main scheduler class.
- Attributes:
default_sleep_interval
: Sleep time when idle-
log_info
,log_warning
,log_exception
: Logging callbacks -
Methods:
schedule_task(func, cron_expr, run_at, ...)
: Schedule a new taskis_scheduled(task_id)
: Check if task existsget_task(task_id)
: Retrieve a tasktask_ids()
: List all task IDsremove_task(task_id, callback)
: Remove a task by IDremove_by_name(name, callback)
: Remove task(s) by namestart()
: Start the scheduler loopstop()
: Stop the scheduleraclose()
: Async cleanup (alias forstop()
)
Notes¶
- Both synchronous and asynchronous functions are supported as tasks.
- Use
CronInfo
for recurring tasks, orrun_at
for one-time tasks (not both). - The scheduler is designed for use in asyncio-based applications.
- Logging and removal callbacks are fully customizable.