Aggregating logs and events from thousands of distributed application instances produces massive telemetry streams. If backend microservices write each incoming telemetry event directly to the database, the volume of concurrent requests can exhaust database connections, spike write latencies, and inflate database costs. A resilient logging pipeline instead buffers telemetry asynchronously, pairing lightweight ingestion endpoints with batched writes.
By pairing Cloud Pub/Sub with Cloud Functions, we can build a scalable ingestion pipeline that absorbs high-frequency telemetry events.
1. Lightweight Logging Gateway
The ingestion endpoint must respond as soon as an event is queued, without waiting on any database operation. The FastAPI handler below parses the JSON payload and publishes it directly to a Pub/Sub topic:
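```python
# main.py
import asyncio
import json

from fastapi import FastAPI, HTTPException, Request
from google.cloud import pubsub_v1

app = FastAPI()

# Create the client once per process; it batches outgoing messages internally
publisher = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path("my-gcp-project", "telemetry-events-topic")


# 202 Accepted: the event is queued, not yet processed
@app.post("/ingest-log", status_code=202)
async def ingest_log(request: Request):
    try:
        body = await request.json()
    except ValueError:
        # Malformed or empty JSON is a client error, not a server failure
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")

    payload = json.dumps(body).encode("utf-8")
    try:
        # publish() returns immediately with a future for the message ID
        future = publisher.publish(TOPIC_PATH, payload)
        # Wait (off the event loop) only until Pub/Sub accepts the message, not until it is processed
        await asyncio.to_thread(future.result, timeout=2)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to queue event: {e}")
    return {"status": "queued"}
```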
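The gateway forwards each request body unchanged, and the worker in step 2 reads only the `events` array of each message, so clients get the most out of the pipeline by sending several events per request. The sketch below groups events into `{"events": [...]}` envelopes, flushing when a count or approximate serialized-size cap is reached. The helper name `build_envelopes` and both limits are illustrative assumptions; the 1 MB default is simply a margin well under Pub/Sub's 10 MB per-message limit.

```python
import json
from typing import Any, Iterable, Iterator


# Hypothetical client-side helper; the default limits are illustrative, not required values
def build_envelopes(
    events: Iterable[dict[str, Any]],
    max_events: int = 500,
    max_bytes: int = 1_000_000,
) -> Iterator[dict[str, list[dict[str, Any]]]]:
    """Group events into {"events": [...]} envelopes bounded by count and approximate size."""
    batch: list[dict[str, Any]] = []
    batch_bytes = 0
    for event in events:
        # Approximate each event's contribution by its own JSON size (ignores the wrapper)
        event_bytes = len(json.dumps(event).encode("utf-8"))
        if batch and (len(batch) >= max_events or batch_bytes + event_bytes > max_bytes):
            yield {"events": batch}
            batch, batch_bytes = [], 0
        # An event larger than max_bytes on its own is still emitted, alone in its envelope
        batch.append(event)
        batch_bytes += event_bytes
    if batch:
        yield {"events": batch}


events = [{"device_id": f"dev-{i}", "type": "heartbeat", "details": {"seq": i}} for i in range(1200)]
print([len(env["events"]) for env in build_envelopes(events)])  # → [500, 500, 200]
```

Each envelope can then be POSTed as the JSON body of `/ingest-log`, turning 1,200 individual requests into three.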
2. The Serverless Batch Worker
A Cloud Function subscribed to the topic runs once per Pub/Sub message. Each message carries a batch of telemetry records in its `events` array, so the function builds every entity in memory and persists them with a single bulk call, minimizing database round trips:
```python
# cloud_function.py
import base64
import json

from google.cloud import ndb

client = ndb.Client()


# Assumed schema: property names follow the constructor arguments used below
class TelemetryLog(ndb.Model):
    device_id = ndb.StringProperty()
    event_type = ndb.StringProperty()
    details = ndb.JsonProperty()


def process_telemetry_batch(event, context):
    """Background Cloud Function: write every event in one Pub/Sub message as a batch."""
    pubsub_message = base64.b64decode(event["data"]).decode("utf-8")
    data = json.loads(pubsub_message)

    with client.context():
        # Build one model entity per telemetry record in the message's "events" array
        entities = [
            TelemetryLog(
                device_id=item["device_id"],
                event_type=item["type"],
                details=item["details"],
            )
            for item in data.get("events", [])
        ]
        # Persist all entities with one batched call instead of one put() per record
        ndb.put_multi(entities)

    print(f"Successfully batch-wrote {len(entities)} records to Datastore.")
```
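As written, the worker raises `KeyError` on the first record missing a field, so one bad record fails the whole invocation; depending on whether retries are enabled for the function, the message is then either redelivered or dropped. A minimal standard-library sketch that separates decoding and validation from the Datastore write: `parse_telemetry_event` returns valid rows plus rejected items, and `chunked` splits rows into groups for successive `ndb.put_multi` calls. Both helpers are hypothetical, and the 500-row default is an assumed, conservative cap; check current Datastore write limits before relying on it.

```python
import base64
import json
from typing import Any, Iterator

# Wire-format fields read by process_telemetry_batch in the worker above
REQUIRED_FIELDS = ("device_id", "type", "details")


def parse_telemetry_event(event: dict[str, Any]) -> tuple[list[dict[str, Any]], list[Any]]:
    """Hypothetical helper: decode one Pub/Sub event into (valid_rows, rejected_items)."""
    data = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    # The gateway forwards any JSON body, so tolerate payloads without an events list
    items = data.get("events") if isinstance(data, dict) else None
    if not isinstance(items, list):
        items = []
    valid: list[dict[str, Any]] = []
    rejected: list[Any] = []
    for item in items:
        if isinstance(item, dict) and all(field in item for field in REQUIRED_FIELDS):
            # Rename the wire field "type" to the model property event_type
            valid.append({
                "device_id": item["device_id"],
                "event_type": item["type"],
                "details": item["details"],
            })
        else:
            rejected.append(item)
    return valid, rejected


def chunked(rows: list[dict[str, Any]], size: int = 500) -> Iterator[list[dict[str, Any]]]:
    """Hypothetical helper: yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]
```

Inside `client.context()`, the worker could then call `ndb.put_multi([TelemetryLog(**row) for row in chunk])` for each chunk and log the rejected items rather than failing the entire message.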
3. Backpressure and Auto-Scaling
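Using Pub/Sub as an intermediate buffer decouples ingestion from database writes, so a traffic spike lengthens the queue instead of overwhelming the database. If ingestion rates double, unacknowledged messages wait in the subscription (retained for 7 days by default) while Cloud Functions scales out worker instances to drain the backlog. Capping the function's maximum instance count bounds how many concurrent writers the database sees, trading a longer backlog for predictable write load.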
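A toy, deterministic model makes the buffering argument concrete. It assumes a fixed drain capacity per tick (a stand-in for a limit on concurrent worker instances) and ignores cold starts, acknowledgement deadlines, and redelivery; the traffic numbers are invented for illustration, not measurements.

```python
def simulate_backlog(arrivals: list[int], drain_capacity: int) -> tuple[list[int], list[int]]:
    """Toy queue model: return (backlog after each tick, events written per tick).

    arrivals[t] events are published during tick t; workers write at most
    drain_capacity queued events per tick and the rest stay in the queue.
    """
    backlog = 0
    backlogs: list[int] = []
    writes: list[int] = []
    for arrived in arrivals:
        backlog += arrived
        written = min(backlog, drain_capacity)  # database load never exceeds the cap
        backlog -= written
        backlogs.append(backlog)
        writes.append(written)
    return backlogs, writes


# Illustrative traffic: baseline of 1,000 events/tick, doubling for three ticks
arrivals = [1000, 1000, 2000, 2000, 2000, 1000, 1000, 1000, 1000, 1000]
backlogs, writes = simulate_backlog(arrivals, drain_capacity=1400)
print(backlogs)     # → [0, 0, 600, 1200, 1800, 1400, 1000, 600, 200, 0]
print(max(writes))  # → 1400
```

During the spike the backlog peaks at 1,800 events and then drains once traffic returns to baseline, while database writes never exceed 1,400 per tick. A higher cap drains the queue faster at the cost of heavier peak write load.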