如何让FastAPI与Celery完美联姻,打造高效异步任务处理系统?

如何让FastAPI与Celery完美联姻,打造高效异步任务处理系统?

cmdragon_cn.png cmdragon_cn.png

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/

  1. Celery基础概念
    Celery架构由三部分组成:客户端(发送任务)、消息代理(存储任务队列)、工作者(执行任务)。典型的消息代理选择包括Redis(支持6379端口)和RabbitMQ(默认使用5672端口)。任务结果存储建议使用Redis或关系型数据库,需在配置中明确指定backend参数。

  2. FastAPI与Celery集成步骤

graph TD; A(用户请求) --> B(FastAPI路由); B --> C(任务入队); C --> D(消息队列); D --> E(Celery Worker处理); E --> F(结果存储); F --> G(客户端查询);
  1. 代码实现与解析
    安装依赖:
pip install fastapi==0.103.2 celery==5.3.4 redis==4.5.5 uvicorn==0.23.2 pydantic==2.5.2

项目结构:

project/
├── main.py
├── celery_app.py
└── tasks.py

核心代码示例:

# celery_app.py
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["project.tasks"]
)
celery_app.conf.update(task_track_started=True)

# tasks.py
from .celery_app import celery_app


@celery_app.task
def process_data(data: dict) -> dict:
    """模拟耗时数据处理任务"""
    import time
    time.sleep(5)
    return {"result": f"Processed {data["input"]}"}


# main.py
from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_data

app = FastAPI()


class RequestData(BaseModel):
    input: str
    priority: int = 1


@app.post("/submit-task")
async def submit_task(data: RequestData):
    """提交异步任务接口"""
    task = process_data.apply_async(
        kwargs={"data": data.dict()},
        priority=data.priority
    )
    return {"task_id": task.id}
  1. 任务状态查询实现
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """任务状态查询接口"""
    from celery.result import AsyncResult
    result = AsyncResult(task_id, app=celery_app)
    return {
        "status": result.status,
        "result": result.result if result.ready() else None
    }
  1. 课后Quiz
    Q1:为什么需要为Celery配置不同的Redis数据库作为broker和backend?
    A1:区分存储空间,避免任务队列与结果数据相互干扰。broker的0号库存储待处理任务,backend的1号库保存任务状态和结果。

Q2:如何处理长时间运行的任务超时问题?
A2:在任务装饰器中设置soft_time_limit参数,例如@task(soft_time_limit=300),同时配置worker的--maxtasksperchild参数限制最大任务数。

  1. 常见报错解决方案
    错误现象:Worker收到任务但未执行
    检查要点:
  2. 确认Redis服务运行状态:redis-cli ping应返回PONG
  3. 检查任务模块是否包含在配置中:celery_app的include参数需正确指向tasks模块
  4. 验证任务函数是否正确定义:使用@celery_app.task装饰器

错误现象:任务结果无法获取
解决方案:

  1. 检查backend配置是否正确

  2. 确认任务执行完成(状态为SUCCESS)

  3. 检查Redis存储空间是否充足

  4. 任务优先级配置
    在启动worker时指定队列:

celery -A project.celery_app worker -Q high_priority,default -l INFO

接口调用时指定优先级:

process_data.apply_async(
    kwargs={"data": data},
    queue="high_priority" if data.priority > 5 else "default"
)

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何让FastAPI与Celery完美联姻,打造高效异步任务处理系统?

