Relational databases are excellent for transactional consistency, but they slow down when handling complex search filters, wildcard matching, and autocomplete requests across millions of records. A common approach is to delegate search to a dedicated engine like Elasticsearch while keeping PostgreSQL as the source of truth.
To do this, we must build a synchronization pipeline that captures database writes and updates Elasticsearch indexes in near real time.
1. Hooking into Database Events
We intercept data changes in our ORM models (using SQLAlchemy event listeners or FastAPI service logic) and write an event payload to our search indexing worker queue. The worker then indexes the record into Elasticsearch:
# Syncing PostgreSQL records to Elasticsearch in Python
from elasticsearch import AsyncElasticsearch

es_client = AsyncElasticsearch(hosts=["http://localhost:9200"])


async def index_account_record(account_id, account_data):
    # Copy only the searchable fields into the Elasticsearch document
    doc_payload = {
        "name": account_data["name"],
        "email": account_data["email"],
        "industry": account_data["industry"],
        "status": account_data["status"],
    }
    # Index the document asynchronously; reusing the database ID as the
    # document ID means a retried write overwrites instead of duplicating.
    # Elasticsearch index names must be lowercase and cannot contain spaces.
    response = await es_client.index(
        index="enterprise-platform-accounts",
        id=account_id,
        document=doc_payload,
    )
    return response["result"]
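The producer side of this step, where a data change becomes an event payload on the worker queue, could look roughly like the sketch below. It is a minimal illustration, not our production code: the in-process `queue.Queue` stands in for Kafka or SQS, and the names `index_queue`, `INDEXED_FIELDS`, `build_index_event`, `enqueue_index_event`, and the `"upsert"`/`"delete"` operation values are all hypothetical. In a real service, `enqueue_index_event` would be called from a SQLAlchemy event listener or from the FastAPI service layer after the write.

```python
import json
import queue
from datetime import datetime, timezone

# Assumption: an in-process queue stands in for Kafka or SQS so the sketch runs anywhere.
index_queue = queue.Queue()

# Only these fields are copied into the event, mirroring the indexed document.
INDEXED_FIELDS = ("name", "email", "industry", "status")


def build_index_event(operation, account_id, account_data):
    """Build a JSON-serializable event describing one database write."""
    return {
        "operation": operation,  # hypothetical values: "upsert" or "delete"
        "account_id": str(account_id),
        "document": {field: account_data.get(field) for field in INDEXED_FIELDS},
        "emitted_at": datetime.now(timezone.utc).isoformat(),
    }


def enqueue_index_event(operation, account_id, account_data):
    """Serialize the event and hand it to the queue; Elasticsearch is never called here."""
    event = build_index_event(operation, account_id, account_data)
    index_queue.put(json.dumps(event))
    return event
```

Because the request path only serializes a small payload and enqueues it, the time spent on indexing never adds to the client's response time, and fields that are not listed in `INDEXED_FIELDS` never leave the database.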
2. Handling Ingestion Failures
If Elasticsearch is down, the synchronization task must not fail the client request. Instead, the indexing task is written to an asynchronous queue (such as Kafka or SQS), so workers can retry the index operation until it succeeds.
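One way a worker might retry a failed index operation is sketched below, using exponential backoff with jitter. This is an illustration under assumptions: `index_with_retry` and its parameters are hypothetical names, `index_fn` is any coroutine function that raises on failure (for example a thin wrapper around `index_account_record`), and the attempt cap is our own choice. With Kafka or SQS, an event that exhausts its attempts would typically be left unacknowledged or routed to a dead-letter queue so it can be redelivered later.

```python
import asyncio
import random


async def index_with_retry(index_fn, event, max_attempts=5, base_delay=0.5, max_delay=30.0):
    """Call index_fn(event) until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await index_fn(event)
        except Exception:
            # Broad catch keeps the sketch short; a real worker would likely
            # retry only on connection and timeout errors.
            if attempt == max_attempts:
                raise  # let the caller decide how to requeue or dead-letter the event
            # Exponential backoff, capped at max_delay, with full jitter so that
            # many workers do not hit a recovering cluster at the same moment.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
```

Since the document ID is the account ID, running the same event twice leaves the index in the same state, which is what makes repeated attempts safe.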