In high-throughput, event-driven microservices, Apache Kafka is a widely used platform for distributing messages. If your Python backend calls a synchronous Kafka client (such as confluent-kafka) from inside an asynchronous web framework (such as FastAPI or Sanic), any blocking call, for example flushing the producer or polling a consumer, stalls the event loop and delays every other request that worker is serving. aiokafka avoids this by providing asyncio-native producer and consumer clients.
With a non-blocking producer and a consumer loop that runs on the same event loop as the web server, the service can publish and process events without holding up request handling.
1. Writing the Asynchronous Kafka Producer
We initialize the producer inside FastAPI's startup event hooks and send events asynchronously during requests:
# kafka_producer.py
import json

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

app = FastAPI()
producer = None  # created in the startup hook, once the event loop is running


@app.on_event("startup")
async def startup_event():
    global producer
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    await producer.start()


@app.on_event("shutdown")
async def shutdown_event():
    # Flush pending messages and close the connection
    if producer is not None:
        await producer.stop()


@app.post("/events/publish")
async def publish_event(topic: str, data: dict):
    payload = json.dumps(data).encode('utf-8')
    # Wait for the broker acknowledgement without blocking the event loop
    await producer.send_and_wait(topic, payload)
    return {"status": "published"}
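As a possible refinement, the manual JSON encoding in the endpoint can be moved into the producer through aiokafka's `value_serializer` argument. The sketch below is one way to arrange that; the helper names `serialize_event` and `build_producer` are hypothetical and not part of aiokafka or of the code above.

```python
import json


def serialize_event(data: dict) -> bytes:
    """Encode an event dict as UTF-8 JSON bytes."""
    return json.dumps(data).encode("utf-8")


def build_producer(bootstrap_servers: str = "localhost:9092"):
    """Create a producer that serializes every value with serialize_event.

    Call this from async code (for example the startup hook), since the
    producer expects a running event loop when it is constructed.
    """
    # Imported lazily so serialize_event stays usable without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    # value_serializer is applied by the producer to each value passed to send()
    return AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=serialize_event,
    )
```

With a producer built this way, the endpoint could pass the dict directly, as in `await producer.send_and_wait(topic, data)`, and the encoding logic lives in one place.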
2. Running the Async Consumer Loop
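We run the consumer as a background task that continuously reads messages from a topic and processes them. Because the loop awaits each message instead of polling synchronously, it shares the event loop with the web server without blocking request handling.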
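A minimal sketch of such a loop follows, assuming JSON-encoded message values like those the producer sends. The function names `consume_events` and `run_consumer`, the `handler` callback, and the `events-worker` group ID are illustrative assumptions; only `AIOKafkaConsumer`, its `start()`/`stop()` methods, and its support for `async for` come from aiokafka.

```python
import asyncio
import json


async def consume_events(consumer, handler):
    """Read records from `consumer` and pass each decoded event to `handler`.

    `consumer` can be any async iterable whose items expose a `.value`
    attribute, which is how AIOKafkaConsumer yields records.
    """
    async for msg in consumer:
        if msg.value is None:
            continue  # tombstone or empty record: nothing to decode
        try:
            event = json.loads(msg.value.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # skip malformed payloads instead of crashing the loop
        await handler(event)


async def run_consumer(topic, handler,
                       bootstrap_servers="localhost:9092",
                       group_id="events-worker"):
    """Start a consumer, run the loop, and always stop the consumer on exit."""
    # Imported lazily so consume_events can be exercised without aiokafka.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
    )
    await consumer.start()
    try:
        await consume_events(consumer, handler)
    finally:
        # Runs on cancellation too, so the group membership is released cleanly
        await consumer.stop()
```

In a FastAPI app, one option is to launch this from the startup hook with `asyncio.create_task(run_consumer("events", handle_event))`, keep a reference to the returned task, and cancel it in the shutdown hook; `"events"` and `handle_event` are placeholders for your own topic and coroutine.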