Python Spark MongoDB Connection & Workflow: A Comprehensive Guide 101

on Apache Spark, Big Data, Data Integration, Database Management Systems, MongoDB • February 22nd, 2022

Apache Spark is an advanced computing engine that focuses on speed, ease of use, and sophisticated analytics. It performs especially well when fast processing is required. MongoDB is a popular NoSQL database used for real-time analysis of organizational data. Apache Spark also extends core analytics to include real-time analysis tools and the training of Machine Learning models.

Upon a complete walkthrough of this article, you will gain a decent understanding of Python Spark MongoDB Connection and Workflow. Read along to learn more about the Python Spark MongoDB Connection!

Prerequisites

  • Basic knowledge of Apache Spark and MongoDB Servers.
  • A running MongoDB instance is required to create connections to MongoDB. The instance should be version 2.6 or later.
  • The Spark version should be 2.4.x, and the Scala version should be 2.12.x.

What is Apache Spark?

Today, data scientists are keen to learn more about Natural Language Processing and other Machine Learning technologies. As Big Data technology emerged, Apache Spark quickly gained traction in modern data processing because it offers a simpler, more flexible, in-memory approach than MapReduce.

Apache Spark is an open-source distributed processing system used for Big Data workloads. It uses in-memory caching and optimized query execution to run fast analytic queries against data of any size. Thanks to these features, it has overtaken MapReduce in adoption. It can also be used with distributed storage systems such as HDFS, Apache Cassandra, and MongoDB.

What is MongoDB?

MongoDB is a NoSQL database that is becoming widely popular. Its key strength is managing document-oriented data using built-in distribution, management, and replication features. It scales horizontally while maintaining high availability and reliability.

You can change the structure of records (which MongoDB refers to as Documents) by simply adding new fields or deleting existing ones. This feature of MongoDB allows you to easily represent hierarchical relationships, store arrays, and model other complex data structures. Nowadays, many tech giants, including Facebook, eBay, Adobe, and Google, use MongoDB to store colossal amounts of data.

Simplify MongoDB ETL & Analysis Using Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up Data Integration from 100+ Data Sources (including 40+ Free sources) and will let you directly load data from sources like MongoDB to a Data Warehouse or the Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data. 

Get Started with Hevo for Free

Let’s look at some of the salient features of Hevo:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Connectors: Hevo supports 100+ Integrations to SaaS platforms FTP/SFTP, Files, Databases, BI tools, and Native REST API & Webhooks Connectors. It supports various destinations including Google BigQuery, Amazon Redshift, Snowflake, Firebolt, Data Warehouses; Amazon S3 Data Lakes; Databricks; and MySQL, SQL Server, TokuDB, MongoDB, PostgreSQL Databases to name a few.  
  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within Data Pipelines.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!

Python Spark MongoDB Connection & Workflow

Here we use the pyspark shell to connect Python to MongoDB. PySpark connects the Python API to the Spark core and sets up the Spark context. Python applications can also set up the Spark context themselves in self-contained code.

Python Spark Shell

When you start the Python Spark shell, you can specify the following options according to your needs:

  • Use the --packages option to download and install the MongoDB Spark Connector package.
  • Use the --conf option to configure the MongoDB Spark Connector.

Whenever you define the Connector configuration using SparkConf, you must make sure that every setting carries the appropriate prefix, such as spark.mongodb.input. for read settings and spark.mongodb.output. for write settings.
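As a rough illustration of how these two options fit together, the sketch below assembles a pyspark launch command in Python. The URIs are assumed example values for a local server, and the connector version is deliberately left as a placeholder because it depends on your Spark and Scala versions. The helper name build_pyspark_command is hypothetical and not part of any library.

```python
import shlex

# Assumed example values: a local MongoDB server, a database named "test",
# and a collection named "myCollection".
INPUT_URI = "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"
OUTPUT_URI = "mongodb://127.0.0.1/test.myCollection"

# Coordinates in groupId:artifactId:version form. "<version>" is a placeholder,
# not a real release; pick the one that matches your Spark and Scala versions.
CONNECTOR_PACKAGE = "org.mongodb.spark:mongo-spark-connector_2.12:<version>"


def build_pyspark_command(input_uri, output_uri, package):
    """Return the argument list for launching pyspark with the connector."""
    return [
        "pyspark",
        "--conf", f"spark.mongodb.input.uri={input_uri}",
        "--conf", f"spark.mongodb.output.uri={output_uri}",
        "--packages", package,
    ]


command = build_pyspark_command(INPUT_URI, OUTPUT_URI, CONNECTOR_PACKAGE)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(command))
```

Building the command as a list keeps each setting separate, so a URI containing characters such as ? is quoted correctly when the command is printed.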

Example Scenario

Here we take the example of connecting the pyspark shell to MongoDB. The shell is launched from the command line with a database and a collection specified.

  • The spark.mongodb.input.uri option specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) to read data from, and the read preference.
  • The spark.mongodb.output.uri option specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to write data to. By default, it connects to port 27017.
  • The --packages option specifies the Maven coordinates of the Spark Connector, in the format groupId:artifactId:version.
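To make the parts of such a URI concrete, here is a minimal sketch that splits one into server, port, database, collection, and options using only the Python standard library. The function name describe_mongodb_uri is hypothetical, and the sketch assumes a simple single-host URI of the form shown above. It is not how the connector itself parses URIs.

```python
from urllib.parse import urlsplit, parse_qs

DEFAULT_MONGODB_PORT = 27017  # used when the URI does not name a port


def describe_mongodb_uri(uri):
    """Split a single-host connector URI into its named parts."""
    parts = urlsplit(uri)
    # The path looks like "/database.collection"; split on the first dot only,
    # because a collection name may itself contain dots.
    database, _, collection = parts.path.lstrip("/").partition(".")
    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_MONGODB_PORT,
        "database": database,
        "collection": collection,
        "options": {key: values[0] for key, values in parse_qs(parts.query).items()},
    }


info = describe_mongodb_uri(
    "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"
)
print(info)
```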

Creating SparkSession Object

When you start pyspark, you get a SparkSession object called spark by default. In a standalone Python application, you must build your SparkSession explicitly; the example below creates one named my_spark.

The default SparkSession object uses the configuration settings you gave when you launched pyspark, including the spark.mongodb.input.uri and spark.mongodb.output.uri config options.

If you prefer to configure a custom SparkSession object within pyspark, you can call SparkSession.builder and specify different configuration options.

from pyspark.sql import SparkSession

# Wrap the builder chain in parentheses so it can span several lines.
my_spark = (
    SparkSession.builder
    .appName("PyApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.coll")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.coll")
    .getOrCreate()
)

Using the SparkSession object, you can write data to MongoDB, read data from MongoDB, create DataFrames, and perform SQL queries.

Read Data from MongoDB

When you set the spark.mongodb.input.uri option for your SparkSession, you can create a Spark DataFrame that holds data from the MongoDB database and collection given in that URI.

Example:

Consider a collection named Cars that contains the following documents:

{ "_id" : 1, "company" : "Tata", "quantity" : 10 }
{ "_id" : 2, "company" : "Mercedeze", "quantity" : 15 }
{ "_id" : 3, "company" : "Jeep", "quantity" : 20 }

You can load the collection into a DataFrame with spark.read from within the pyspark shell.

df = spark.read.format("mongo").load()

Spark samples the records to infer the schema of the collection.

Calling df.printSchema() in the pyspark shell produces the following output:

root
|-- _id: double (nullable = true)
|-- quantity: double (nullable = true)
|-- company: string (nullable = true)

To read from a different MongoDB collection, use the .option() method when reading data into a DataFrame.

For example, to read from a collection named city in a database named country, specify country.city in the uri option.

df = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1/country.city").load()

In this way, we can read data from MongoDB through the Python Spark connection. Next, let us look at writing to MongoDB.

Write to MongoDB

To create a DataFrame, first create a SparkSession object and then call its createDataFrame() method. In the example below, createDataFrame() takes a list of tuples containing countries and prices.

country = spark.createDataFrame([("India",  50000), ("USA", 30000), ("China", 20000)], ["country", "price"])

The write method can be used to write the country DataFrame to the MongoDB database and collection defined in spark.mongodb.output.uri.

country.write.format("mongo").mode("append").save()

The operation above writes to the MongoDB database and collection specified by the spark.mongodb.output.uri option you supplied when you connected to the pyspark shell. To view the contents of the DataFrame, call country.show(), which prints the data in the following form:

+-------+-----+
|country|price|
+-------+-----+
|  India|50000|
|    USA|30000|
|  China|20000|
+-------+-----+

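Reading the collection back and calling printSchema() in the pyspark shell shows the schema, including the _id field that MongoDB adds to each document: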

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- price: long (nullable = true)
 |-- country: string (nullable = true)

To write to a different MongoDB collection, use the .option() method together with .write. The same URI logic used for reading from MongoDB applies here as well.
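A minimal sketch of such a write is shown below. It assumes the uri option is honored on writes in the same way as on reads, as the paragraph above suggests. The helper name write_to_collection and the target country.prices are hypothetical and chosen only for illustration.

```python
def write_to_collection(df, uri, mode="append"):
    """Write a DataFrame to the MongoDB database and collection named in uri.

    df is expected to be a pyspark.sql.DataFrame created in a session that
    has the MongoDB Spark Connector available.
    """
    (
        df.write.format("mongo")
        .mode(mode)
        # Assumption: "uri" takes precedence over spark.mongodb.output.uri
        # for this one write, mirroring the read example.
        .option("uri", uri)
        .save()
    )


# Example call inside the pyspark shell, using the country DataFrame from
# above and a hypothetical target collection:
# write_to_collection(country, "mongodb://127.0.0.1/country.prices")
```

Keeping the target URI as a parameter lets the same DataFrame be written to several collections without changing the session configuration.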

Benefits of using Python Spark MongoDB 

Python Spark MongoDB connector benefits:

  • Easily read and write BSON documents directly to/from MongoDB.
  • Conversion of MongoDB Collections to Spark RDD.
  • Integration with MongoDB aggregation pipelines.
  • Predicate pushdown lets filters run in MongoDB, so Spark does not load data into memory only to filter it out again.
  • The connector takes data locality into account when fetching data from the cluster, which reduces network traffic when loading data into Spark.
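As a hedged illustration of the aggregation pipeline point, the sketch below builds a single $match stage for the Cars collection and passes it to the reader through a pipeline option. The helper names are hypothetical, and the sketch assumes that the "mongo" format accepts a pipeline option containing a JSON document, which you should confirm for your connector version.

```python
import json


def build_match_stage(field, value):
    """Return a single $match aggregation stage as a JSON string."""
    return json.dumps({"$match": {field: value}})


def read_with_pipeline(spark, pipeline):
    """Load only the documents that pass the given aggregation stage.

    Assumes the connector's "mongo" format supports the "pipeline" option.
    """
    return spark.read.format("mongo").option("pipeline", pipeline).load()


pipeline = build_match_stage("company", "Tata")
print(pipeline)

# Inside the pyspark shell, with the Cars collection as the input URI:
# tata_cars = read_with_pipeline(spark, pipeline)
# A DataFrame filter like the one below is a candidate for pushdown:
# tata_cars.filter(tata_cars["quantity"] >= 10).show()
```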

Conclusion

This article introduced you to the Python Spark MongoDB Connection & Workflow in detail. Together, MongoDB and Apache Spark let you turn data into actionable results in real time. With the Spark and MongoDB connection, developers can build more useful Python applications in less time, all on a single database architecture. As we have seen, the MongoDB Spark Connector simplifies the interaction between Spark and MongoDB, making the two a robust combination for sophisticated Python applications.

As your business begins to grow, data is generated at an exponential rate across all of your company’s SaaS applications, Databases, and other sources. To meet these growing storage and computing needs, you would need to invest a portion of your engineering bandwidth to integrate data from all sources, clean and transform it, and finally load it into a Cloud Data Warehouse for further Business Analytics. All of these challenges can be efficiently handled by a Cloud-Based ETL tool such as Hevo Data.

Visit our Website to Explore Hevo

Hevo Data, a No-code Data Pipeline provides you with a consistent and reliable solution to manage data transfer between a variety of sources like MongoDB and a wide variety of Desired Destinations, with a few clicks. Hevo Data with its strong integration with 100+ sources (including 40+ free sources) allows you to not only export data from your desired data sources & load it to the destination of your choice, but also transform & enrich your data to make it analysis-ready so that you can focus on your key business needs and perform insightful analysis using BI tools.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Share with us your experience of learning about the Python Spark MongoDB Connection & Workflow in the comments below!
