2020-04-14 16:51:25+00:00

Cloud Datastore is a robust, auto-scaling NoSQL database for server-side workloads. However, it has no native way to push real-time updates to web browsers. If your application shows live dashboards or telemetry status feeds, clients have to poll the database continuously, and each poll incurs read costs whether or not anything has changed. A cleaner approach replicates updates from Datastore to the Firebase Realtime Database asynchronously and lets Firebase push the changes to connected clients.

With Google App Engine Task Queues, we can capture Datastore modifications and mirror the latest state of each entity to Firebase as key-value data.


1. Enqueuing Replication Tasks

Whenever a telemetry entity is updated in Datastore, the backend enqueues a background task containing the record data to be synced:

# db_replicator.py
from google.appengine.api import taskqueue
import json

def save_entity_and_sync(entity):
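    # Save to main Cloud Datastore
    entity.put()

    # Enqueue replication task. put() and add() are not atomic: if the
    # enqueue fails, this update is not mirrored until the next save.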
    payload = {
        "device_id": entity.device_id,
        "status": entity.status,
        "last_update": entity.timestamp.isoformat()
    }
    taskqueue.add(
        url='/tasks/replicate-to-firebase',
        params={'payload': json.dumps(payload)},
        method='POST'
    )
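
To make the enqueue step easier to test outside App Engine, the payload construction can be separated from the queue call. The following is a minimal sketch, not the original implementation: `build_replication_payload`, `enqueue_replication`, and the injected `add_task` parameter are hypothetical names introduced here, and the stub entity only stands in for a real Datastore model.

```python
import json
from datetime import datetime, timezone
from types import SimpleNamespace

REPLICATION_URL = '/tasks/replicate-to-firebase'


def build_replication_payload(entity):
    """Serialize only the fields that are mirrored to Firebase."""
    return json.dumps({
        "device_id": entity.device_id,
        "status": entity.status,
        "last_update": entity.timestamp.isoformat(),
    })


def enqueue_replication(entity, add_task):
    """Enqueue one replication task.

    add_task is any callable that accepts the same keyword arguments
    as taskqueue.add (url, params, method). This is an assumption made
    for testability; in production it would be taskqueue.add itself.
    """
    add_task(
        url=REPLICATION_URL,
        params={'payload': build_replication_payload(entity)},
        method='POST',
    )


# Usage with a stub entity and a fake queue that records its calls.
calls = []
entity = SimpleNamespace(
    device_id="sensor-1",
    status="online",
    timestamp=datetime(2020, 4, 14, 16, 51, 25, tzinfo=timezone.utc),
)
enqueue_replication(entity, lambda **kwargs: calls.append(kwargs))
```

Inside `save_entity_and_sync`, the same helper could be called as `enqueue_replication(entity, taskqueue.add)` right after `entity.put()`.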

2. Writing the Firebase Sync Worker

The worker handles the task request on App Engine, authenticates with service account credentials, and writes the payload to the matching Firebase database reference:

# firebase_worker.py
import json

from flask import Flask, request
import firebase_admin
from firebase_admin import db

app = Flask(__name__)

# Initialize Firebase SDK
cred = firebase_admin.credentials.Certificate('service-key.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://my-app.firebaseio.com'
})

@app.route('/tasks/replicate-to-firebase', methods=['POST'])
def replicate_task():
    payload_str = request.form.get('payload')
    data = json.loads(payload_str)
    
    # Write to Firebase reference
    ref = db.reference(f"devices/{data['device_id']}")
    ref.set({
        "status": data["status"],
        "updatedAt": data["last_update"]
    })
    return "OK", 200
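
The worker above trusts its input and always overwrites the stored state. Because push tasks are retried when the handler returns a non-2xx response, and may run out of order, it can help to validate the payload and skip stale updates. The sketch below is one possible hardening, not part of the original worker: `parse_payload` and `is_newer` are hypothetical helpers, and the timestamp comparison assumes both values come from `isoformat()` with the same timezone handling.

```python
import json
from datetime import datetime

REQUIRED_FIELDS = ("device_id", "status", "last_update")
# Characters that Firebase Realtime Database does not allow in keys.
FORBIDDEN_KEY_CHARS = set(".$#[]/")


def parse_payload(payload_str):
    """Return the decoded payload dict, or None if it is unusable."""
    if not payload_str:
        return None
    try:
        data = json.loads(payload_str)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    if not isinstance(data, dict):
        return None
    if any(field not in data for field in REQUIRED_FIELDS):
        return None
    device_id = data["device_id"]
    if not isinstance(device_id, str) or not device_id:
        return None
    if FORBIDDEN_KEY_CHARS & set(device_id):
        return None
    return data


def is_newer(incoming_iso, stored_iso):
    """True if the incoming update should replace the stored one."""
    if stored_iso is None:
        return True  # nothing mirrored yet
    incoming = datetime.fromisoformat(incoming_iso)
    stored = datetime.fromisoformat(stored_iso)
    return incoming > stored
```

In `replicate_task`, a `None` result from `parse_payload` could be answered with a 2xx status so the queue does not keep retrying a payload that can never succeed. `is_newer` could be checked against the value returned by `ref.child("updatedAt").get()` before calling `ref.set(...)`; note that such a read followed by a write is not atomic.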