Change Streams MongoDB Java Working Simplified

on MongoDB • July 28th, 2022


Modern applications increasingly demand real-time synchronization. Consider a trading app that must update the instant stock values fluctuate, or an IoT data pipeline that issues a warning whenever a connected car leaves a geofenced region.

This is where the Change Streams MongoDB Java feature comes into play. Change streams, introduced in MongoDB 3.6, allow applications to stream real-time data updates by utilizing MongoDB’s underlying replication capabilities.

MongoDB users can now easily retrieve real-time data updates using change streams, without the complexity and risk of tailing the oplog (operations log). Any application can monitor changes and respond promptly, helping the organization react to events in real time. Read along to learn how to leverage Change Streams MongoDB Java in your application.


What are Change Streams in MongoDB?

Change streams were introduced in MongoDB 3.6. They provide a real-time stream of changes in a database, collection, or deployment, and they enable applications to access real-time data updates without the complexity and risk of tailing the oplog. For example, when an update occurs in a collection, MongoDB generates a change event.

Change streams can notify your application of all writes to documents (including deletes) and provide access to all the relevant information as changes occur, eliminating the need for polling, which can introduce delays, incur higher overhead, and lead to missed opportunities.

Key Features of MongoDB Change Streams

  • Targeted changes: Changes can be filtered so that listening applications receive only the relevant, targeted updates. For example, filters can be based on the operation type or on fields inside the document.
  • Resumability: Each change stream response includes a resume token. Suppose the connection between the application and the database is suddenly interrupted. In that case, the application can send the last resume token it received, and the change stream will pick up immediately where the application left off.
  • Security: Change streams are safe because users can only establish change streams on collections to which they have been granted read access.
  • Ease of use: Change streams are familiar – the API syntax uses the established MongoDB drivers and query language and is independent of any specific oplog format.
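The resumability described above can be sketched with the MongoDB Java driver. This is a minimal sketch, not the tutorial's own code: the connection string and collection names are placeholders, and how you persist the token (database, file, etc.) is up to your application.

```java
// Resuming a change stream after a disconnect (sketch).
// Requires the MongoDB Java driver and a replica set / Atlas cluster.
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

public class ResumeExample {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> grades =
                    client.getDatabase("sample_training").getCollection("grades");

            BsonDocument lastToken = null; // load from durable storage in a real app
            ChangeStreamIterable<Document> stream = (lastToken == null)
                    ? grades.watch()
                    : grades.watch().resumeAfter(lastToken); // pick up where we left off

            try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
                         stream.cursor()) {
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> event = cursor.next();
                    lastToken = event.getResumeToken(); // persist this token each time
                    System.out.println(event);
                }
            }
        }
    }
}
```

Persisting the latest token after each event means that, after a crash, the application restarts the stream from the last event it actually processed rather than from "now".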

Why Use Change Streams MongoDB Java Implementation?

Introduced in MongoDB 3.6, change streams have been around for some time now, but they haven’t received the limelight they deserve, and much of the MongoDB developer community hasn’t explored the power they bring. They remain a hidden gem: many developers are still unaware of their capabilities and the plethora of options they bring to the table.
Below are some crucial advantages of using the Change Streams MongoDB Java implementation.

  • It enables applications to stream real-time data changes by leveraging MongoDB’s underlying replication capabilities.
  • We can open a change stream cursor on a collection, a database, or even at the deployment level, which means we can watch for changes to all collections and databases. At the database and deployment levels, we can watch across all databases except the admin, local, and config databases.
  • One of the most significant advantages of implementing change streams is that it reduces the development effort considerably, especially while implementing common use cases like notification services, real-time analytics, ETL services, and cross-platform synchronizations.
  • The implementation and development effort is drastically reduced compared to building similar functionality yourself.
  • Reduced dev effort and almost zero infrastructure cost mean a higher return on your investment. In practice, if you already have MongoDB clusters in production, you can implement change streams with no additional infrastructure burden and at essentially zero cost.
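The three watch scopes listed above can be sketched as follows with the Java driver. This is a minimal sketch; the connection string and database/collection names are placeholders, and a replica set (or Atlas cluster) is required for change streams to work.

```java
// Opening change streams at three scopes (sketch).
// Requires the MongoDB Java driver and a replica set / Atlas cluster.
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class WatchScopes {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // 1. Collection level: changes to a single collection.
            ChangeStreamIterable<Document> collStream =
                    client.getDatabase("sample_training").getCollection("grades").watch();

            // 2. Database level: changes to every collection in one database.
            ChangeStreamIterable<Document> dbStream =
                    client.getDatabase("sample_training").watch();

            // 3. Deployment level: changes across all databases
            //    (except admin, local, and config).
            ChangeStreamIterable<Document> clusterStream = client.watch();

            clusterStream.forEach(System.out::println); // blocks, printing each event
        }
    }
}
```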

Scale your data integration effortlessly with Hevo’s Fault-Tolerant No Code Data Pipeline

As the ability of businesses to collect data explodes, data teams have a crucial role in fueling data-driven decisions. Yet, they struggle to consolidate the scattered data in their warehouse to build a single source of truth. Broken pipelines, data quality issues, bugs and errors, and lack of control and visibility over the data flow make data integration a nightmare.

1000+ data teams rely on Hevo’s Data Pipeline Platform to integrate data from 150+ sources in a matter of minutes. Billions of data events from sources as varied as SaaS apps, Databases, File Storage, and Streaming sources can be replicated in near real-time with Hevo’s fault-tolerant architecture. What’s more – Hevo puts complete control in the hands of data teams with intuitive dashboards for pipeline monitoring, auto-schema management, and custom ingestion/loading schedules.

This, combined with transparent pricing and 24×7 support, makes Hevo one of the most highly rated data pipeline platforms on review sites.

Take our 14-day free trial to experience a better way to manage data pipelines.

Get started for Free with Hevo!

MongoDB Change Streams Data Flow


The change streams data flow enables developers to build reactive, real-time services, and it is more straightforward than any custom-built change-data-capture implementation.

Data flows in from your business apps, user activity, IoT sensors, or third-party sources. As the data enters MongoDB, it is almost immediately available to change streams.

Using the change streams API, we can manage real-time notifications and send change events to our various applications.

We could send them to custom apps, to Atlas Triggers, or even to a Kafka implementation. All you have to do is build the connection between MongoDB change streams and the final application; it takes just a few lines of code to push data to your downstream applications.
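The "few lines of code" above can be sketched like this. Note that sendDownstream is a hypothetical stand-in, not a real API: in practice you would replace it with your actual transport, such as a Kafka producer, a webhook call, or a message queue client.

```java
// Forwarding change events to a downstream consumer (sketch).
// Requires the MongoDB Java driver and a replica set / Atlas cluster.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

public class ChangeForwarder {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            client.getDatabase("sample_training")
                  .getCollection("grades")
                  .watch()
                  .forEach(ChangeForwarder::sendDownstream); // blocks on the stream
        }
    }

    // Hypothetical helper: swap in your real Kafka producer, webhook, etc.
    private static void sendDownstream(ChangeStreamDocument<Document> event) {
        System.out.println("Forwarding " + event.getOperationType() + " event");
    }
}
```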

Prerequisites

Before you begin this tutorial, you should meet the following prerequisites:

  • Working familiarity with MongoDB is required.
  • MongoDB version 3.6 or above is required.
  • MongoDB Atlas Cluster
  • A general understanding of the principles of sharding and replication.
  • A general overview of the MongoDB operations log.
  • A basic understanding of Change Events.

Change Streams MongoDB Java Implementation

In this section, we will walk through the steps for the Change Streams MongoDB Java implementation.
For the sake of simplicity, we will use the following code repo, available on GitHub. You can clone the repository to get started.

git clone https://github.com/mongodb-developer/java-quick-start

After cloning the repo, you will see multiple files. In this tutorial, we will work on the ChangeStreams.java file and use several examples to demonstrate some of the change stream features. For simplicity, we will only discuss the code directly related to change streams.

Creating Simple Change Streams Without Filters

Let us begin with the simplest Change Streams MongoDB Java Implementation we can create:

MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
ChangeStreamIterable<Grade> changeStream = grades.watch();
changeStream.forEach((Consumer<ChangeStreamDocument<Grade>>) System.out::println);

The critical takeaway from the above code is as follows:

  • db.collection.watch() function is used to open a change stream cursor on the collection.
  • This returns a ChangeStreamIterable that can be iterated to return our change events as indicated by its name. 
  • In our program, we are iterating over Change Stream to print our change event documents in the Java standard output.

The simpler and shorter form of the above code is presented below:

grades.watch().forEach(printEvent());

private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
    return System.out::println;
}

To run the above example: Uncomment only example 1 in the ChangeStreams.java file. Start it in your IDE or, using Maven, in a dedicated terminal at the root of your project. Use the following command to start execution from the console:

mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority" 

The second Java file we will use is MappingPOJO.java. This program generates the MongoDB operations that we will observe in the change stream output.

Start MappingPOJO.java in another console or in your IDE:

mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.MappingPOJO" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"

In MappingPOJO, we are performing four MongoDB operations: 

  • Creating a new Grade document with the insertOne() method.
  • Searching for this Grade document using the find() method.
  • Replacing this Grade entirely using the findOneAndReplace() method.
  • Deleting this Grade using the deleteOne() method.

MappingPOJO’s typical output confirms this:

Grade inserted.
Grade found:    Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}]}
Grade replaced: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}, Score{type='exam', score=42.0}]}
Grade deleted:  AcknowledgedDeleteResult{deletedCount=1}

Note: The find() method simply reads a document from MongoDB and doesn’t change anything, so it doesn’t generate an event in the change stream.

Insert and Delete OperationType Filter in Change Streams

Let’s perform some filter operations. This time, we’ll be interested only in insert and delete operations.

List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete"))));
grades.watch(pipeline).forEach(printEvent());

Uncomment example 2 in ChangeStreams.java and run the program, followed by MappingPOJO.java, as we did previously.

You should receive output like the following:

ChangeStreamDocument {operationType=OperationType {value= 'insert'},
  resumeToken= {"_data": "825E2F4983000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
  namespace=sample_training.grades,
  destinationNamespace=null,
  fullDocument=Grade
  {
    id=5e2f4983cc1d2842bff55564,
    student_id=10003.0,
    class_id=10.0,
    scores= [ Score {type= 'homework', score=50.0}]
  },
  documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564" }},
  clusterTime=Timestamp {value=6786723990460170241, seconds=1580157315, inc=1 },
  updateDescription=null,
  txnNumber=null,
  lsid=null
}

ChangeStreamDocument { operationType=OperationType {value= 'delete'},
  resumeToken= {"_data": "825E2F4983000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
  namespace=sample_training.grades,
  destinationNamespace=null,
  fullDocument=null,
  documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564"}},
  clusterTime=Timestamp {value=6786723990460170244, seconds=1580157315, inc=4},
  updateDescription=null,
  txnNumber=null,
  lsid=null
}

Note: This time, we only get two events: the insert and the delete.

Update OperationType Filter in Change Streams

This time we will use the Update OperationType filter.

List<Bson> pipeline = singletonList(match(eq("operationType", "update")));
grades.watch(pipeline).forEach(printEvent());

This time, follow these instructions:

  • Uncomment example 3 in ChangeStreams.java, then run Create.java if you haven’t already. These new documents will be used in the following stage.
  • In another console, run Update.java.

You should get the following output:

ChangeStreamDocument {operationType=OperationType {value= 'update'},
  resumeToken= {"_data": "825E2FB83E000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
  namespace=sample_training.grades,
  destinationNamespace=null,
  fullDocument=null,
  documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
  clusterTime=Timestamp {value=6786845739898109953, seconds=1580185662, inc=1},
  updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.10": "You will learn a lot if you read the MongoDB blog!"}},
  txnNumber=null,
  lsid=null
}

As you can see, we are obtaining our update operation in the updateDescription field, but we are just getting the difference between this document and its prior version.

Note: You can use the UPDATE_LOOKUP option to retrieve the entire document during an update operation.
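The UPDATE_LOOKUP option mentioned in the note can be sketched as follows, reusing the update filter from the example above. The connection string and names are placeholders; note that the looked-up document reflects the document's state at read time, which may already include later updates.

```java
// Receiving the full current document along with update events (sketch).
// Requires the MongoDB Java driver and a replica set / Atlas cluster.
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.eq;
import static java.util.Collections.singletonList;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.List;

public class UpdateLookup {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            List<Bson> pipeline = singletonList(match(eq("operationType", "update")));
            client.getDatabase("sample_training")
                  .getCollection("grades")
                  .watch(pipeline)
                  .fullDocument(FullDocument.UPDATE_LOOKUP) // fetch the whole document
                  .forEach(event -> System.out.println(event.getFullDocument()));
        }
    }
}
```

With UPDATE_LOOKUP, the fullDocument field of each update event is populated instead of being null as in the output above.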

Conclusion

The Change Streams MongoDB Java implementation facilitates a real-time, seamless connection between the frontend and backend. This functionality lets you use MongoDB for the pub/sub model, eliminating the need to manage Kafka or RabbitMQ deployments. If your application requires real-time data, you should investigate this MongoDB functionality. This tutorial should help you get started with Change Streams MongoDB Java.

In this blog, you have learned about the Change Streams MongoDB Java implementation. Change streams are an excellent MongoDB feature for building real-time applications. If you are unfamiliar with them, now is the right time to start learning and exploring. Feel free to share your views on the Change Streams MongoDB Java tutorial in the comments section below.

Apart from MongoDB, you would use several applications and databases across your business for Marketing, Accounting, Sales, Customer Relationship Management, etc. It is essential to consolidate data from all these sources to get a complete overview of your business performance. To achieve this, you need to assign a portion of your Engineering Bandwidth to Integrate Data from all sources, Clean & Transform it, and finally, Load it to a Cloud Data Warehouse or a destination of your choice for further Business Analytics. All of these challenges can be comfortably solved by a Cloud-Based ETL tool such as Hevo Data.

Visit our Website to Explore Hevo

Hevo Data, a No-code Data Pipeline, can seamlessly transfer data from a vast sea of 100+ sources such as MongoDB & MongoDB Atlas to a Data Warehouse or a Destination of your choice to be visualized in a BI Tool. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using MongoDB as your NoSQL Database Management System and searching for a no-fuss alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources & BI tools(Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of learning about the Change Streams MongoDB Java Implementation! Share your thoughts with us in the comments section below.
