FastAPI 定时任务全攻略:从入门到实战
2025.09.19 13:45浏览量:1简介:本文详细解析在 FastAPI 中设置定时任务的多种方法,涵盖 APScheduler 集成、Celery 分布式任务队列及自定义实现方案,提供完整代码示例与最佳实践建议。
FastAPI 定时任务全攻略:从入门到实战
一、为什么需要定时任务?
在 Web 开发中,定时任务是处理周期性任务的必备工具。典型应用场景包括:
FastAPI 作为现代 Python Web 框架,虽然本身不包含定时任务功能,但通过集成第三方库可以轻松实现。本文将系统介绍三种主流方案,帮助开发者根据业务需求选择最适合的实现方式。
二、方案一:APScheduler 轻量级集成
APScheduler 是 Python 生态中最流行的定时任务库之一,支持多种触发方式(间隔、日期、Cron 表达式)和多种作业存储后端。
1. 基础实现步骤
首先安装依赖:
pip install apscheduler
创建定时任务管理器:
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI
import logging
app = FastAPI()
# 配置日志
logging.basicConfig()
logging.getLogger("apscheduler").setLevel(logging.DEBUG)
scheduler = BackgroundScheduler()
scheduler.start()
def job_function():
print("定时任务执行中...")
# 添加定时任务(每10秒执行一次)
scheduler.add_job(job_function, "interval", seconds=10)
@app.on_event("shutdown")
def shutdown_event():
scheduler.shutdown()
2. 高级功能实现
动态任务管理:
from apscheduler.job import Job
jobs_dict = {}
@app.post("/add-job/")
def add_job(interval: int):
job = scheduler.add_job(
job_function,
"interval",
seconds=interval,
id=f"job_{len(jobs_dict)+1}"
)
jobs_dict[job.id] = job
return {"message": "任务添加成功", "job_id": job.id}
@app.delete("/remove-job/{job_id}")
def remove_job(job_id: str):
if job_id in jobs_dict:
scheduler.remove_job(job_id)
del jobs_dict[job_id]
return {"message": "任务删除成功"}
return {"message": "任务不存在"}
Cron 表达式支持:
def weekly_report():
print("生成周报...")
scheduler.add_job(
weekly_report,
"cron",
day_of_week="mon",
hour=9,
minute=30
)
3. 最佳实践建议
- 异常处理:为定时任务添加 try-catch 块,防止单个任务失败导致整个调度器崩溃
- 任务去重:通过唯一 ID 机制避免重复添加相同任务
- 持久化存储:生产环境建议使用 SQLAlchemyJobStore 持久化任务
- 线程安全:BackgroundScheduler 默认在独立线程运行,注意线程安全问题
三、方案二:Celery 分布式任务队列
对于需要分布式处理的高并发定时任务,Celery 是更专业的选择。
1. 环境准备
安装必要组件:
pip install celery redis # 使用Redis作为消息代理
2. 完整实现示例
创建 Celery 应用:
from celery import Celery
from datetime import timedelta
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1"
)
# 配置定时任务
celery_app.conf.beat_schedule = {
"every-30-seconds": {
"task": "tasks.process_data",
"schedule": timedelta(seconds=30)
},
"daily-report": {
"task": "tasks.generate_report",
"schedule": crontab(hour=9, minute=0) # 每天9点执行
}
}
@celery_app.task
def process_data():
print("处理数据...")
return "完成"
@celery_app.task
def generate_report():
print("生成日报...")
return "日报已生成"
FastAPI 集成:
from fastapi import FastAPI
from celery.result import AsyncResult
app = FastAPI()
@app.get("/task/{task_id}")
def get_task_status(task_id: str):
result = AsyncResult(task_id, app=celery_app)
return {
"status": result.status,
"result": result.result
}
3. 生产环境配置要点
- worker 启动:
celery -A tasks worker --loglevel=info
- beat 启动:
celery -A tasks beat --loglevel=info
- 监控:集成 Flower 进行任务监控
- 重试机制:配置
task_routes
和task_reject_on_worker_lost
四、方案三:自定义实现(基于 asyncio)
对于简单场景,可以使用 asyncio 实现轻量级定时任务。
1. 基础实现
import asyncio
from fastapi import FastAPI
app = FastAPI()
async def periodic_task():
while True:
print("异步定时任务执行...")
await asyncio.sleep(5) # 每5秒执行一次
@app.on_event("startup")
async def startup_event():
asyncio.create_task(periodic_task())
2. 动态任务管理
from typing import Dict
tasks: Dict[str, asyncio.Task] = {}
@app.post("/start-async-task/{task_id}")
async def start_async_task(task_id: str, interval: int):
async def task_func():
while True:
print(f"任务 {task_id} 执行中...")
await asyncio.sleep(interval)
if task_id not in tasks:
tasks[task_id] = asyncio.create_task(task_func())
return {"message": "任务启动成功"}
@app.delete("/stop-async-task/{task_id}")
async def stop_async_task(task_id: str):
if task_id in tasks:
tasks[task_id].cancel()
del tasks[task_id]
return {"message": "任务停止成功"}
return {"message": "任务不存在"}
五、方案对比与选型建议
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
APScheduler | 单机定时任务 | 配置简单,支持多种触发器 | 非分布式,单机性能有限 |
Celery | 分布式定时任务 | 分布式处理,可扩展性强 | 配置复杂,依赖外部消息队列 |
自定义 asyncio | 简单异步定时任务 | 无需额外依赖,控制灵活 | 功能有限,不适合复杂场景 |
选型建议:
- 小型项目/开发环境:APScheduler
- 大型分布式系统:Celery
- 微服务架构中的简单任务:自定义 asyncio
六、常见问题解决方案
定时任务不执行:
- 检查调度器是否启动
- 验证日志级别是否设置为 DEBUG
- 检查任务函数是否可能抛出未捕获异常
任务重复执行:
- 确保为任务分配唯一 ID
- 检查是否有多个调度器实例
时区问题:
from pytz import timezone
scheduler.timezone = timezone("Asia/Shanghai")
Docker 部署注意事项:
- 确保容器时区配置正确
- 对于 Celery,需要同时运行 worker 和 beat 容器
七、进阶技巧
任务依赖管理:
from apscheduler.job import Job
def dependent_job():
print("依赖任务执行...")
def main_job():
print("主任务执行前检查依赖...")
# 这里可以添加依赖检查逻辑
dependent_job()
print("主任务执行...")
scheduler.add_job(main_job, "interval", minutes=1)
任务结果持久化:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.db")
}
scheduler = BackgroundScheduler(jobstores=jobstores)
动态修改任务间隔:
@app.put("/update-interval/{job_id}")
def update_interval(job_id: str, new_interval: int):
if job_id in jobs_dict:
scheduler.modify_job(job_id, trigger="interval", seconds=new_interval)
return {"message": "间隔更新成功"}
return {"message": "任务不存在"}
八、总结
FastAPI 中的定时任务实现有多种选择,开发者应根据项目规模、分布式需求和复杂度要求进行选型。APScheduler 适合大多数中小型项目,Celery 适合需要分布式处理的大型系统,而自定义 asyncio 实现则适合简单的异步任务场景。
在实际开发中,建议:
- 为所有定时任务添加完善的日志记录
- 实现任务执行结果的持久化存储
- 添加适当的监控和告警机制
- 考虑任务执行的幂等性设计
通过合理选择和配置定时任务方案,可以显著提升 FastAPI 应用的自动化能力和运维效率。
发表评论
登录后可评论,请前往 登录 或 注册