2021-08-14 08:51:25+00:00

Hardware clients such as routers and local gateways continuously emit system logs (syslogs) that record state changes, authentication events, and process crashes. Ingesting these streams at scale calls for a dedicated service that parses raw payloads, extracts severity and message details, and writes events to Cloud Datastore. Because messages arrive continuously, the ingestion endpoint has to respond quickly and must not block the server's event loop on slow I/O, or requests will back up under load.

With a FastAPI endpoint in Python, we can parse RFC 5424 syslog messages in the request path and hand the database write to a Cloud Tasks queue, which processes it asynchronously.


1. RFC Syslog Parsing with Regular Expressions

We start with a small parsing module that extracts the priority, hostname, application name, process ID, and message from an incoming syslog line:

# syslog_parser.py
import re

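# RFC 5424 syslog regex parser
# Layout: <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG
# E.g. <34>1 2026-06-03T10:00:00Z mydevice myapp 1234 ID45 - My log message
# Note: this pattern only matches lines whose STRUCTURED-DATA field is "-" (none).
SYSLOG_REGEX = re.compile(
    r"^<(\d+)>\d+\s+\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+\S+\s+-\s+(.*)$"
)

def parse_syslog(raw_line):
    """Return a dict of parsed fields, or None if the line does not match."""
    match = SYSLOG_REGEX.match(raw_line)
    if not match:
        return None

    pri, hostname, app_name, pid, message = match.groups()
    # PRI packs two values into one number: PRI = facility * 8 + severity
    priority = int(pri)
    facility = priority // 8
    severity = priority % 8
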
    return {
        "hostname": hostname,
        "app_name": app_name,
        "pid": pid,
        "severity": severity,
        "facility": facility,
        "message": message
    }
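
The facility and severity arithmetic can be checked in isolation. The sketch below is not part of the parser above: `decode_pri` and `SEVERITY_NAMES` are hypothetical names introduced here for illustration. It splits a PRI value the same way and maps the severity code to its RFC 5424 name, rejecting values outside the valid range of 0 to 191.

```python
# Severity names from RFC 5424, indexed by their numeric code (0-7).
SEVERITY_NAMES = (
    "emergency", "alert", "critical", "error",
    "warning", "notice", "informational", "debug",
)


def decode_pri(pri: int) -> tuple[int, int, str]:
    """Split a PRI value into (facility, severity, severity_name)."""
    # 24 facilities (0-23) times 8 severities gives a maximum PRI of 191.
    if not 0 <= pri <= 191:
        raise ValueError(f"PRI out of range: {pri}")
    facility, severity = divmod(pri, 8)
    return facility, severity, SEVERITY_NAMES[severity]


print(decode_pri(34))  # (4, 2, 'critical')
```

For the sample line above, `<34>` therefore means facility 4 (security/authorization messages) at severity 2 (critical).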

2. The Ingestion Controller

We define a FastAPI endpoint that receives one raw syslog message per request, validates its format, and enqueues a Cloud Tasks task for the database write, so the response does not wait on Datastore:

# main.py
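import json

from fastapi import FastAPI, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from google.cloud import tasks_v2

from syslog_parser import parse_syslog

app = FastAPI()
client = tasks_v2.CloudTasksClient()
QUEUE_PATH = client.queue_path("my-project", "us-central1", "syslog-ingest-queue")

@app.post("/syslog/ingest")
async def ingest_syslog(request: Request):
    raw_body = await request.body()
    try:
        # Strip any trailing newline or CRLF the sender may have appended
        decoded_body = raw_body.decode("utf-8").strip()
    except UnicodeDecodeError:
        # Reject undecodable input with a 400 instead of failing with a 500
        raise HTTPException(status_code=400, detail="Body is not valid UTF-8")

    parsed = parse_syslog(decoded_body)
    if not parsed:
        raise HTTPException(status_code=400, detail="Invalid Syslog format")

    # Queue task for asynchronous execution
    task = {
        "http_request": {
            "http_method": tasks_v2.HttpMethod.POST,
            "url": "https://service-dot-my-project.appspot.com/tasks/save-syslog",
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(parsed).encode("utf-8")
        }
    }

    # create_task is a blocking call; run it in the thread pool so it does
    # not stall the event loop while other requests are waiting
    await run_in_threadpool(client.create_task, parent=QUEUE_PATH, task=task)
    return {"status": "enqueued"}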
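
The task above targets `/tasks/save-syslog`, which is where the Datastore write would happen. The following is a minimal sketch of that write, not a definitive implementation. It makes several assumptions: the kind name `SyslogEntry`, the `received_at` property, and the helper names `build_properties` and `save_syslog` are all hypothetical, and `client` is expected to be a `google.cloud.datastore.Client` (the sketch relies on its `key`, `entity`, and `put` methods, which are worth checking against the version you have installed).

```python
import json
from datetime import datetime, timezone

KIND = "SyslogEntry"  # hypothetical Datastore kind name

# Fields produced by parse_syslog in syslog_parser.py
FIELDS = ("hostname", "app_name", "pid", "severity", "facility", "message")


def build_properties(payload: dict) -> dict:
    """Keep only the parsed syslog fields and add a receive timestamp."""
    missing = [name for name in FIELDS if name not in payload]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    props = {name: payload[name] for name in FIELDS}
    props["received_at"] = datetime.now(timezone.utc)  # assumed extra property
    return props


def save_syslog(body: bytes, client) -> None:
    """Decode a task body and store it as one Datastore entity."""
    props = build_properties(json.loads(body))
    # Leave the free-text message unindexed: indexed string properties are
    # limited to 1500 bytes, and long log messages could exceed that.
    entity = client.entity(key=client.key(KIND), exclude_from_indexes=("message",))
    entity.update(props)
    client.put(entity)
```

A route registered at `/tasks/save-syslog` could read the request body and pass it to `save_syslog`. Since `put` is a blocking call, it would also need to run off the event loop, for example via `run_in_threadpool`.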