Modern applications are racing to deliver real-time synchronization. Consider a trading app that must update the moment stock values fluctuate, or an IoT data pipeline that produces a warning anytime a connected car leaves a geofenced region.
This is where the Change Streams MongoDB Java feature comes into play. Change streams, introduced in MongoDB 3.6, allow applications to stream real-time data updates by utilizing MongoDB's underlying replication capabilities.
MongoDB users can now easily retrieve real-time data updates using change streams, eliminating the complexity and risk of tailing the oplog (operation log).
Any application can monitor changes and respond promptly, making decisions that help the organization react to events in real time. Read along to learn how to leverage Change Streams MongoDB Java in your application.
What are Change Streams in MongoDB?
Introduced in MongoDB 3.6, change streams provide a real-time stream of changes in a database, collection, or deployment. They enable applications to access real-time data updates without the complexity and risk of tailing the oplog. For example, when an update occurs in a collection, MongoDB generates a change event.
Change streams can notify your application of all writes to documents (including deletes) and provide access to all the relevant information as changes occur, eliminating the need for polling, which can introduce delays, incur higher overhead, and lead to missed opportunities.
Key Features of MongoDB Change Streams
- Targeted changes: Updates can be filtered so that listening applications receive only the relevant, targeted changes. For example, filters can be based on the operation type or on fields inside the document.
- Resumability: Each change stream response includes a resume token. If the connection between the application and the database is suddenly interrupted, the application can send the last resume token it received, and the change stream will pick up exactly where the application left off.
- Security: Change streams are safe because users can only establish change streams on collections to which they have been granted read access.
- Ease of use: Change streams are familiar: the API uses the established MongoDB drivers and query language and is independent of any specific oplog format.
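To illustrate resumability, here is a minimal sketch using the MongoDB Java sync driver. The database, collection, and connection-string property are assumptions taken from the examples later in this article; the error handling is deliberately simplified. The idea is to remember the resume token from each event and, after an interruption, reopen the stream from that exact point:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

import java.util.function.Consumer;

public class ResumableStream {

    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create(System.getProperty("mongodb.uri"))) {
            MongoCollection<Document> grades =
                    client.getDatabase("sample_training").getCollection("grades");

            BsonDocument resumeToken = null;
            try (MongoCursor<ChangeStreamDocument<Document>> cursor = grades.watch().iterator()) {
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> event = cursor.next();
                    // Remember the latest token so we can resume after a failure.
                    resumeToken = event.getResumeToken();
                    System.out.println(event);
                }
            } catch (Exception e) {
                // If the connection dropped, reopen the stream exactly where we left off.
                if (resumeToken != null) {
                    grades.watch()
                          .resumeAfter(resumeToken)
                          .forEach((Consumer<ChangeStreamDocument<Document>>) System.out::println);
                }
            }
        }
    }
}
```

In production you would typically persist the resume token (for example, in a small metadata collection) so the application can resume even after a full restart.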
Achieve seamless integration with MongoDB using Hevo’s real-time data sync capabilities. Effortlessly capture and transfer data from MongoDB to your desired destination, ensuring your analytics are always up-to-date and accurate.
Hevo offers:
- Minimal Learning: Hevo’s simple and interactive UI makes it extremely simple for new customers to work on and perform operations.
- Live Support: The Hevo team is available 24/7 to extend exceptional support to its customers through chat, E-Mail, and support calls.
- Transformational Capabilities: It provides pre- and post-load transformational capabilities to ensure your data is always analysis ready.
- Transparent Pricing: Hevo offers transparent pricing with no hidden fees, allowing you to budget effectively while scaling your data integration needs.
Try Hevo today to experience seamless data transformation and migration.
Why Use Change Streams MongoDB Java Implementation?
Introduced in MongoDB 3.6, change streams have been around for some time now, but unfortunately they haven't received the limelight they deserve, and much of the MongoDB developer community hasn't explored the power they bring on board.
They are a hidden gem, as many developers are still unaware of their capabilities and the plethora of options they bring to the table.
Below are some crucial advantages of a Change Streams MongoDB Java implementation.
- It enables applications to stream real-time data changes by leveraging MongoDB’s underlying replication capabilities.
- We can open a change stream cursor on a collection, a database, or even at the deployment level, which means we can watch for changes to all collections and databases. At the database and deployment levels, keep in mind that change streams cover all databases except the admin, local, and config databases (database- and deployment-level change streams require MongoDB 4.0 or later).
- One of the most significant advantages of implementing change streams is that it reduces the development effort considerably, especially while implementing common use cases like notification services, real-time analytics, ETL services, and cross-platform synchronizations.
- Compared to building similar functionality yourself, the implementation and development effort is drastically reduced.
- Reduced development effort and almost zero infrastructure cost mean a higher return on your investment. In practice, if you already have MongoDB clusters in production, you can implement change streams without any additional infrastructure burden and at practically zero cost.
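The three scopes can be sketched as follows with the MongoDB Java sync driver (database and collection names are illustrative; database- and deployment-level watches require MongoDB 4.0+). Note that each `forEach` call blocks indefinitely, so in a real application you would pick a single scope:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

import java.util.function.Consumer;

public class WatchScopes {

    public static void main(String[] args) {
        Consumer<ChangeStreamDocument<Document>> print = System.out::println;
        try (MongoClient client = MongoClients.create(System.getProperty("mongodb.uri"))) {
            MongoDatabase db = client.getDatabase("sample_training");

            // Collection level: changes to a single collection.
            db.getCollection("grades").watch().forEach(print);

            // Database level: changes to every collection in this database (MongoDB 4.0+).
            db.watch().forEach(print);

            // Deployment level: changes across all databases
            // except admin, local, and config (MongoDB 4.0+).
            client.watch().forEach(print);
        }
    }
}
```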
MongoDB Change Streams Data Flow
The change streams data flow enables developers to build reactive, real-time services, and it is more straightforward than any custom-built data change management implementation.
Data arrives from your business applications, user activity, IoT sensors, or third-party sources. As it enters MongoDB, it is almost immediately available to change streams.
Using the change streams API, we can manage real-time notifications and send change events to our different applications: custom apps, Atlas Triggers, or even a Kafka implementation. All you have to do is build a connection between MongoDB change streams and the target application; a few lines of code are enough to push data to your downstream applications.
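As a sketch of that "few lines of code" claim, the loop below forwards each change event to a hypothetical downstream sender (the `sendDownstream` method is illustrative; in practice it might wrap a Kafka producer, a webhook call, or a message queue client):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

import java.util.function.Consumer;

public class DownstreamPush {

    // Hypothetical downstream sender: replace with a Kafka producer, webhook, etc.
    static void sendDownstream(ChangeStreamDocument<Document> event) {
        System.out.println("forwarding: " + event.getOperationType());
    }

    public static void main(String[] args) {
        Consumer<ChangeStreamDocument<Document>> handler = DownstreamPush::sendDownstream;
        try (MongoClient client = MongoClients.create(System.getProperty("mongodb.uri"))) {
            client.getDatabase("sample_training")
                  .getCollection("grades")
                  .watch()
                  .forEach(handler); // blocks and forwards every change event as it arrives
        }
    }
}
```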
Prerequisites
Before you begin using this handbook, you should be familiar with the following prerequisites:
- Working familiarity with MongoDB is required.
- MongoDB version 3.6 or above is required.
- MongoDB Atlas Cluster
- A general understanding of the principles of sharding and replication.
- A general overview of the MongoDB operations log.
- A basic understanding of Change Events.
Change Streams MongoDB Java Implementation
In this section, we will walk through the steps for a Change Streams MongoDB Java implementation.
For simplicity, we will use the following code repository, available on GitHub. Clone it to get started.
git clone https://github.com/mongodb-developer/java-quick-start
After cloning the repo, you will see multiple files. In this tutorial, we will work with the ChangeStreams.java file and use several examples to demonstrate some of the change stream features. For simplicity, we will focus only on the code directly linked to change streams.
Creating Simple Change Streams Without Filters
Let us begin with the simplest Change Streams MongoDB Java Implementation we can create:
MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
ChangeStreamIterable<Grade> changeStream = grades.watch();
changeStream.forEach((Consumer<ChangeStreamDocument<Grade>>) System.out::println);
The critical takeaway from the above code is as follows:
- The watch() method (the driver equivalent of db.collection.watch() in the shell) opens a change stream cursor on the collection.
- It returns a ChangeStreamIterable which, as its name suggests, can be iterated to return our change events.
- In our program, we iterate over the change stream and print each change event document to the Java standard output.
The simpler and shorter form of the above code is presented below:
grades.watch().forEach(printEvent());
private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
return System.out::println;
}
To run the above example, uncomment only example 1 in the ChangeStreams.java file, then start it in your IDE or in a dedicated terminal at the root of your project using Maven:
mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
The second Java file we will use is MappingPOJO.java. This program generates the MongoDB operations that we will observe in the change stream output.
Start MappingPOJO.java in another console or in your IDE:
mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.MappingPOJO" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
In MappingPOJO, we are performing four MongoDB operations:
- Creating a new Grade document with the insertOne() method.
- Searching for this Grade document using the find() method.
- Replacing this Grade entirely using the findOneAndReplace() method.
- Deleting this Grade using the deleteOne() method.
MappingPOJO's typical output confirms this:
Grade inserted.
Grade found: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}]}
Grade replaced: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}, Score{type='exam', score=42.0}]}
Grade deleted: AcknowledgedDeleteResult{deletedCount=1}
Note: The find() method simply reads a document from MongoDB and doesn't change anything, so it doesn't generate an event in the change stream.
Insert and Delete OperationType Filter in Change Streams
Let's add some filtering. This time we are only interested in insert and delete operations.
// Static imports assumed:
// import static java.util.Arrays.asList;
// import static java.util.Collections.singletonList;
// import static com.mongodb.client.model.Aggregates.match;
// import static com.mongodb.client.model.Filters.in;
List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete"))));
grades.watch(pipeline).forEach(printEvent());
Uncomment example 2 in ChangeStreams.java and run the program, followed by MappingPOJO.java, as we did previously.
You should receive the following output:
ChangeStreamDocument {operationType=OperationType {value= 'insert'},
resumeToken= {"_data": "825E2F4983000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
namespace=sample_training.grades,
destinationNamespace=null,
fullDocument=Grade
{
id=5e2f4983cc1d2842bff55564,
student_id=10003.0,
class_id=10.0,
scores= [ Score {type= 'homework', score=50.0}]
},
documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564" }},
clusterTime=Timestamp {value=6786723990460170241, seconds=1580157315, inc=1 },
updateDescription=null,
txnNumber=null,
lsid=null
}
ChangeStreamDocument { operationType=OperationType {value= 'delete'},
resumeToken= {"_data": "825E2F4983000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
namespace=sample_training.grades,
destinationNamespace=null,
fullDocument=null,
documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564"}},
clusterTime=Timestamp {value=6786723990460170244, seconds=1580157315, inc=4},
updateDescription=null,
txnNumber=null,
lsid=null
}
Note: This time, we only receive two events: insert and delete.
Update OperationType Filter in Change Streams
This time we will use the Update OperationType filter.
List<Bson> pipeline = singletonList(match(eq("operationType", "update")));
grades.watch(pipeline).forEach(printEvent());
Follow these instructions:
- Uncomment example 3 in ChangeStreams.java, then run Create.java if you haven't already. The documents it creates will be used in the next step.
- In another console, run Update.java.
You should get the following output:
ChangeStreamDocument {operationType=OperationType {value= 'update'},
resumeToken= {"_data": "825E2FB83E000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
namespace=sample_training.grades,
destinationNamespace=null,
fullDocument=null,
documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
clusterTime=Timestamp {value=6786845739898109953, seconds=1580185662, inc=1},
updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.10": "You will learn a lot if you read the MongoDB blog!"}},
txnNumber=null,
lsid=null
}
As you can see, the update operation appears in the updateDescription field, but we only receive the difference between this document and its previous version.
Note: You can use the FullDocument.UPDATE_LOOKUP option to retrieve the entire document during an update operation.
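As a minimal sketch of that option, extending the snippet above (the extra import is noted in comments; with UPDATE_LOOKUP the server looks up the document's current version and populates the event's fullDocument field):

```java
// Additional imports assumed on top of the previous example:
// import com.mongodb.client.model.changestream.FullDocument;
// import static com.mongodb.client.model.Filters.eq;
List<Bson> pipeline = singletonList(match(eq("operationType", "update")));
grades.watch(pipeline)
      .fullDocument(FullDocument.UPDATE_LOOKUP) // fullDocument is populated on update events
      .forEach(printEvent());
```

Keep in mind that the lookup happens at notification time, so under concurrent writes the returned document may already reflect changes made after the update that triggered the event.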
Conclusion
A Change Streams MongoDB Java implementation facilitates a real-time, seamless connection between the frontend and backend. This functionality allows you to use MongoDB for the pub/sub model, eliminating the need to manage Kafka or RabbitMQ deployments.
In this blog, you have learned about Change Streams MongoDB Java implementation. Change Streams is an excellent feature of MongoDB for real-time application building. If you are unfamiliar with it, it’s the right time to start learning and exploring it.
Apart from MongoDB, you would use several applications and databases across your business for Marketing, Accounting, Sales, Customer Relationship Management, etc. It is essential to consolidate data from all these sources to get a complete overview of your business performance.
To achieve this, you need to assign a portion of your Engineering Bandwidth to Integrate Data from all sources, Clean & Transform it, and finally, Load it to a Cloud Data Warehouse or a destination of your choice for further Business Analytics. All of these challenges can be comfortably solved by a Cloud-Based ETL tool such as Hevo Data. Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.
FAQs
1. What are Change Streams in MongoDB?
Change Streams allow applications to access real-time updates of data changes in a MongoDB database or collection, eliminating the need for polling and enabling instant responses to events.
2. How do I implement Change Streams in Java?
You can implement Change Streams in Java by using the watch() method on a collection and iterating over the change events.
3. What are the benefits of using Change Streams?
Change Streams provide real-time notifications, reduce development effort for applications like analytics or notifications, and eliminate the need for additional infrastructure.
Suraj has over a decade of experience in the tech industry, with a significant focus on architecting and developing scalable front-end solutions. As a Principal Frontend Engineer at Hevo, he has played a key role in building core frontend modules, driving innovation, and contributing to the open-source community. Suraj's expertise includes creating reusable UI libraries, collaborating across teams, and enhancing user experience and interface design.