This was one of the worst mistakes I realized after the business scaled and more data started coming through.

Real Case Scenario

The business was pushing to deliver a service that manages slugs (part of URLs) for E-commerce entities such as products or categories. Millions of data were coming through queues, and this service processed and saved them to MongoDB.

Due to insufficient time to cover edge cases, we overlooked the design of the document structure and made decisions on the go during development.

Specifically, our schemas were inconsistent. If a field did not exist, we simply excluded it from the document, assuming it was unnecessary data and not worth keeping.

We created indexes for the collection, and it worked fine until the amount of data reached around 10 million. Eventually, the existence checking filters went out of control and started timing out. We were dealing with over 500 million documents, with filtering by existence checking. In this case, the DB index couldn't perform effectively.

Counting documents were nearly impossible. We prepared a script to go through all documents and check one by one which takes a lot of time. Sometimes, connection with mongo used to drop and we started counting again.

POC and Countermeasures

Invest some time to define document structure precisely. This is a critical part that never gets attention when starting the project. Our mistake was excluding the field entirely from some of the documents instead of assigning a default value for the field.

The primary reason MongoDB does not support existence checking indexes is due to the nature of how indexes are structured. Indexes in MongoDB are designed to quickly locate documents based on the values of specific fields.

However, checking for the existence of a field requires examining each document to determine if the field is present, which can be highly inefficient. This type of operation cannot be optimized with traditional indexing methods, leading to significant performance issues as the volume of data grows.

That ended up with slow query scanning because of existence checking. In MongoDB, it's possible to give partial index expressions but it's really not working that well where field existence is checked such as {parentfield.childfield: {'$exists': false or true}}.

In a nutshell, the decision to assign default values or delete fields should be guided by the specific needs of your application. If you frequently read the field and need to perform operations based on its value, assigning a default value might simplify your queries and application logic.

In my case, the field was frequently used in multiple places in the core logic of the system and because of existence checking the performance was draining.

Let's test it by breaking down below scripts. These scripts reading a source collection in the same database and pushing to different one. Then we're just querying the collection with some filters to see how much time it takes.

Check out previous article as a follow-up:

Boost MongoDB Performance: Motor Client vs PyMongo - Which is Faster?
Once I was tasked with improving the database and general app operations of backend services which are using MongoDB as their main database. These services were part of huge infrastructure where millions of messages coming through the queues and needed to be processed based on the message actions. That means

Now, here's the scripts I prepared to reflect real-world scenario. It's reading from source collection which has 1.8M of documents. In my real experience it was more than 500M documents. The larger the collection the more reading time.

Deleting Fields

import logging
import asyncio
import time
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MONGO_URI = 'mongodb://root:root@localhost:27020'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["bad_collection"]

async def create_index():
    await target_collection.create_index(
        [('slug.US', 1), ('price.currency', 1)]
    )
    logger.info('Index created on brand and condition with partial filter.')

async def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products

async def bulk_write_to_mongo(products):
    for idx, product in enumerate(products):
        product['_id'] = ObjectId()
        if idx % 3 == 0:
            del product['slug']['US'] # deleting field
            product["price"]["currency"] = "EUR"

    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')

async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)

async def measure_filter_execution_time():
    start_time = time.time()
    
    query = {'slug.US': {"$exists": False}, "price.currency": "EUR"}
    count = await target_collection.count_documents(query)
    
    end_time = time.time()
    logger.info(f'Time to execute filter query: {end_time - start_time:.2f} seconds.')
    logger.info(f'Number of documents without "brand" field: {count}')

async def main():
    await create_index()
    
    batch_size = 1000
    concurrency_limit = 10

    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time to process batches: {end_time - start_time:.2f} seconds.')

    await measure_filter_execution_time()

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
  • Deletes the slug.US field for every third product.
  • Uses "$exists": False to filter documents where the slug.US field does not exist.
  • Deleting the field entirely can save storage space and may be more semantically meaningful for indicating the absence of a value. However, it requires existence checks in queries, which might be slower.
  • More focused on a scenario where fields might not exist at all, reflecting a more storage-efficient approach but potentially more complex queries.

Results:

Assign Default Values

import logging
import asyncio
import time
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MONGO_URI = 'mongodb://root:root@localhost:27020'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["good_collection"]

async def create_index():
    await target_collection.create_index(
        [('slug.US', 1), ('price.currency', 1)]
    )
    logger.info('Index created on brand and condition with partial filter.')

async def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products

async def bulk_write_to_mongo(products):
    for idx, product in enumerate(products):
        product['_id'] = ObjectId()
        if idx % 3 == 0:
            product['slug']['US'] = None # default value
            product["price"]["currency"] = "EUR"

    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')

async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)

async def measure_filter_execution_time():
    start_time = time.time()
    
    query = {'slug.US': None, "price.currency": "EUR"}
    count = await target_collection.count_documents(query)
    
    end_time = time.time()
    logger.info(f'Time to execute filter query: {end_time - start_time:.2f} seconds.')
    logger.info(f'Number of documents without "brand" field: {count}')

async def main():
    await create_index()
    
    batch_size = 1000
    concurrency_limit = 10

    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time to process batches: {end_time - start_time:.2f} seconds.')

    await measure_filter_execution_time()

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
  • Sets the slug.US field to None for every third product.
  • Uses None to filter documents where the slug.US field is set to None.
  • Setting the field to None retains the field in the document with a None value, potentially using more storage but simplifying queries since checking for None can be faster than checking for non-existence.
  • Using None can make queries simpler and faster, as checking for a None value is generally straightforward.

Results:

Difference between read operations is 1.17 seconds for reading 1.8M documents. Can you imagine if collection size 500M ? Disaster.

If you want to test from your side as well here's the repo:

GitHub - PylotStuff/motor-asyncio-performance: Boost DB ops performane with AsycioMotorClient & Asyncio & Uvloop
Boost DB ops performane with AsycioMotorClient & Asyncio & Uvloop - GitHub - PylotStuff/motor-asyncio-performance: Boost DB ops performane with AsycioMotorClient & Asyncio & Uvloop

and video I recorded with a bit more explanation:

Conclusion

I always advise investing time in designing the system before implementation. Specifically, the design should support scalability and maintain high performance under increased data loads. It's essential to create a robust architecture that can handle growth seamlessly and ensure efficient operation even as demands escalate. Prioritizing these aspects in the design phase can save considerable time and resources in the long run, preventing issues that might arise as the business expands.