I was once tasked with improving the database layer and general operations of backend services that use MongoDB as their main database.

These services were part of a huge infrastructure where millions of messages came through queues and had to be processed according to their message actions. That meant tons of DB operations every second, plus additional checks during processing.

This post is cross-published with OnePublish.

Real Case Scenario

The processing layer of the service used pymongo to interact with MongoDB, and the service itself ran in a synchronous environment. Even though the database operations were handled in bulk, performance still could not keep up with the incoming data.

Synchronous code made things even worse: execution waits for the result of the current operation before moving forward. That is a serious bottleneck in systems that need to scale.

This regularly caused queue overflows and potential data loss.
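
To make the bottleneck concrete, here is a minimal sketch that uses no database at all. The two `fake_db_call_*` functions are hypothetical stand-ins for a driver call that spends its time waiting on the network; the delay values are arbitrary assumptions, not measurements from the real services.

```python
import asyncio
import time


def fake_db_call_sync(delay):
    # Stand-in for a blocking driver call: the thread does nothing else while it waits.
    time.sleep(delay)
    return delay


async def fake_db_call_async(delay):
    # Stand-in for a non-blocking driver call: awaiting hands control back to the event loop.
    await asyncio.sleep(delay)
    return delay


def run_sync(n, delay):
    # Calls run one after another, so the total is roughly n * delay.
    start = time.perf_counter()
    for _ in range(n):
        fake_db_call_sync(delay)
    return time.perf_counter() - start


async def run_async(n, delay):
    # Calls overlap their waiting, so the total is roughly one delay.
    start = time.perf_counter()
    await asyncio.gather(*(fake_db_call_async(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sync:  {run_sync(5, 0.1):.2f}s")
    print(f"async: {asyncio.run(run_async(5, 0.1)):.2f}s")
```

The synchronous loop pays for every wait in sequence, while the asynchronous version overlaps them. Real database calls also consume server and network capacity, so the gain in practice is smaller than in this idealized case.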

Asynchronous Environment

The solution I implemented was a combination of:

  • Motor
  • Asyncio
  • Uvloop

Let's quickly go through the definitions of these items.

PyMongo is the official MongoDB driver for Python, providing a simple and intuitive way to interact with MongoDB databases. It's synchronous, meaning each database operation blocks the execution of your program until it completes, which can be a bottleneck in I/O-bound tasks.

Motor is the asynchronous driver for MongoDB, built on top of PyMongo and designed to take advantage of Python's asyncio library. Motor allows you to perform non-blocking database operations, making it suitable for high-performance applications that require concurrency.
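
The list above also names uvloop, which the definitions do not cover. uvloop is a third-party, drop-in replacement for the default asyncio event loop, built on libuv. The following is only a sketch of one way to run a coroutine on it, assuming uvloop is installed with `pip install uvloop`; the `run` helper is a hypothetical name, and the code falls back to the standard loop when uvloop is missing (for example on Windows, where it is not available).

```python
import asyncio

try:
    import uvloop  # third-party; assumed to be installed

    loop_factory = uvloop.new_event_loop
except ImportError:
    # Fall back to the standard asyncio loop when uvloop is not available.
    loop_factory = asyncio.new_event_loop


def run(coro):
    """Run a coroutine to completion on a loop created by loop_factory."""
    loop = loop_factory()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def main():
    await asyncio.sleep(0)
    # Report which event loop implementation is actually in use.
    return type(asyncio.get_running_loop()).__name__


if __name__ == "__main__":
    print(f"Running on: {run(main())}")
```

Because uvloop only swaps the loop implementation, the coroutines themselves do not need to change.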

To illustrate the performance differences, I prepared a stress test using two scripts: one using Motor (asynchronous) and the other using PyMongo (synchronous). Both scripts performed the same task of reading and writing documents to MongoDB in batches.

Both scripts read 300k documents from a source collection and migrate them to a new target collection.

Asynchronous Script (Motor)

import logging
import asyncio
import time
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["new_collection"]

async def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products

async def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product

    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')

async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)

async def main():
    batch_size = 1000
    concurrency_limit = 10

    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')

if __name__ == '__main__':
    asyncio.run(main())
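
A note on the design choice in `process_batches`: it collects `concurrency_limit` write coroutines and then waits for the whole group before fetching again. One possible alternative, shown here only as a sketch, is to bound in-flight writes with `asyncio.Semaphore` so that a new write starts as soon as any slot frees up. `fake_insert_many` and `process_batches_bounded` are hypothetical names, and the sleep stands in for the real `insert_many` call so the example runs without MongoDB.

```python
import asyncio


async def fake_insert_many(batch):
    """Hypothetical stand-in for target_collection.insert_many(batch)."""
    await asyncio.sleep(0.01)
    return len(batch)


async def process_batches_bounded(batches, concurrency_limit):
    semaphore = asyncio.Semaphore(concurrency_limit)
    tasks = []
    in_flight = 0
    peak = 0  # highest number of writes observed running at once

    async def guarded_write(batch):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            return await fake_insert_many(batch)
        finally:
            in_flight -= 1
            semaphore.release()  # free the slot for the next batch

    for batch in batches:
        # Pause the producer loop until a write slot is available.
        await semaphore.acquire()
        tasks.append(asyncio.create_task(guarded_write(batch)))

    inserted = sum(await asyncio.gather(*tasks))
    return inserted, peak


if __name__ == "__main__":
    demo_batches = [[{}] * 100 for _ in range(25)]
    total, peak = asyncio.run(process_batches_bounded(demo_batches, 5))
    print(f"inserted={total}, peak concurrent writes={peak}")
```

Whether this beats the grouped `gather` approach depends on how uneven the write latencies are; it is worth measuring rather than assuming.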

Synchronous Script (PyMongo)

import logging
import time
from bson import ObjectId
from pymongo import MongoClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'
TARGET_COLLECTION_NAME = 'new_collection'

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db[TARGET_COLLECTION_NAME]

def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = list(cursor)
    return products

def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product

    try:
        result = target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')

def process_batches(batch_size):
    last_id = None
    while True:
        products = fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        bulk_write_to_mongo(products)

def main():
    batch_size = 1000

    start_time = time.time()
    process_batches(batch_size)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')

if __name__ == '__main__':
    main()

Results and Analysis

Execution time for migrating 300k documents:

  • Asynchronous script: 17.15 seconds
  • Synchronous script: 23.26 seconds

The asynchronous script completed the task 6.11 seconds faster than the synchronous script, which is roughly 26% less wall-clock time. While this might not seem like a significant difference for a single run, the gap can become more pronounced in high-load scenarios or when processing large datasets continuously.
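
The two timings above can be turned into throughput and speedup figures with a few lines of arithmetic. The only inputs are the numbers reported in this post; `summarize` is a hypothetical helper name.

```python
DOCS = 300_000
ASYNC_SECONDS = 17.15
SYNC_SECONDS = 23.26


def summarize(docs, async_s, sync_s):
    """Derive throughput and relative speedup from two wall-clock timings."""
    return {
        "async_docs_per_s": docs / async_s,
        "sync_docs_per_s": docs / sync_s,
        "speedup": sync_s / async_s,
        "time_saved_pct": (sync_s - async_s) / sync_s * 100,
    }


if __name__ == "__main__":
    for name, value in summarize(DOCS, ASYNC_SECONDS, SYNC_SECONDS).items():
        print(f"{name}: {value:.2f}")
```

This works out to about 17.5k documents per second for the asynchronous script against about 12.9k for the synchronous one, a speedup of roughly 1.36x. These come from a single run of each script, so treat them as indicative rather than a rigorous benchmark.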

Benefits of Using Motor and Asynchronous Environment

  1. Improved Throughput: Asynchronous operations can handle more tasks concurrently, increasing overall throughput. This is especially beneficial in applications with high I/O operations, such as web servers handling multiple database queries simultaneously.
  2. Non-Blocking I/O: Asynchronous operations do not block the main thread, allowing other tasks to continue running. This results in better CPU utilization and smoother application performance, particularly under load.
  3. Scalability: Asynchronous code scales better with the number of concurrent operations. For example, a web application using Motor can handle more simultaneous requests compared to one using PyMongo.
  4. Resource Efficiency: Asynchronous operations can lead to more efficient use of system resources. For instance, the event loop in asyncio allows the application to switch between tasks, reducing idle times and improving overall efficiency.

Source Code and Video Explanation

You can find the source code in the GitHub repository below:

GitHub - PylotStuff/motor-asyncio-performance: Boost DB ops performane with AsycioMotorClient & Asyncio & Uvloop

Conclusion

The choice between Motor and PyMongo depends on the specific needs of your application. For applications that require high concurrency and efficient I/O handling, Motor and the asynchronous approach offer significant advantages. However, for simpler applications or scripts where ease of implementation is a priority, PyMongo's synchronous approach might be sufficient.

By leveraging asynchronous operations with Motor, you can build more scalable and performant applications, making it a worthwhile consideration for modern web development.