Importing massive batches of records (such as migrating 100k customer accounts or bulk updating product stock databases) will choke synchronous scripts. If you process records one at a time, each waiting on its own network round trip, a full run can take hours. Launching thousands of API requests at once is no better: it can overwhelm the server or exhaust the database connection pool.
Using asyncio queues, we build a producer-consumer worker pool that runs ingestion concurrently while capping the number of operations in flight.
1. Structuring the Worker Pool
We instantiate an async queue, load the records (producer), and launch a limited number of consumer tasks (workers) that process jobs concurrently:
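# Async Ingestion Worker Pool in Python
import asyncio

# process_item_ingestion(item) is the application's own coroutine
# (the API call or database write) and is defined elsewhere.

async def worker(queue):
    while True:
        # Fetch the next ingestion task from the queue
        item = await queue.get()
        try:
            await process_item_ingestion(item)
        except Exception as exc:
            # Report the failure and keep the worker alive; an uncaught error
            # would end this task and could leave queue.join() waiting forever
            print(f"Failed to ingest {item!r}: {exc}")
        finally:
            queue.task_done()

async def run_bulk_ingestion(records, worker_count=10):
    queue = asyncio.Queue()
    # Populate the queue (it is unbounded, so put_nowait never fails)
    for r in records:
        queue.put_nowait(r)
    # Start consumer tasks
    tasks = [asyncio.create_task(worker(queue)) for _ in range(worker_count)]
    await queue.join()  # Wait until every queued item has been processed
    for t in tasks:
        t.cancel()  # Shut down the now-idle workers
    # Let the cancellations finish before returning
    await asyncio.gather(*tasks, return_exceptions=True)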
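The pool above fills the queue completely before any worker starts, which is fine when the records already sit in memory. When they are read lazily (from a file or a database cursor, say), a bounded queue can apply backpressure to the producer instead. The following is a minimal sketch of that variant, not a definitive implementation: the names run_streaming_ingestion, produce, consume, handler, and buffer_size are hypothetical, and the stop-marker shutdown is one possible alternative to cancelling the workers.

```python
import asyncio

_STOP = object()  # hypothetical sentinel that tells a worker to exit


async def produce(queue, records, worker_count):
    """Feed records into the bounded queue, then one stop marker per worker."""
    for record in records:
        # put() suspends while the queue is full, so the producer
        # never runs more than buffer_size items ahead of the workers
        await queue.put(record)
    for _ in range(worker_count):
        await queue.put(_STOP)


async def consume(queue, handler, failures):
    """Process items until the stop marker arrives."""
    while True:
        item = await queue.get()
        if item is _STOP:
            return
        try:
            await handler(item)
        except Exception as exc:
            # Keep the worker alive and remember the failed record
            failures.append((item, exc))


async def run_streaming_ingestion(records, handler, worker_count=10, buffer_size=100):
    """Return a list of (record, exception) pairs for records that failed."""
    queue = asyncio.Queue(maxsize=buffer_size)
    failures = []
    workers = [
        asyncio.create_task(consume(queue, handler, failures))
        for _ in range(worker_count)
    ]
    await produce(queue, records, worker_count)
    await asyncio.gather(*workers)
    return failures


if __name__ == "__main__":
    async def demo_handler(item):
        # Stand-in for a real API call or database write (assumption)
        await asyncio.sleep(0)

    lazy_records = (n for n in range(1000))  # records generated on demand
    failed = asyncio.run(run_streaming_ingestion(lazy_records, demo_handler))
    print(f"{len(failed)} records failed")
```

Because each worker receives exactly one stop marker after the last record, the workers exit on their own and no cancellation is needed. The returned failures list gives the caller a place to start for retries or reporting.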
2. Concurrency Control
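The worker_count parameter caps the number of concurrent database operations: each worker handles one item at a time, so at most worker_count calls to process_item_ingestion are in flight at any moment. Keep it at or below what your database connection pool and the target API can sustain, and raise it gradually until throughput stops improving.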
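One way to check the cap is to count the operations in flight while a pool runs. The sketch below assumes a simulated database write (fake_db_write) in place of real I/O. The helpers run_pool and measure_peak are hypothetical names introduced for illustration only.

```python
import asyncio


async def run_pool(records, worker_count, handler):
    """Process records with at most worker_count handler calls running at once."""
    queue = asyncio.Queue()
    for record in records:
        queue.put_nowait(record)

    async def worker():
        while True:
            item = await queue.get()
            try:
                await handler(item)
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(worker_count)]
    await queue.join()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def measure_peak(record_count, worker_count):
    """Return the highest number of simultaneous handler calls observed."""
    in_flight = 0
    peak = 0

    async def fake_db_write(item):
        # Simulated database write (assumption): count entry, wait, count exit
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        in_flight -= 1

    await run_pool(range(record_count), worker_count, fake_db_write)
    return peak


if __name__ == "__main__":
    for count in (1, 5, 20):
        observed = asyncio.run(measure_peak(50, count))
        print(f"worker_count={count}: peak concurrent writes={observed}")
```

The observed peak never exceeds worker_count, however many records are queued, which is the property that protects the connection pool.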