This repository provides code to integrate FastAPI with Taskiq easily.
Taskiq and FastAPI both use dependency injection, but they function differently. This library bridges the gap, allowing you to depend on `fastapi.Request` or `starlette.requests.HTTPConnection` inside your Taskiq tasks.
With this library, you can easily re-use your FastAPI dependencies (like database pools or config loaders) inside your Taskiq background functions.
Taskiq tasks usually run in a separate Worker process, not inside your FastAPI web server process (and therefore not inside the web server's event loop).
When the Worker starts, this library initializes your FastAPI application in the background.
It creates a dummy `Request` object within the Worker. This allows functions that need FastAPI request context (like accessing `app.state`) to work identically in both the Web App and the Background Worker.
Note: the injected `Request` object in a task is NOT the original HTTP request from the user. It is a simulated request context solely for accessing application state.
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, Depends as FastAPIDepends
from pydantic import BaseModel
from redis.asyncio import ConnectionPool, Redis
from taskiq import TaskiqDepends, ZeroMQBroker

import taskiq_fastapi

broker = ZeroMQBroker()


@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Creating redis pool")
    app.state.redis_pool = ConnectionPool.from_url("redis://localhost")
    #####################
    #  IMPORTANT NOTE   #
    #####################
    # If you don't check that this is not a worker process,
    # you'll get infinite recursion, because FastAPI startup
    # is also called inside worker processes.
    if not broker.is_worker_process:
        print("Starting broker client")
        await broker.startup()
    yield  # waits for shutdown
    #####################
    #  IMPORTANT NOTE   #
    #####################
    # Same as above.
    if not broker.is_worker_process:
        print("Shutting down broker client")
        await broker.shutdown()
    print("Stopping redis pool")
    await app.state.redis_pool.disconnect()


app = FastAPI(lifespan=lifespan)

# Here we bind the broker to our application by its import path.
taskiq_fastapi.init(broker, "test_script:app")


# We use TaskiqDepends here, because FastAPI initialization
# would fail with FastAPIDepends.
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
    return request.app.state.redis_pool


@broker.task
async def my_redis_task(
    key: str,
    val: str,
    # Use TaskiqDepends for all task dependencies;
    # otherwise they won't be injected.
    pool: ConnectionPool = TaskiqDepends(get_redis_pool),
):
    async with Redis(connection_pool=pool) as redis:
        await redis.set(key, val)
        print("Value set.")


class MyVal(BaseModel):
    key: str
    val: str


@app.post("/val")
async def setval_endpoint(val: MyVal) -> None:
    await my_redis_task.kiq(key=val.key, val=val.val)
    print("Task sent")


@app.get("/val")
async def getval_endpoint(
    key: str,
    pool: ConnectionPool = FastAPIDepends(get_redis_pool),
) -> str:
    async with Redis(connection_pool=pool, decode_responses=True) as redis:
        return await redis.get(key)
```

The `broker.is_worker_process` flag distinguishes the two processes:

- In the FastAPI server (uvicorn) it is `False`, so the lifespan starts the broker client and the app can send tasks.
- In the Taskiq worker it is `True`; the worker manages its own broker connection, while the lifespan still initializes dependencies such as the Redis pool.

This check prevents double-starting the broker.
| Context | Dependency marker |
|---|---|
| Inside Taskiq tasks | `TaskiqDepends` |
| Inside HTTP routes | `FastAPIDepends` |
When using `InMemoryBroker` (common in unit tests), you may need to populate the dependency context manually, since there is no separate worker process to trigger the initialization.
```python
import taskiq_fastapi
from fastapi import FastAPI
from taskiq import InMemoryBroker

broker = InMemoryBroker()
app = FastAPI()

taskiq_fastapi.init(broker, "test_script:app")
taskiq_fastapi.populate_dependency_context(broker, app)
```

The best way to deploy this system is the Single Artifact pattern: build one Docker image that contains your entire codebase, and run it with different commands to start either the API or the Worker.
This Dockerfile copies your code and installs dependencies. It does not specify a `CMD`, because we will set the command in `docker-compose.yml`.
```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
```

Notice how `api` and `worker` use the same `build: .` context but run different commands.
```yaml
version: "3.8"

services:
  api:
    build: .
    container_name: fastapi_app
    command: uvicorn test_script:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats

  worker:
    build: .
    container_name: taskiq_worker
    command: taskiq worker test_script:broker
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats

  nats:
    image: nats:latest
    ports:
      - "4222:4222"
```

Run the whole stack with:

```shell
docker-compose up --build
```

You may also run the containers individually with `docker run`, or integrate them into a Kubernetes environment.