This was one of the worst mistakes I made, and I only realized it after the business scaled and more data started coming through.
Real Case Scenario
The business was pushing us to deliver a service that manages slugs (the human-readable part of URLs) for e-commerce entities such as products and categories. Millions of records came through queues, and this service processed them and saved them to MongoDB.
Due to insufficient time to cover edge cases, we overlooked the design of the document structure and made decisions on the go during development.
Specifically, our schemas were inconsistent. If a field had no value, we simply left it out of the document, assuming it was unnecessary data and not worth keeping.
We created indexes for the collection, and everything worked fine until the collection reached around 10 million documents. As it kept growing, the queries that filtered on field existence got out of control and started timing out. Eventually we were dealing with over 500 million documents filtered by existence checks, and the indexes could not serve those queries efficiently.
Counting documents was nearly impossible. We wrote a script that went through all documents and checked them one by one, which took a long time. Sometimes the connection to MongoDB dropped, and we had to start counting all over again.
POC and Countermeasures
Invest some time to define the document structure precisely. This is a critical step that rarely gets attention at the start of a project. Our mistake was omitting fields entirely from some documents instead of assigning them a default value.
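One way to act on this is to normalize every document before it is written, so optional fields are always present with a default. The sketch below assumes defaults are declared as dotted paths; `DEFAULTS` and `apply_defaults` are hypothetical names for illustration, not part of the original service.

```python
import copy
from typing import Any

# Hypothetical defaults for illustration: dotted path -> default value.
DEFAULTS: dict[str, Any] = {
    "slug.US": None,
    "price.currency": None,
}


def apply_defaults(doc: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `doc` in which every dotted path in `defaults` exists.

    Existing values, including explicit None, are left untouched.
    """
    result = copy.deepcopy(doc)
    for path, default in defaults.items():
        *parents, leaf = path.split(".")
        node = result
        for key in parents:
            child = node.get(key)
            if child is None:
                # Missing or null parent: create an empty sub-document.
                child = node[key] = {}
            elif not isinstance(child, dict):
                raise TypeError(f"{key!r} on path {path!r} is not a sub-document")
            node = child
        # setdefault only fills the leaf when it is absent.
        node.setdefault(leaf, copy.deepcopy(default))
    return result


# Example: a product whose US slug was never set.
doc = {"slug": {"DE": "schuhe"}, "price": {"amount": 10}}
normalized = apply_defaults(doc, DEFAULTS)
print(normalized)
```

In a service like ours, a call such as `apply_defaults(product, DEFAULTS)` would run just before `insert_many`, so every stored document has the same shape. A `$jsonSchema` collection validator with a `required` list is another way to enforce the same rule on the database side.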
The main reason existence checks perform so poorly lies in how MongoDB indexes are structured. Indexes are designed to quickly locate documents based on the values of specific fields, and a regular (non-sparse) index stores a document that lacks the indexed field under a null key, exactly as if the field were explicitly set to null.
However, that means the index alone cannot tell "missing" apart from "null". A `$exists` filter therefore has to fetch and examine candidate documents to determine whether the field is really present, and that extra work cannot be optimized away with traditional indexing. As the volume of data grows, it turns into significant performance issues.
In our case, this resulted in slow, scan-heavy queries. MongoDB does let you define partial indexes with a `partialFilterExpression`, but they do not help much with existence checks: the expression accepts `$exists: true` but not `$exists: false`, so you cannot build a partial index over just the documents where a field such as `parentfield.childfield` is missing.
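To make the indexing behavior concrete, here is a deliberately simplified model in plain Python. It is not MongoDB's storage engine or query planner; it only assumes a single-field, non-sparse index that, like MongoDB's, files a document lacking the field under the same null key as a document with an explicit null. Because the key alone cannot tell the two apart, the toy `$exists: false` lookup has to read every null-keyed candidate. All names here are hypothetical.

```python
from collections import defaultdict

MISSING = object()  # sentinel meaning "field not present in the document"


def get_path(doc, path):
    """Resolve a dotted path, returning MISSING if any segment is absent."""
    node = doc
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return MISSING
        node = node[key]
    return node


def build_index(docs, path):
    """Toy non-sparse index: missing fields share the None key with explicit nulls."""
    index = defaultdict(list)
    for doc_id, doc in docs.items():
        value = get_path(doc, path)
        index[None if value is MISSING else value].append(doc_id)
    return index


def find_missing(docs, index, path):
    """Toy `$exists: false`: the index narrows the search to null-keyed entries,
    but each candidate must be fetched to see whether the field is really absent."""
    matches, fetched = [], 0
    for doc_id in index.get(None, []):
        fetched += 1  # one document read per candidate
        if get_path(docs[doc_id], path) is MISSING:
            matches.append(doc_id)
    return matches, fetched


docs = {
    1: {"slug": {"US": "red-shoes"}},
    2: {"slug": {}},            # field deleted
    3: {"slug": {"US": None}},  # default value
}
index = build_index(docs, "slug.US")
print(sorted(index[None]))                    # missing and null share one key
print(find_missing(docs, index, "slug.US"))   # (matches, documents fetched)
```

With real data the candidate list is every document stored under the null key, so the number of fetches grows with the collection rather than with the size of the answer.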
In a nutshell, the decision to assign default values or delete fields should be guided by the specific needs of your application. If you frequently read the field and need to perform operations based on its value, assigning a default value might simplify your queries and application logic.
In my case, the field was used in multiple places in the system's core logic, and the existence checks kept draining performance.
Let's test it with the scripts below. Each one reads documents from a source collection, writes them to a different collection in the same database, and then runs a filter query against that collection to measure how long it takes.
Check out the previous article as a follow-up:
Here are the scripts I prepared to reflect the real-world scenario. They read from a source collection of 1.8M documents; in my real case it was more than 500M. The larger the collection, the longer the reads take.
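If you already have documents with missing fields, they need a one-off backfill before queries can rely on defaults. The sketch below only builds the filter and update documents; `build_backfills` is a hypothetical helper, and the Motor call in the comment assumes the `collection` object from the scripts in this article.

```python
from typing import Any


def build_backfills(defaults: dict[str, Any]) -> list[tuple[dict, dict]]:
    """Build one (filter, update) pair per dotted path.

    Each filter only matches documents where that path is missing, so values
    that already exist (including explicit nulls) are never overwritten.
    """
    return [
        ({path: {"$exists": False}}, {"$set": {path: default}})
        for path, default in defaults.items()
    ]


# Hypothetical usage with Motor, inside an async function:
#   for query, update in build_backfills({"slug.US": None}):
#       result = await collection.update_many(query, update)
#       print(result.modified_count)
for query, update in build_backfills({"slug.US": None}):
    print(query, update)
```

Keeping one update per path matters: a single `$set` of all defaults, filtered by an `$or` of existence checks, would overwrite fields that some matched documents already have. On a collection as large as ours, each of these updates is itself an existence-check scan, so it is worth running them in batches (for example by `_id` range) outside peak hours.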
Deleting Fields
```python
import asyncio
import logging
import time

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MONGO_URI = 'mongodb://root:root@localhost:27020'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]          # source collection
target_collection = db["bad_collection"]  # documents with deleted fields


async def create_index():
    # Compound index on the two fields used by the filter query.
    await target_collection.create_index(
        [('slug.US', 1), ('price.currency', 1)]
    )
    logger.info('Index created on slug.US and price.currency.')


async def fetch_products(batch_size, last_id=None):
    # Page through the source collection by _id (range query, no skip()).
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products


async def bulk_write_to_mongo(products):
    for idx, product in enumerate(products):
        product['_id'] = ObjectId()  # new _id for the target collection
        if idx % 3 == 0:
            product['slug'].pop('US', None)  # deleting field (no KeyError if absent)
            product["price"]["currency"] = "EUR"
    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        # Capture the paging position before the batch's _ids are replaced.
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        # Run up to concurrency_limit inserts at a time.
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)


async def measure_filter_execution_time():
    start_time = time.time()
    # Existence check: documents where slug.US is missing.
    query = {'slug.US': {"$exists": False}, "price.currency": "EUR"}
    count = await target_collection.count_documents(query)
    end_time = time.time()
    logger.info(f'Time to execute filter query: {end_time - start_time:.2f} seconds.')
    logger.info(f'Number of documents without "slug.US" field: {count}')


async def main():
    await create_index()
    batch_size = 1000
    concurrency_limit = 10
    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time to process batches: {end_time - start_time:.2f} seconds.')
    await measure_filter_execution_time()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
```
- Deletes the `slug.US` field for every third product.
- Uses `"$exists": False` to filter documents where the `slug.US` field does not exist.
- Deleting the field entirely can save storage space and may be more semantically meaningful for indicating the absence of a value. However, it requires existence checks in queries, which might be slower.
- This models the scenario where fields may be missing entirely: more storage-efficient, but with potentially more complex and slower queries.
Results:
Assign Default Values
```python
import asyncio
import logging
import time

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MONGO_URI = 'mongodb://root:root@localhost:27020'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]           # source collection
target_collection = db["good_collection"]  # documents with default values


async def create_index():
    # Compound index on the two fields used by the filter query.
    await target_collection.create_index(
        [('slug.US', 1), ('price.currency', 1)]
    )
    logger.info('Index created on slug.US and price.currency.')


async def fetch_products(batch_size, last_id=None):
    # Page through the source collection by _id (range query, no skip()).
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products


async def bulk_write_to_mongo(products):
    for idx, product in enumerate(products):
        product['_id'] = ObjectId()  # new _id for the target collection
        if idx % 3 == 0:
            product['slug']['US'] = None  # default value instead of deleting
            product["price"]["currency"] = "EUR"
    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        # Capture the paging position before the batch's _ids are replaced.
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        # Run up to concurrency_limit inserts at a time.
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)


async def measure_filter_execution_time():
    start_time = time.time()
    # Equality match on None (null) instead of an existence check.
    query = {'slug.US': None, "price.currency": "EUR"}
    count = await target_collection.count_documents(query)
    end_time = time.time()
    logger.info(f'Time to execute filter query: {end_time - start_time:.2f} seconds.')
    logger.info(f'Number of documents with "slug.US" set to None: {count}')


async def main():
    await create_index()
    batch_size = 1000
    concurrency_limit = 10
    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time to process batches: {end_time - start_time:.2f} seconds.')
    await measure_filter_execution_time()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
```
- Sets the `slug.US` field to `None` for every third product.
- Uses `None` to filter documents where the `slug.US` field is set to `None`. In MongoDB, an equality match on `None` (null) also matches documents where the field is missing, so the query stays correct even if a few documents slip through without the field.
- Setting the field to `None` keeps it in the document with a `None` (BSON null) value. This can use slightly more storage, but it simplifies queries: checking for a `None` value is straightforward and can be faster than checking for non-existence.
Results:
The difference between the two read operations is 1.17 seconds for 1.8M documents. Can you imagine a collection of 500M documents? Disaster.
If you want to test it yourself, here's the repo:
and a video I recorded with a bit more explanation:
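To put a rough number on that, here is a back-of-the-envelope extrapolation of the measured gap. It assumes, purely for illustration, that the gap grows linearly with collection size; real query times depend on index selectivity, working-set size, and hardware, so treat the result as an order-of-magnitude estimate rather than a prediction.

```python
measured_gap_s = 1.17      # measured in the POC above
poc_docs = 1_800_000       # documents in the POC collection
prod_docs = 500_000_000    # documents in the real collection

# Naive linear scaling assumption: gap per document stays constant.
projected_gap_s = measured_gap_s * prod_docs / poc_docs
print(f"{projected_gap_s:.0f} s (~{projected_gap_s / 60:.1f} min)")
```

That is several minutes of extra time for a single count, on every run, before any connection drops force a restart.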
Conclusion
I always advise investing time in designing the system before implementation. The design should support scalability and maintain high performance under increased data loads, so the architecture can absorb growth and keep operating efficiently as demands escalate. Prioritizing this in the design phase can save considerable time and resources in the long run and prevent the issues that surface as the business expands.