2022-06-23 18:00:00+00:00

In high-throughput event-driven microservices, Apache Kafka is the industry standard for distributing messages. If your Python backend services use synchronous Kafka libraries (like confluent-kafka) inside an asynchronous web framework (like FastAPI or Sanic), the main event loop will block during message writes, degrading web server performance. aiokafka solves this by providing native asyncio support for Kafka consumer and producer loops.

By implementing non-blocking producer clients and message consumer loops, we can build scalable event-driven backends.


1. Writing the Asynchronous Kafka Producer

We initialize the producer inside FastAPI's startup event hooks and send events asynchronously during requests:

# kafka_producer.py
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI
import json

app = FastAPI()
producer = None

@app.on_event("startup")
async def startup_event():
    global producer
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    await producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    await producer.stop()

@app.post("/events/publish")
async def publish_event(topic: str, data: dict):
    payload = json.dumps(data).encode('utf-8')
    # Send message non-blockingly
    await producer.send_and_wait(topic, payload)
    return {"status": "published"}

2. Running the Async Consumer Loop

We run a background task loop that continuously consumes messages from a topic and processes them, maintaining system throughput.