往期文章归档:

  • 如何用WebSocket打造毫秒级实时协作系统? - cmdragon"s Blog
  • 如何让你的WebSocket连接既安全又高效?
  • 如何让多客户端会话管理不再成为你的技术噩梦? - cmdragon"s Blog
  • 如何在FastAPI中玩转WebSocket消息处理?
  • 如何在FastAPI中玩转WebSocket,让实时通信不再烦恼? - cmdragon"s Blog
  • WebSocket与HTTP协议究竟有何不同?FastAPI如何让长连接变得如此简单? - cmdragon"s Blog
  • FastAPI如何玩转安全防护,让黑客望而却步?
  • 如何用三层防护体系打造坚不可摧的 API 安全堡垒? - cmdragon"s Blog
  • FastAPI安全加固:密钥轮换、限流策略与安全头部如何实现三重防护? - cmdragon"s Blog
  • 如何在FastAPI中巧妙玩转数据脱敏,让敏感信息安全无忧? - cmdragon"s Blog
  • RBAC权限模型如何让API访问控制既安全又灵活? - cmdragon"s Blog
  • FastAPI中的敏感数据如何在不泄露的情况下翩翩起舞?
  • FastAPI安全认证的终极秘籍:OAuth2与JWT如何完美融合? - cmdragon"s Blog
  • 如何在FastAPI中打造坚不可摧的Web安全防线? - cmdragon"s Blog
  • 如何用 FastAPI 和 RBAC 打造坚不可摧的安全堡垒? - cmdragon"s Blog
  • FastAPI权限配置:你的系统真的安全吗? - cmdragon"s Blog
  • FastAPI权限缓存:你的性能瓶颈是否藏在这只“看不见的手”里? | cmdragon"s Blog
  • FastAPI日志审计:你的权限系统是否真的安全无虞? | cmdragon"s Blog
  • 如何在FastAPI中打造坚不可摧的安全防线? | cmdragon"s Blog
  • 如何在FastAPI中实现权限隔离并让用户乖乖听话? | cmdragon"s Blog
  • 如何在FastAPI中玩转权限控制与测试,让代码安全又优雅? | cmdragon"s Blog
  • 如何在FastAPI中打造一个既安全又灵活的权限管理系统? | cmdragon"s Blog
  • FastAPI访问令牌的权限声明与作用域管理:你的API安全真的无懈可击吗? | cmdragon"s Blog
  • 如何在FastAPI中构建一个既安全又灵活的多层级权限系统? | cmdragon"s Blog
  • FastAPI如何用角色权限让Web应用安全又灵活? | cmdragon"s Blog
  • FastAPI权限验证依赖项究竟藏着什么秘密? | cmdragon"s Blog
  • 如何用FastAPI和Tortoise-ORM打造一个既高效又灵活的角色管理系统? | cmdragon"s Blog
  • JWT令牌如何在FastAPI中实现安全又高效的生成与验证? | cmdragon"s Blog
  • 你的密码存储方式是否在向黑客招手? | cmdragon"s Blog
  • 如何在FastAPI中轻松实现OAuth2认证并保护你的API? | cmdragon"s Blog
  • FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍 | cmdragon"s Blog
  • FastAPI认证系统:从零到令牌大师的奇幻之旅 | cmdragon"s Blog
  • FastAPI安全异常处理:从401到422的奇妙冒险 | cmdragon"s Blog
  • FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍 | cmdragon"s Blog
  • JWT令牌:从身份证到代码防伪的奇妙之旅 | cmdragon"s Blog
  • FastAPI安全认证:从密码到令牌的魔法之旅 | cmdragon"s Blog

免费好用的热门在线工具

  • CMDragon 在线工具 - 高级AI工具箱与开发者套件 | 免费好用的在线工具
  • 应用商店 - 发现1000+提升效率与开发的AI工具和实用程序 | 免费好用的在线工具
  • CMDragon 更新日志 - 最新更新、功能与改进 | 免费好用的在线工具
  • 支持我们 - 成为赞助者 | 免费好用的在线工具
  • AI文本生成图像 - 应用商店 | 免费好用的在线工具
  • 临时邮箱 - 应用商店 | 免费好用的在线工具
  • 二维码解析器 - 应用商店 | 免费好用的在线工具
  • 文本转思维导图 - 应用商店 | 免费好用的在线工具
  • 正则表达式可视化工具 - 应用商店 | 免费好用的在线工具
  • 文件隐写工具 - 应用商店 | 免费好用的在线工具
  • IPTV 频道探索器 - 应用商店 | 免费好用的在线工具
  • 快传 - 应用商店 | 免费好用的在线工具
  • 随机抽奖工具 - 应用商店 | 免费好用的在线工具
  • 动漫场景查找器 - 应用商店 | 免费好用的在线工具
  • 时间工具箱 - 应用商店 | 免费好用的在线工具
  • 网速测试 - 应用商店 | 免费好用的在线工具
  • AI 智能抠图工具 - 应用商店 | 免费好用的在线工具
  • 背景替换工具 - 应用商店 | 免费好用的在线工具
  • 艺术二维码生成器 - 应用商店 | 免费好用的在线工具
  • Open Graph 元标签生成器 - 应用商店 | 免费好用的在线工具
  • 图像对比工具 - 应用商店 | 免费好用的在线工具
  • 图片压缩专业版 - 应用商店 | 免费好用的在线工具
  • 密码生成器 - 应用商店 | 免费好用的在线工具
  • SVG优化器 - 应用商店 | 免费好用的在线工具
  • 调色板生成器 - 应用商店 | 免费好用的在线工具
  • 在线节拍器 - 应用商店 | 免费好用的在线工具
  • IP归属地查询 - 应用商店 | 免费好用的在线工具
  • CSS网格布局生成器 - 应用商店 | 免费好用的在线工具
  • 邮箱验证工具 - 应用商店 | 免费好用的在线工具
  • 书法练习字帖 - 应用商店 | 免费好用的在线工具
  • 金融计算器套件 - 应用商店 | 免费好用的在线工具
  • 中国亲戚关系计算器 - 应用商店 | 免费好用的在线工具
  • Protocol Buffer 工具箱 - 应用商店 | 免费好用的在线工具
  • IP归属地查询 - 应用商店 | 免费好用的在线工具
  • 图片无损放大 - 应用商店 | 免费好用的在线工具
  • 文本比较工具 - 应用商店 | 免费好用的在线工具
  • IP批量查询工具 - 应用商店 | 免费好用的在线工具
  • 域名查询工具 - 应用商店 | 免费好用的在线工具
  • DNS工具箱 - 应用商店 | 免费好用的在线工具
  • 网站图标生成器 - 应用商店 | 免费好用的在线工具
  • XML Sitemap

原文地址:https://www.cnblogs.com/Amd794/p/18979675