EverythingPython

Streaming Real-Time AI Inference Results into MongoDB

I’m presenting a talk at a MongoDB User Group meetup later today, and I thought I’d write about it so that it would -

Let me pre-emptively say, I don’t know why I chose this to be the “TITLE” of the talk. It is extremely dry. But the topic is actually a lot of fun!

Effectively, there is this concept called Change Streams in MongoDB, and this talk is an attempt to understand and illustrate its usage.

So first off, what is MongoDB and where is it used?

MongoDB is a database that follows a document model to store data. This means that, in comparison to traditional relational databases, where data is stored using fixed schemas in rows and columns, MongoDB and its other document DB brethren store data in a more flexible format: JSON-like documents.

Example of a dataset that can be stored in MongoDB:

{
  "_id" : ObjectId(),
  "title" : "Anna Karenina",
  "author" : "Leo Tolstoy",
  "content" : "It's a classic of world literature, exploring themes of love, loyalty, family, and social class in 19th-century Russia.",
  "tags" : [ "Russian", "Serious" ],
  "publishedDate" : ISODate()
},
{
  "_id" : ObjectId(),
  "title" : "War and Peace",
  "author" : "Leo Tolstoy",
  "content" : "Set during the Napoleonic Wars and focusing on several aristocratic families in Russia.",
  "tags" : [ "Russian", "Serious" ],
  "publishedDate" : ISODate()
}

MongoDB can be used where the data to be stored doesn’t particularly benefit from being modelled into strict tables. It can be used when the data schema needs to be more flexible.

For example, the data above could also have been stored in a relational DB. But as the number of fields grows and the JOINs get more and more complicated, the relational model might start to slow down.


What is this post about?

Change Streams!

This feature has been around for a long time now. We’re on MongoDB 8.0.4 as of December 2024, but Change Streams were introduced back in MongoDB 3.6 in 2017.

It lets applications access real-time data changes in MongoDB without having to keep polling for them.

Now, other DBs also have this in some shape or form, but they vary in how difficult they are to configure -

In that respect, Change Streams are very easy to configure.

How do Change Streams work?
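
As a rough illustration: an application opens a change stream by calling `watch()` on a collection, and MongoDB then hands it one event document per change, so there is no polling loop on the application side. Here is a minimal sketch using PyMongo. The connection string, the `newsdb` database, the `news` collection and the helper names are placeholders I've assumed for the example; they are not taken from the talk's codebase.

```python
from typing import Any, Callable, Mapping


def insert_only_pipeline() -> list[dict[str, Any]]:
    """Aggregation pipeline that keeps only 'insert' change events."""
    return [{"$match": {"operationType": "insert"}}]


def follow_inserts(collection: Any, handle: Callable[[Mapping[str, Any]], None]) -> None:
    """Call handle(document) for every document inserted into collection.

    collection is expected to be a pymongo Collection. Iterating the
    stream blocks until the next change arrives.
    """
    with collection.watch(insert_only_pipeline()) as stream:
        for change in stream:
            # Insert events carry the new document under "fullDocument".
            handle(change["fullDocument"])


def main() -> None:
    # Imported here so the helpers above can be read without pymongo installed.
    from pymongo import MongoClient

    # Assumed local replica set named rs0; adjust the URI for your deployment.
    client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
    follow_inserts(client["newsdb"]["news"], print)
```

Calling `main()` against a running replica set prints each newly inserted document as it arrives. The `$match` stage is optional; without it the stream also reports updates, deletes and other operations.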


How can I enable Change Streams?

This post assumes you have a working MongoDB setup. I’ll write a separate post on that.

If there’s a replica set configuration like so -

[Image: replica set configuration]

you’re all good. If not, add the following to your mongod configuration file:

replication:
  replSetName: "<Whatever name you want>"

So it looks like -

[Image: the resulting configuration]

Boom! Change Streams are now enabled!
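
One caveat, hedged because it depends on your setup: on a brand-new deployment, setting `replSetName` and restarting `mongod` is normally followed by a one-time `rs.initiate()` in the shell before the node becomes a primary. A quick way to confirm from Python that the server is really part of a replica set is to look at the reply to the `hello` command. This is only a sketch; the URI is an assumed local default and `replica_set_name` is a helper name made up for this example.

```python
from typing import Any, Optional


def replica_set_name(client: Any) -> Optional[str]:
    """Return the replica set name reported by the server, or None.

    client is expected to be a pymongo MongoClient. A server that is not
    an initiated replica set member leaves "setName" out of its "hello"
    reply, and change streams will not work against it.
    """
    reply = client.admin.command("hello")
    return reply.get("setName")


def main() -> None:
    from pymongo import MongoClient  # third-party: pip install pymongo

    # directConnection=true talks to this one node without replica set discovery.
    client = MongoClient("mongodb://localhost:27017/?directConnection=true")
    name = replica_set_name(client)
    if name is None:
        print("No replica set is active on this server yet.")
    else:
        print(f"Member of replica set {name!r}: change streams can be used.")
```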


Great. Now how do I use them?

I thought about this a lot as well. My initial thought was to mimic the functioning of an oximeter in a hospital that keeps reporting the oxygen level of a patient.

But I thought I’d keep it a little lighter.

So, my simulated use case is one of segregating news items into one of four categories as they keep getting pulled in from news sources.

Here’s a broad architecture diagram -

[Image: architecture diagram]

Let’s consider this piece by piece:

News Generator:

[Image: the News Generator]
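
A stripped-down sketch of what a generator like this could look like (not the repo code: the sample headlines, the field names `headline` and `createdAt`, and the function names are all made up for illustration). It simply inserts one simulated news item at a time, and each `insert_one` is what later shows up as an insert event on the change stream.

```python
import random
import time
from datetime import datetime, timezone
from typing import Any, Optional

# Made-up sample headlines standing in for real news sources.
SAMPLE_HEADLINES = [
    "Local team wins championship",
    "Markets close higher after rate decision",
    "New telescope captures distant galaxy",
    "Chip maker unveils faster processor",
]


def make_news_item(rng: random.Random) -> dict[str, Any]:
    """Build one simulated news document (field names are illustrative)."""
    return {
        "headline": rng.choice(SAMPLE_HEADLINES),
        "createdAt": datetime.now(timezone.utc),
    }


def generate_news(collection: Any, count: int, delay_seconds: float = 1.0,
                  rng: Optional[random.Random] = None) -> int:
    """Insert count simulated news items, pausing between inserts.

    collection is expected to be a pymongo Collection. Returns the number
    of documents inserted.
    """
    rng = rng or random.Random()
    inserted = 0
    for _ in range(count):
        collection.insert_one(make_news_item(rng))
        inserted += 1
        time.sleep(delay_seconds)
    return inserted
```

With a real collection this would be called as something like `generate_news(client["newsdb"]["news"], count=10)`, where `newsdb` and `news` are again placeholder names.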

The Flask App
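
To give a feel for this piece, here is a stripped-down sketch (not the exact code from the repo). I'm assuming Server-Sent Events as the way to push data to the browser, and the `/stream` route and helper names are invented for the example. The idea is that the app opens a change stream on the collection and forwards each inserted document to the client as it arrives.

```python
import json
from typing import Any, Iterable, Iterator, Mapping


def to_sse(document: Mapping[str, Any]) -> str:
    """Format one document as a Server-Sent Events message."""
    # default=str turns ObjectId and datetime values into plain strings.
    return f"data: {json.dumps(document, default=str)}\n\n"


def sse_events(changes: Iterable[Mapping[str, Any]]) -> Iterator[str]:
    """Yield an SSE message for every change event that carries a document."""
    for change in changes:
        document = change.get("fullDocument")
        if document is not None:
            yield to_sse(document)


def create_app(collection: Any):
    """Build a Flask app that streams inserts on collection to browsers.

    collection is expected to be a pymongo Collection on a replica set.
    """
    from flask import Flask, Response  # third-party: pip install flask

    app = Flask(__name__)

    @app.route("/stream")
    def stream():
        def generate() -> Iterator[str]:
            pipeline = [{"$match": {"operationType": "insert"}}]
            with collection.watch(pipeline) as change_stream:
                yield from sse_events(change_stream)

        return Response(generate(), mimetype="text/event-stream")

    return app
```

Keeping the formatting in plain functions (`to_sse`, `sse_events`) means that part can be tried out without a database or a web server running.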

The Client / UI / Browser

[Image: the browser UI]

This changes automatically every 20 seconds.

Hopefully this gave you an idea of how Change Streams can be used!


Codebase -

All the code for this project can be found here.

#Talks #Mongodb