How to Integrate MongoDB with Diffusion Cloud

Martin Cowie - July 13, 2016

MongoDB forms part of the NoSQL vanguard and is big in more than once sense, which is part of the attraction. It is both very popular, finding employ inside eBay, LinkedIn and Craigslist, and it excels in storing & organizing very large volumes of data.

That it stores its content as (what look and feel like) JSON documents makes it interesting from from a Diffusion Cloud perspective. “How hard ..”, I asked myself, “..would it be to reflect a MongoDB collection into the topic tree?”. Not so hard, as it turns out.

The intent

MongoDB divides is storage into databases which in turn are divided into collections, much like a conventional RDMS uses tables. Our intent is to subscribe to a single collection and all changes to it, creating a discrete JSON topic for each document, and updating each topic in sync with the original document.

Configuring MongoDB for replication

Our first obstacle: MongoDB doesn’t really do subscription in a pub/sub sense. So instead make use the replication feature, which is really intended for wiring together MongoDB replica sets. Plenty of third-party applications make use of it however, including Meteor who do include a more directly useful Publish/Subscribe feature in their product.

It’s an optional feature so it has to be deliberately configured. Typically MongoDB is run as a daemon, but for testing purposes we configure and run ours in the fore.

As well as starting your MongoDB server this will also output a lot of diagnostic noise, so start another shell for MongoDB interaction.

The output from this Mongo testReplSet:PRIMARY confirms that we’re logged into the primary (and indeed only) member of the replication set. We then create 100 documents in a collection someDB.someCollection. This gives us our test subject.

Building the Adapter.

All sources for the adapter are available in Github, but here we shall lavish some explanation on the significant parts.

Placing the connectivity

We chose JCommander because we’ve used it before, and while shows nearly five times as many projects use Apache Commons CLI, the annotation based approach makes for a quicker & cleaner approach.

Having connected to MongoDB using Mongo’s own JavaScript driver we then arrange access to two collections; firstly the collection given on the command line and secondly in the database local which exists only when replication is configured, and is used as a sink for all storage mutation events. The OpLog is a “capped” collection, making it a little different to regular collections:

  • Documents can only be appended to a capped collection (though an identically sized documents can be used to overwrite an existing document).
  • Capped collections are akin to circular buffers. They are bounded, and once all allocated storage is used, older storage is then reused.
  • Similarly to the way tail(1) works with files, a capped collection can entertain queries from ‘tailable’ cursors, such that when the cursor reaches the end of the collection the cursors waits while more documents are appended to the collection.

Building the Adapter

Adapter’s static build method gathers two key features. Firstly it obtains TopicUpdateControl which is used to update topics values, using which it obtains an exclusive ValueUpdater for a branch of the topic tree. Finally we obtain TopicControl in order that we may create and remove topics.

Now we build a MongoNamespace, encapsulating the database and collection used to listen for command events (specifically commands to drop the collection), before constructing and returning an Adapter object. Using a builder rather than a more complex constructor conveys many benefits (as covered in chapter 2 of in Effective Java by Josh Bloch), but my motivations here are: the constructors intent is clear and it is simpler to build an immutable (and therefore thread-safe) Adapter object.

Turning the handle

The real work can begin now both the connectivity and state are in place. In method run() we do two things:

  1. Query the MongoDB collection for all documents, transcribing each.
  2. call relayChanges()

We use Java 8’s “Try With Resources” feature so that cursor opened on line 10 is closed a soon as the thread drops out of that scope.

Method transcribeDocument() has the task of building a new JSON topic from the content of the MongoDB document. If the topic has already been created (and therefore exists in topicPaths) we update the topic. Either branch makes use of method toBytes(String) which parses a JSON string down to a form suitable for the ValueUpdater.

Relaying the changes

Method relayChanges(long) does what it says on the tin: relays changes from the MongoDB instance to the topic tree. Before listening to the OpLog for events it composes a filter to use. In this case we wish only to see events later that ‘now’, that are document-insert, update or delete events, or commands to drop the collection.

Interestingly BsonTimestamp resolves time only to the second, and nothing smaller. It uses an incrementing counter to disambiguate events within that second.

We make the call to oplog.find(filter), and build a TailableAwait cursor, and then read incoming events as they happen. We switching them to either processInsert(Document), processUpdate(Document), processDelete(Document) or processCommand(Document), depending on the value of property op (presumably short for operation).

Methods processUpdate(Document), processDelete(Document) and processCommand(Document) check that the event relates to the adapted collection. A more focussed filter here would be both more performant and eliminate the need for this step.

Method processInsert(Document) mostly reuses transcribeDocument() to update a topic with the new content embedded in the event.

Method processDelete(Document) uses the MongoDB document-id to look up the effected topic, and then remove it.

Method processUpdate(Document) is more interesting as the event contains instructions to change the document, but not the changed document itself. A more fully realised adapter implementation would interpret and react to the changes, however in this case we fetch the changed document from the collection using it’s ObjectId, and pass that to transcribeDocument to update the existing topic. This imposes the cost of an extra round-trip, for the sake of readability.

Summing up

Above we have shown how little code is actually required to get started. As this is written the Github repository holds only 426 lines of Java. In the next blog we shall show how the adapter can be put to use.

The Diffusion® Intelligent Event-Data Platform makes it easy to consume, enrich and deliver event-data in real-time across all network conditions. Push Technology pioneered and is the sole provider of real-time delta data streaming™ technology that powers mission-critical business applications worldwide. Leading brands use Push Technology to bring innovative products to market faster by reducing the software development efforts using the low-code features. The Diffusion® Intelligent Event-Data Platform is available on-premise, in-the-cloud, or in a hybrid configuration. Learn how Push Technology can reduce infrastructure costs, and increase speed, efficiency, and reliability, of your web, mobile, and IoT application.

Learn More: View Blog Posts and Developer Resources


Check out these other resources


Info Center

Case Studies, White Papers, and Tech Talk notes relating to Diffusion and the customers benefiting from using it.


Quick Start Guide

Step-by-step guide to getting started fast with Diffusion or Diffusion Cloud.


Diffusion Cloud

Get the facts and the details about our Diffusion Cloud platform.