For an AI agent to become a true long-term assistant, it needs memory. When an agent chats with a developer, it must extract important facts (e.g., "The project uses the staging cluster at port 8443" or "The user prefers HSL tailored CSS templates") and store them for future retrieval.
However, saving facts into a semantic database is a highly expensive operations.
To save a single fact, the memory platform must:
- Classify the context to filter out noise.
- Generate vector embeddings using heavy machine learning models.
- Build a semantic graph of associations to link the fact to existing concepts.
If an AI agent blocks and waits for these steps to complete in real-time, the user experience falls apart. Stripe webhooks will time out, agents will hit timeout thresholds, and chat responses will feel sluggish.
To solve this latency bottleneck, we must decouple memory writes. This article details how to build an Asynchronous Memory Curation Engine using NATS message queues and a FalkorDB knowledge graph, inspired by the high-performance memory architecture in Segnog.
🏗️ The Decoupled Architecture
The core engineering strategy is simple: never perform calculations on the request path.
When the agent sends a "save memory" command via the Model Context Protocol (MCP), the ingestion gateway writes the raw payload to a high-speed message queue (NATS) and immediately returns a 200 OK ("Saved!") confirmation back to the agent in under 10 milliseconds.
A detached background worker consumes events from the queue, executes the resource-heavy embedding generation (using local CPU-bound Google Gemma models), and inserts the semantic nodes into the knowledge graph (FalkorDB).
🛠️ The Ingestion Layer: Ultra-Fast Message Buffering
The ingestion API runs a clean, light HTTP server. When it receives a POST request containing a new memory fact, it writes the payload to a designated NATS subject (memory.facts.ingest) and closes the connection with an immediate success response.
Below is the implementation of the fast Express ingestion endpoint:
// server.js (Segnog Fast Ingestion Gateway)
import express from 'express';
import { connect } from 'nats';
const app = express();
app.use(express.json());
// Initialize connection to local NATS broker
const nc = await connect({ servers: "nats://127.0.0.1:4222" });
const jc = JSON.createCodec();
app.post('/mcp/memory', async (req, res) => {
const { fact, sessionId, userId } = req.body;
if (!fact || !sessionId) {
return res.status(400).json({ error: "Missing required parameters" });
}
const payload = {
fact,
sessionId,
userId: userId || 'anonymous',
timestamp: Date.now()
};
try {
// Publish message to NATS broker instantly
nc.publish('memory.facts.ingest', jc.encode(payload));
// Respond back to Agent immediately (sub-10ms)
return res.status(202).json({ status: "queued", message: "Memory ingestion queued successfully." });
} catch (err) {
console.error("NATS publishing failed:", err);
return res.status(500).json({ error: "Internal message queue failure" });
}
});
app.listen(9000, () => {
console.log("Gateway listening on port 9000...");
});
🚀 The Asynchronous Worker: Embeddings and Graph Transactions
With the event safely buffered, the background worker picks up the queue message and executes the heavy lifting. The worker performs two critical steps:
- Vector Embedding Generation: Calls a local embedding model (like
google/embeddinggemma-300m) to convert the text fact into a 1024-dimensional semantic float array. - Knowledge Graph Updates (Cypher): Executes a transactional graph update in FalkorDB to create the entity nodes, link them to the active session graph, and write semantic relationship edges.
Below is the worker script written in Python:
# worker.py (Curation Background Worker)
import asyncio
from nats.aio.client import Client as NATS
from falkordb import FalkorDB
from sentence_transformers import SentenceTransformer
import json
# Initialize FalkorDB connection and local Embedding Model
db = FalkorDB(host="127.0.0.1", port=6379)
graph = db.select_graph("segnog_memory")
# Load google/embeddinggemma-300m locally on CPU
print("Loading local Gemma embedding model...")
model = SentenceTransformer("google/embeddinggemma-300m")
async def process_memory_event(msg):
# Parse payload
data = json.loads(msg.data.decode())
fact = data["fact"]
session_id = data["sessionId"]
print(f"Processing fact: {fact} for Session {session_id}...")
# Step 1: Generate Vector Embeddings locally (CPU-bound)
embeddings = model.encode(fact).tolist()
# Step 2: Build the Cypher query to insert into FalkorDB
# Create the fact node, session node, and bind them with a link edge
cypher_query = """
MERGE (s:Session {id: $session_id})
CREATE (f:Fact {
content: $fact,
vector: $embeddings,
created_at: timestamp()
})
CREATE (s)-[:HAS_FACT]->(f)
RETURN f.id
"""
params = {
"session_id": session_id,
"fact": fact,
"embeddings": embeddings
}
# Execute transactional write to the Graph DB
try:
result = graph.query(cypher_query, params)
print(f"🎉 Successfully inserted fact node. Graph updated.")
except Exception as e:
print(f"FalkorDB transaction failed: {e}")
async def run():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Subscribe to ingestion subject
sub = await nc.subscribe("memory.facts.ingest", cb=process_memory_event)
print("Background worker subscribed. Awaiting memory events...")
# Keep event loop running
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(run())
📈 Summary of Benefits
Adopting an asynchronous curation pipeline creates a highly resilient developer environment:
- Instant Agent Execution: Relieving the MCP request chain from calculations cuts latency by 98%, keeping agent interactions feeling instant.
- Elastic Scaling: During multi-file refactoring runs, agents emit hundreds of micro-facts. The NATS broker acts as a shock-absorber, buffering updates safely without overloading your server's RAM or CPU cores.
- Graph Integrity: If FalkorDB crashes or lockouts happen, messages remain safely stored in NATS. The worker will retry delivery automatically when databases recover.
By decoupling LLM memory writes with NATS and FalkorDB, you gain the benefits of a complex, long-term semantic knowledge graph with the high performance of a lightweight, responsive serverless bridge.