logo

FastAPI 定时任务全攻略:从入门到实战

作者:梅琳marlin2025.09.19 13:45浏览量:1

简介:本文详细解析在 FastAPI 中设置定时任务的多种方法,涵盖 APScheduler 集成、Celery 分布式任务队列及自定义实现方案,提供完整代码示例与最佳实践建议。

FastAPI 定时任务全攻略:从入门到实战

一、为什么需要定时任务?

在 Web 开发中,定时任务是处理周期性任务的必备工具。典型应用场景包括:

  • 数据同步与备份(如每日数据库备份)
  • 消息队列消费(如定期处理未读消息)
  • 缓存清理(如每小时清理过期缓存)
  • 通知发送(如每周发送周报邮件)
  • 监控告警(如每5分钟检查系统健康状态)

FastAPI 作为现代 Python Web 框架,虽然本身不包含定时任务功能,但通过集成第三方库可以轻松实现。本文将系统介绍三种主流方案,帮助开发者根据业务需求选择最适合的实现方式。

二、方案一:APScheduler 轻量级集成

APScheduler 是 Python 生态中最流行的定时任务库之一,支持多种触发方式(间隔、日期、Cron 表达式)和多种作业存储后端。

1. 基础实现步骤

首先安装依赖:

  1. pip install apscheduler

创建定时任务管理器:

  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from fastapi import FastAPI
  3. import logging
  4. app = FastAPI()
  5. # 配置日志
  6. logging.basicConfig()
  7. logging.getLogger("apscheduler").setLevel(logging.DEBUG)
  8. scheduler = BackgroundScheduler()
  9. scheduler.start()
  10. def job_function():
  11. print("定时任务执行中...")
  12. # 添加定时任务(每10秒执行一次)
  13. scheduler.add_job(job_function, "interval", seconds=10)
  14. @app.on_event("shutdown")
  15. def shutdown_event():
  16. scheduler.shutdown()

2. 高级功能实现

动态任务管理

  1. from apscheduler.job import Job
  2. jobs_dict = {}
  3. @app.post("/add-job/")
  4. def add_job(interval: int):
  5. job = scheduler.add_job(
  6. job_function,
  7. "interval",
  8. seconds=interval,
  9. id=f"job_{len(jobs_dict)+1}"
  10. )
  11. jobs_dict[job.id] = job
  12. return {"message": "任务添加成功", "job_id": job.id}
  13. @app.delete("/remove-job/{job_id}")
  14. def remove_job(job_id: str):
  15. if job_id in jobs_dict:
  16. scheduler.remove_job(job_id)
  17. del jobs_dict[job_id]
  18. return {"message": "任务删除成功"}
  19. return {"message": "任务不存在"}

Cron 表达式支持

  1. def weekly_report():
  2. print("生成周报...")
  3. scheduler.add_job(
  4. weekly_report,
  5. "cron",
  6. day_of_week="mon",
  7. hour=9,
  8. minute=30
  9. )

3. 最佳实践建议

  1. 异常处理:为定时任务添加 try-catch 块,防止单个任务失败导致整个调度器崩溃
  2. 任务去重:通过唯一 ID 机制避免重复添加相同任务
  3. 持久化存储:生产环境建议使用 SQLAlchemyJobStore 持久化任务
  4. 线程安全:BackgroundScheduler 默认在独立线程运行,注意线程安全问题

三、方案二:Celery 分布式任务队列

对于需要分布式处理的高并发定时任务,Celery 是更专业的选择。

1. 环境准备

安装必要组件:

  1. pip install celery redis # 使用Redis作为消息代理

2. 完整实现示例

创建 Celery 应用

  1. from celery import Celery
  2. from datetime import timedelta
  3. celery_app = Celery(
  4. "tasks",
  5. broker="redis://localhost:6379/0",
  6. backend="redis://localhost:6379/1"
  7. )
  8. # 配置定时任务
  9. celery_app.conf.beat_schedule = {
  10. "every-30-seconds": {
  11. "task": "tasks.process_data",
  12. "schedule": timedelta(seconds=30)
  13. },
  14. "daily-report": {
  15. "task": "tasks.generate_report",
  16. "schedule": crontab(hour=9, minute=0) # 每天9点执行
  17. }
  18. }
  19. @celery_app.task
  20. def process_data():
  21. print("处理数据...")
  22. return "完成"
  23. @celery_app.task
  24. def generate_report():
  25. print("生成日报...")
  26. return "日报已生成"

