
Mastering FastAPI Background Tasks: Real‑World Patterns, Testing, and When to Reach for Celery


Introduction

If you’ve built a FastAPI endpoint that needs to send a confirmation email, generate a PDF, or kick off a long‑running data import, you have probably reached for FastAPI’s background tasks feature. The BackgroundTasks class lets you offload work that doesn’t have to block the HTTP response, keeping your API snappy without pulling in a full‑blown task queue. In this post we’ll look closely at the BackgroundTasks class, walk through common use‑cases, compare it with external workers like Celery, show how to test and debug these tasks, and lay out best‑practice patterns and pitfalls to avoid.

TL;DR – Use BackgroundTasks for lightweight, fire‑and‑forget jobs that finish within a few seconds. For anything that may run minutes, needs retries, or must survive process restarts, reach for a proper queue (Celery, RQ, Dramatiq, etc.).


Understanding the BackgroundTasks Class

FastAPI exposes Starlette’s background task machinery as the BackgroundTasks class. You declare a parameter of type BackgroundTasks in your path operation function, call its .add_task() method with any callable plus its arguments, and FastAPI runs the callable after the response has been sent.

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


def write_log(message: str):
    # Imagine a heavy I/O operation, like writing to a remote syslog server
    with open("audit.log", "a") as f:
        f.write(message + "\n")


@app.post("/items/")
async def create_item(name: str, background_tasks: BackgroundTasks):
    # Persist the item synchronously (e.g., DB write)
    # ...
    # Queue the log write without delaying the client
    background_tasks.add_task(write_log, f"Created item {name}")
    return {"msg": "Item created, audit log queued"}

How It Works Under the Hood

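  1. Request lifecycle – FastAPI attaches the queued tasks to the response. Starlette sends the response to the client and then awaits response.background(), which runs the stored tasks one after another in the order they were added.
  2. Execution context – async def tasks are awaited on the same event loop as the request handlers, so blocking I/O inside them stalls every other request; move such calls to a thread with run_in_threadpool or asyncio.to_thread. Plain def tasks are dispatched to Starlette’s thread pool automatically.
  3. Error handling – The response has already been sent, so an exception in a background task cannot change it. The exception surfaces in the server’s error log, and the tasks queued after the failing one in the same request do not run. If you need retries or failure alerts, catch exceptions inside the task yourself.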

Because the tasks share the same process, they inherit the same configuration (database session, settings, etc.). This is great for short‑lived jobs, but it also means you need to be careful about resource leaks—especially with async database sessions. See our article on FastAPI Session Leak Detection for a deeper dive.


Implementing Background Tasks for Common Use‑Cases

Below are three practical patterns you’ll encounter in production.

1️⃣ Sending Email Confirmation

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
import aiosmtplib

app = FastAPI()


class SignUp(BaseModel):
    email: EmailStr
    name: str


async def send_email(to: str, subject: str, body: str):
    message = f"Subject: {subject}\n\n{body}"
    await aiosmtplib.send(
        message,
        hostname="smtp.example.com",
        port=587,
        start_tls=True,
        username="no-reply@example.com",
        password="********",
        sender="no-reply@example.com",
        recipients=[to],
    )


@app.post("/signup/")
async def signup(payload: SignUp, background_tasks: BackgroundTasks):
    # Imagine we store the user in DB here
    # ...
    # Queue the email – fire‑and‑forget
    background_tasks.add_task(
        send_email,
        to=payload.email,
        subject="Welcome to LogicLoop!",
        body=f"Hi {payload.name}, thanks for joining us!",
    )
    return {"msg": "Signup successful, welcome email is on its way"}

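Why it works: aiosmtplib is fully async, so the background task doesn’t block the event loop. With a synchronous SMTP client such as smtplib, either define the task as a plain def function (Starlette runs those in its thread pool) or wrap the blocking call with asyncio.to_thread inside an async task.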

2️⃣ Processing Uploaded Files

Suppose you accept CSV uploads and need to ingest them into a database. The ingestion can take several seconds, but you don’t want the client to wait.

import csv
import io

import aiofiles
from fastapi import BackgroundTasks, Depends, FastAPI, File, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession

from myproject.db import get_async_session  # FastAPI dependency
from myproject.models import MyModel  # assumed location of the ORM model

app = FastAPI()


async def ingest_csv(file_path: str, session: AsyncSession):
    async with aiofiles.open(file_path, mode="r") as f:
        content = await f.read()
    # The stdlib csv module has no async reader, so parse the text in memory
    for row in csv.DictReader(io.StringIO(content)):
        # Convert row to model and add to session
        session.add(MyModel(**row))
    await session.commit()


@app.post("/upload-csv/")
async def upload_csv(
    background_tasks: BackgroundTasks,  # parameters without defaults must come first
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_async_session),
):
    # Save to a temporary location
    tmp_path = f"/tmp/{file.filename}"
    async with aiofiles.open(tmp_path, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)
    # Pass the session into the background task (be careful!)
    background_tasks.add_task(ingest_csv, tmp_path, session)
    return {"msg": "CSV received, ingestion started in background"}

Important: The session object is tied to the request’s lifetime. The background task only holds a reference to it, so once the request’s dependency cleanup has run and the session is closed, the task can fail with “Session already closed” style errors. Depending on the FastAPI version, that cleanup may even happen before the background task starts. The safer approach is to create a fresh session inside the background coroutine, or to use a separate factory that provides a new session for background work. See our guide on fixing async session errors (/modern-backend-building-high-performance-async-apis-with-fastapi-and-sqlalchemy-20/) for a concrete pattern.
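
One way to picture the "fresh session" approach is a task that opens and closes its own session and receives only plain data from the endpoint. The sketch below is an assumption-laden illustration, not the pattern from the linked guide: FakeSession and session_factory are hypothetical stand-ins so the example runs without a database. In a real SQLAlchemy 2.0 project the factory role would typically be played by an async_sessionmaker bound to your engine.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Stand-in for an AsyncSession so the sketch runs without a database."""

    def __init__(self) -> None:
        self.rows = []
        self.committed = False
        self.closed = False

    def add(self, row) -> None:
        self.rows.append(row)

    async def commit(self) -> None:
        self.committed = True

    async def close(self) -> None:
        self.closed = True


opened_sessions = []  # lets the demo inspect what the task did


@asynccontextmanager
async def session_factory():
    # In a real app this role is played by async_sessionmaker(engine)()
    session = FakeSession()
    opened_sessions.append(session)
    try:
        yield session
    finally:
        await session.close()


async def ingest_rows(rows: list[dict]) -> None:
    # The task owns its session: opened here, closed here,
    # independent of whatever the request handler used.
    async with session_factory() as session:
        for row in rows:
            session.add(row)
        await session.commit()


# In the endpoint, only plain data is handed over, never the request's session:
# background_tasks.add_task(ingest_rows, parsed_rows)

if __name__ == "__main__":
    asyncio.run(ingest_rows([{"a": 1}, {"a": 2}]))
    print(opened_sessions[0].committed, opened_sessions[0].closed)
```

Because the task no longer depends on request-scoped objects, it behaves the same whether the framework cleans up dependencies before or after the response is sent.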

3️⃣ Triggering a Remote API Call (e.g., Firebase Push)

import httpx
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


async def push_fcm(token: str, title: str, body: str):
    payload = {"to": token, "notification": {"title": title, "body": body}}
    async with httpx.AsyncClient() as client:
        await client.post("https://fcm.googleapis.com/fcm/send", json=payload)


@app.post("/notify/")
async def notify_user(token: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(push_fcm, token, "Hello!", "You have a new message")
    return {"msg": "Notification queued"}

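If you need to sign the request with a server key, keep the secret out of the function’s signature and read it from environment variables inside push_fcm. Note that Google has deprecated this legacy endpoint in favor of the FCM HTTP v1 API, which authenticates with OAuth 2.0 access tokens rather than a server key, so treat the URL above as illustrative.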
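
Reading the secret inside the task could look like the sketch below. The helper name build_fcm_headers and the FCM_SERVER_KEY variable name are assumptions made for this example; the key=... header format is the one used by the legacy FCM endpoint shown in the example above.

```python
import os


def build_fcm_headers(env_var: str = "FCM_SERVER_KEY") -> dict[str, str]:
    """Read the server key at call time so it never travels through add_task arguments."""
    server_key = os.environ.get(env_var)
    if not server_key:
        raise RuntimeError(f"{env_var} is not set")
    # The legacy FCM HTTP API expects the key in this header format
    return {"Authorization": f"key={server_key}"}


# Inside push_fcm, the headers would be passed along with the payload:
# await client.post(url, json=payload, headers=build_fcm_headers())
```

Failing loudly when the variable is missing is a deliberate choice here: a background task that silently sends unauthenticated requests is much harder to notice than one that raises.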


Comparing FastAPI Background Tasks with External Workers like Celery

| Feature | BackgroundTasks (built‑in) | Celery (or RQ/Dramatiq) |
|---|---|---|
| Process isolation | Same process, same memory | Separate worker processes/containers |
| Reliability | Lost if the process crashes before execution | Persisted in broker (Redis/RabbitMQ) → survives restarts |
| Retry & back‑off | Manual, ad‑hoc | Built‑in retry policies, exponential back‑off |
| Result storage | None (fire‑and‑forget) | Can store results in backend, query later |
| Scaling | Limited to the number of FastAPI workers | Horizontal scaling of workers independent of API |
| Complexity | Zero extra dependencies | Requires broker, worker daemon, monitoring |
| Typical use‑case | Email, short file ops, logging, analytics pings | Heavy ML inference, batch imports, long video transcoding, periodic jobs |

When to Reach for Celery

  • Job duration > 30 seconds – long tasks tie up the event loop (async def tasks) or a thread‑pool slot (plain def tasks), degrading API latency.
  • Need for retries – network hiccups, temporary DB failures, or rate limits are common in external API calls.
  • Persistence required – If you must guarantee the job runs even after a crash, a broker-backed queue is the safe choice.
  • Multiple worker types – You may want dedicated CPU‑intensive workers (e.g., video encoding) separate from I/O‑bound API workers.
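
If retries are the only thing missing and losing a job on restart is acceptable, a small wrapper can cover the gap before you adopt a queue. The following is a minimal sketch of the "manual, ad‑hoc" retry mentioned in the table; run_with_retries and its parameters are hypothetical names, and it retries on any exception, which you would likely narrow in practice.

```python
import asyncio
import random


async def run_with_retries(func, *args, attempts: int = 3, base_delay: float = 0.5, **kwargs):
    """Await func(*args, **kwargs), retrying on any exception with exponential back-off."""
    for attempt in range(1, attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the failure reach the server log
            # base_delay, then 2x, 4x, ... plus a little jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
```

It can be queued like any other task, for example background_tasks.add_task(run_with_retries, push_fcm, token, "Hello!", "You have a new message"). The retries still live in the API process, so they disappear with it on a crash; that is the point at which a broker-backed queue earns its complexity.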

Hybrid Approach

You can start with BackgroundTasks for simplicity, then progressively migrate “hot” endpoints to Celery as they outgrow the built‑in limits. A common pattern is to expose a tiny wrapper endpoint that enqueues a Celery task, keeping the public API unchanged.

# FastAPI endpoint delegating to Celery
from fastapi import FastAPI

from myproject.celery_tasks import send_welcome_email
from myproject.schemas import SignUp  # assumed location of the SignUp model

app = FastAPI()


@app.post("/signup/")
def signup(payload: SignUp):
    # DB write...
    send_welcome_email.delay(payload.email, payload.name)  # enqueue on the Celery broker
    return {"msg": "User created, email will be sent"}

Testing and Debugging Background Tasks

Unit Testing with TestClient

With TestClient, the ASGI app call runs to completion, background tasks included, before client.post() returns. A test can therefore patch the task function and assert on its side effects right after the request, without reaching into the request or response objects.

from fastapi.testclient import TestClient
from myapp.main import app

client = TestClient(app)


def test_signup_triggers_email(monkeypatch):
    called = {}

    async def fake_send_email(to, subject, body):
        called["to"] = to
        called["subject"] = subject

    # Patch the name in the module where the endpoint looks it up
    monkeypatch.setattr("myapp.routes.send_email", fake_send_email)
    response = client.post("/signup/", json={"email": "test@example.com", "name": "Test"})
    assert response.status_code == 200
    # By the time client.post() returns, the queued tasks have already run
    assert called["to"] == "test@example.com"
Integration Testing with a Real Event Loop

If you prefer to let the event loop execute the task naturally:

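import asyncio

import pytest
from httpx import ASGITransport, AsyncClient

from myapp.main import app


@pytest.mark.asyncio
async def test_file_upload():
    # Recent httpx versions take the app via ASGITransport rather than app=...
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        files = {"file": ("data.csv", b"a,b,c\n1,2,3\n")}
        resp = await ac.post("/upload-csv/", files=files)
        assert resp.status_code == 200
        # Give the background task a moment to finish; usually redundant,
        # since the in-process transport awaits the whole app call
        await asyncio.sleep(0.5)
        # Verify DB state, file existence, etc.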

Debugging Tips

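  • Logs inside tasks – Use a dedicated logger such as logging.getLogger("fastapi.background") to tell background logs apart. The name is just a convention you choose; FastAPI does not define this logger itself.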
  • uvicorn --reload vs. production – In production (e.g., Gunicorn with multiple workers) each worker has its own background queue. A task queued on worker 1 will never run on worker 2. This can surface as “intermittent” behavior. See our post on why FastAPI works locally but fails in production (/why-your-fastapi-app-works-locally-but-fails-in-production/).
  • Database session leaks – If you notice “QueuePool limit reached” errors, you likely left an async session open in a background task. Ensure you close or commit the session inside the task, or create a fresh one.
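
Since a failing task never reaches the client, it helps to make failures impossible to miss in the logs. A possible sketch is a decorator that logs the traceback under a dedicated logger; logged_task, the logger name, and send_report are hypothetical names for this example, and the decorator covers async def tasks only.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("myapp.background")  # hypothetical logger name


def logged_task(func):
    """Wrap an async task so failures are logged with a traceback instead of escaping."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the traceback
            logger.exception("Background task %s failed", func.__name__)
            return None

    return wrapper


@logged_task
async def send_report(user_id: int) -> str:
    if user_id < 0:
        raise ValueError("invalid user id")
    return f"report sent to {user_id}"
```

Swallowing the exception also means later tasks queued in the same request still run, which is usually what you want for independent side effects. If one task depends on the previous one succeeding, re-raise instead.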

Best Practices and Limitations

1. Keep Tasks Idempotent

Because there’s no built‑in retry, a failure may leave the system in an inconsistent state. Design the task so that re‑running it won’t cause duplicate side effects (e.g., sending a second email is acceptable, but inserting duplicate rows is not).

2. Use ThreadPool for Blocking Calls

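If you must call a synchronous library (e.g., boto3 for S3), keep it off the event loop. Passing a plain def function to add_task is enough, because Starlette runs non‑async callables in its thread pool; run_in_threadpool or asyncio.to_thread are only needed when the blocking call sits inside an async def task.

import os


def upload_to_s3_sync(file_path: str, bucket: str):
    import boto3  # imported lazily, see "Avoid Heavy Dependencies" below

    s3 = boto3.client("s3")
    s3.upload_file(file_path, bucket, os.path.basename(file_path))


# Inside a path operation that has background_tasks and tmp_path in scope:
background_tasks.add_task(upload_to_s3_sync, tmp_path, "my-bucket")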
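
When the blocking call is only one step inside an otherwise async task, asyncio.to_thread moves just that step off the event loop. A minimal sketch, with blocking_upload as a made-up stand-in for a synchronous SDK call and a heartbeat coroutine added only to show that the loop keeps running in the meantime:

```python
import asyncio
import time


def blocking_upload(path: str) -> str:
    # Stand-in for a synchronous SDK call such as an S3 upload
    time.sleep(0.2)
    return f"uploaded {path}"


async def upload_and_notify(path: str) -> str:
    # The blocking call runs in a worker thread; the event loop stays free
    result = await asyncio.to_thread(blocking_upload, path)
    # ...awaitable follow-up work (e.g. an async HTTP call) could go here
    return result


async def demo() -> tuple[str, int]:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    result = await upload_and_notify("/tmp/report.csv")
    beat.cancel()
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If blocking_upload were called directly inside the coroutine, the heartbeat would not tick at all during the upload, which is exactly the stall other requests would experience.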

3. Limit Task Duration

A rule of thumb: keep background tasks under ~10 seconds. Longer tasks should be moved to a proper queue. If you exceed this, you’ll see increased memory usage per worker and risk timeouts under high load.
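
A time budget is easier to respect when it is enforced in code. One possible sketch uses asyncio.wait_for; run_with_budget is a hypothetical helper, and it only works for async def tasks, since a function already running in a worker thread cannot be cancelled this way.

```python
import asyncio


async def run_with_budget(coro_func, *args, budget_seconds: float = 10.0, **kwargs) -> bool:
    """Await coro_func(*args, **kwargs); return False if it exceeds the time budget."""
    try:
        await asyncio.wait_for(coro_func(*args, **kwargs), timeout=budget_seconds)
        return True
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising
        return False
```

Queued as background_tasks.add_task(run_with_budget, some_task, arg, budget_seconds=10), a task that overruns is cancelled rather than left to pile up. Jobs that regularly hit the limit are good candidates for a real queue.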

4. Graceful Shutdown

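When the FastAPI process receives a SIGTERM (e.g., during a container restart), background work is at risk: depending on the server’s graceful‑shutdown settings and the orchestrator’s termination grace period, running tasks may be cut off, and anything not yet started is lost. If you need graceful completion, implement a shutdown hook that drains a custom queue, or use a dedicated worker process.

# on_event is deprecated in recent FastAPI releases in favor of lifespan handlers,
# but the idea is the same either way
@app.on_event("shutdown")
async def shutdown_background():
    # Example: wait for all tasks in a custom asyncio.Queue
    await my_queue.join()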
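
To make the draining idea concrete, here is a self-contained sketch of a custom asyncio.Queue with one worker and a shutdown sequence that waits for queued jobs before stopping. The names and the 5-second timeout are assumptions for the example; in an application, the part after the "Shutdown sequence" comment would live in your shutdown hook.

```python
import asyncio

processed: list[str] = []


async def worker(queue: asyncio.Queue) -> None:
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            processed.append(job)
        finally:
            queue.task_done()  # lets queue.join() know this job is finished


async def main() -> list[str]:
    processed.clear()
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue))

    for job in ("a", "b", "c"):
        queue.put_nowait(job)

    # Shutdown sequence: wait for queued jobs (bounded), then stop the worker
    await asyncio.wait_for(queue.join(), timeout=5)
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Bounding the wait matters: an unbounded join() can hold the process open until the orchestrator kills it anyway, so pick a timeout shorter than the grace period you are given.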

5. Avoid Heavy Dependencies

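Importing large ML libraries at module import time increases startup (cold‑start) latency for every worker, even if the task that needs them rarely runs. Lazy‑load them inside the task instead, so only the first invocation of that task pays the import cost.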

6. Security

Don’t pass raw user data into a background task that could be exploited later. Validate and sanitize before queuing.
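
File names are a typical example: a client-supplied name such as ../../etc/passwd should never be joined into a server path as-is. A hedged sketch of one way to build a safe temporary path before queuing the task; safe_upload_path is a hypothetical helper and the character whitelist is an arbitrary choice for this example.

```python
import os
import re
import tempfile
import uuid
from typing import Optional


def safe_upload_path(client_filename: str, base_dir: Optional[str] = None) -> str:
    """Build a server-side path that stays inside base_dir, whatever the client sent."""
    base_dir = base_dir or tempfile.gettempdir()
    # Keep only the final path component, then whitelist characters
    name = os.path.basename(client_filename.replace("\\", "/"))
    name = re.sub(r"[^A-Za-z0-9._-]", "_", name) or "upload"
    # A random prefix prevents collisions and guessable names
    return os.path.join(base_dir, f"{uuid.uuid4().hex}_{name}")
```

The endpoint would then write the upload to safe_upload_path(file.filename) and pass that path to the background task, so the task never sees the raw client value.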


Key Takeaways

  • BackgroundTasks is a lightweight, zero‑dependency way to fire‑and‑forget short jobs in FastAPI.
  • Use it for tasks like sending email, small file transformations, or pushing notifications—anything that finishes within a few seconds and doesn’t need retries.
  • For long‑running, retry‑aware, or persisted jobs, migrate to an external worker system such as Celery; the built‑in class is not a replacement for a full task queue.
  • Test background tasks by calling the task functions directly in unit tests, or by asserting on their side effects once the test client call has returned.
  • Follow best practices: keep tasks idempotent, offload blocking I/O to a thread pool, limit duration, and be aware of session/connection leaks (see our session‑leak detection guide).

By understanding the strengths and limits of FastAPI’s native background support, you can build responsive APIs today while keeping a clear migration path to robust distributed workers as your application scales. Happy coding!