FastAPI 集成

  1. from fastapi import FastAPI
  2. from celery.result import AsyncResult
  3. app = FastAPI()
  4. @app.get("/task/{task_id}")
  5. def get_task_status(task_id: str):
  6. result = AsyncResult(task_id, app=celery_app)
  7. return {
  8. "status": result.status,
  9. "result": result.result
  10. }

3. 生产环境配置要点

  1. worker 启动celery -A tasks worker --loglevel=info
  2. beat 启动celery -A tasks beat --loglevel=info
  3. 监控:集成 Flower 进行任务监控
  4. 重试机制:配置 task_routestask_reject_on_worker_lost

四、方案三:自定义实现(基于 asyncio)

对于简单场景,可以使用 asyncio 实现轻量级定时任务。

1. 基础实现

  1. import asyncio
  2. from fastapi import FastAPI
  3. app = FastAPI()
  4. async def periodic_task():
  5. while True:
  6. print("异步定时任务执行...")
  7. await asyncio.sleep(5) # 每5秒执行一次
  8. @app.on_event("startup")
  9. async def startup_event():
  10. asyncio.create_task(periodic_task())

2. 动态任务管理

  1. from typing import Dict
  2. tasks: Dict[str, asyncio.Task] = {}
  3. @app.post("/start-async-task/{task_id}")
  4. async def start_async_task(task_id: str, interval: int):
  5. async def task_func():
  6. while True:
  7. print(f"任务 {task_id} 执行中...")
  8. await asyncio.sleep(interval)
  9. if task_id not in tasks:
  10. tasks[task_id] = asyncio.create_task(task_func())
  11. return {"message": "任务启动成功"}
  12. @app.delete("/stop-async-task/{task_id}")
  13. async def stop_async_task(task_id: str):
  14. if task_id in tasks:
  15. tasks[task_id].cancel()
  16. del tasks[task_id]
  17. return {"message": "任务停止成功"}
  18. return {"message": "任务不存在"}

五、方案对比与选型建议

方案 适用场景 优点 缺点
APScheduler 单机定时任务 配置简单,支持多种触发器 非分布式,单机性能有限
Celery 分布式定时任务 分布式处理,可扩展性强 配置复杂,依赖外部消息队列
自定义 asyncio 简单异步定时任务 无需额外依赖,控制灵活 功能有限,不适合复杂场景

选型建议

  1. 小型项目/开发环境:APScheduler
  2. 大型分布式系统:Celery
  3. 微服务架构中的简单任务:自定义 asyncio

六、常见问题解决方案

  1. 定时任务不执行

    • 检查调度器是否启动
    • 验证日志级别是否设置为 DEBUG
    • 检查任务函数是否可能抛出未捕获异常
  2. 任务重复执行

    • 确保为任务分配唯一 ID
    • 检查是否有多个调度器实例
  3. 时区问题

    1. from pytz import timezone
    2. scheduler.timezone = timezone("Asia/Shanghai")
  4. Docker 部署注意事项

    • 确保容器时区配置正确
    • 对于 Celery,需要同时运行 worker 和 beat 容器

七、进阶技巧

  1. 任务依赖管理

    1. from apscheduler.job import Job
    2. def dependent_job():
    3. print("依赖任务执行...")
    4. def main_job():
    5. print("主任务执行前检查依赖...")
    6. # 这里可以添加依赖检查逻辑
    7. dependent_job()
    8. print("主任务执行...")
    9. scheduler.add_job(main_job, "interval", minutes=1)
  2. 任务结果持久化

    1. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    2. jobstores = {
    3. "default": SQLAlchemyJobStore(url="sqlite:///jobs.db")
    4. }
    5. scheduler = BackgroundScheduler(jobstores=jobstores)
  3. 动态修改任务间隔

    1. @app.put("/update-interval/{job_id}")
    2. def update_interval(job_id: str, new_interval: int):
    3. if job_id in jobs_dict:
    4. scheduler.modify_job(job_id, trigger="interval", seconds=new_interval)
    5. return {"message": "间隔更新成功"}
    6. return {"message": "任务不存在"}

八、总结

FastAPI 中的定时任务实现有多种选择,开发者应根据项目规模、分布式需求和复杂度要求进行选型。APScheduler 适合大多数中小型项目,Celery 适合需要分布式处理的大型系统,而自定义 asyncio 实现则适合简单的异步任务场景。

在实际开发中,建议:

  1. 为所有定时任务添加完善的日志记录
  2. 实现任务执行结果的持久化存储
  3. 添加适当的监控和告警机制
  4. 考虑任务执行的幂等性设计

通过合理选择和配置定时任务方案,可以显著提升 FastAPI 应用的自动化能力和运维效率。

相关文章推荐

发表评论