Databricks Autoloader: Data Ingestion Simplified 101

on Data Automation, Data Lake, Data Loading, Databricks, Python • November 15th, 2021

Businesses around the world record data from a plethora of sources such as Relational Databases, SaaS applications, and Flat Files like CSV. In recent years, organisations across the globe have been gradually shifting from traditional On-premise Storage solutions to Cloud platforms. Cloud solutions such as Data Warehouses provide a structured, scalable central repository, whereas Data Lakes offer storage for all data formats, including unstructured data, with better accessibility and ease of use.

Incorporating the best of these two solutions, a Data Lakehouse such as Databricks provides a lightning-fast platform for processing and transforming data, with built-in tools for SQL Analytics, BI Reporting, Data Science, and Machine Learning. To ingest data into your Databricks Delta Lake Tables more efficiently, you can use Databricks Autoloader. As new files land in your Cloud Storage, such as Azure Data Lake Storage, Amazon S3, or Google Cloud Storage, Databricks Autoloader automatically starts processing them.

In this article, you’ll learn how to effectively use Databricks Autoloader to simplify your Data Ingestion process.

Table of Contents

What is Databricks?

Databricks is a flexible Cloud Data Lakehousing Engine that allows you to prepare & process data, train models, and manage the entire Machine Learning Lifecycle, from testing to production. Built on top of Apache Spark, a fast and general engine for Large-Scale Data Processing, Databricks delivers reliable, top-notch performance. In addition to being easy to use, it also supports programming languages like Python, R, Java, and SQL. On top of your Data Lakes, Databricks provides Delta Lake, an Open Format Storage Layer that supports ACID Transactions and Scalable Metadata Handling, and unifies Streaming and Batch Data Processing.

Databricks is also integrated with major Cloud service providers such as Amazon Web Services, Microsoft Azure, and Google Cloud Platform. This allows you to start using Databricks on top of your desired Cloud Storage platform, giving you more control over your data as it remains in your Cloud account and Data Sources. Acting as a unified solution for all your Data Science, Machine Learning, and enterprise teams, it offers features like MLflow for complete ML Lifecycle Management, BI Reporting on Delta Lake for Real-Time Business Analytics, and the Databricks Workspace, which promotes collaboration by letting several teams interact and work at the same time.

What is Databricks Autoloader?

Databricks Autoloader - Autoloader Relationship Diagram

Databricks Autoloader is an Optimized File Source that automatically performs incremental data loads from your Cloud Storage into Delta Lake Tables as new data arrives. It introduces a new Structured Streaming Source called cloudFiles. Given a Databricks File System (DBFS) path or a direct path to the data source as input, it automatically sets up file notifications that subscribe to file events and handle new files on arrival, with an option to also process the existing files in the directory.

Databricks Autoloader supports two methods to detect new files in your Cloud storage namely:

  • Directory Listing: This approach is useful when only a few files need to be streamed regularly. Here, new files are discovered by listing the input directory. With just access to your Cloud Storage data, you can swiftly enable your Databricks Autoloader Streams. From the start, Databricks Autoloader automatically detects whether the input directory is suitable for Incremental Listing. However, you can explicitly choose between Incremental Listing and Full Directory Listing by setting cloudFiles.useIncrementalListing to true or false.
  • File Notification: As your directory size increases, you may want to switch to File Notification mode for better scalability and faster performance. Using Cloud services such as Azure Event Grid and Queue Storage, AWS SNS and SQS, or Google Cloud Pub/Sub (with GCS Notifications), Autoloader subscribes to file events in the input directory.
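The choice between the two detection modes comes down to a couple of Autoloader options. The sketch below is a hypothetical helper (not part of Autoloader itself) that assembles the relevant options; in a notebook you would apply them to a `spark.readStream` before calling `.load()`:

```python
def autoloader_options(fmt, use_notifications=False, incremental_listing="auto"):
    """Assemble Autoloader options for a chosen file-detection mode."""
    opts = {"cloudFiles.format": fmt}
    if use_notifications:
        # File Notification mode: Autoloader subscribes to cloud file events
        # (AWS SNS/SQS, Azure Event Grid + Queue Storage, Google Pub/Sub).
        opts["cloudFiles.useNotifications"] = "true"
    else:
        # Directory Listing mode (the default); "auto" lets Autoloader decide
        # whether the directory layout supports Incremental Listing.
        opts["cloudFiles.useIncrementalListing"] = incremental_listing
    return opts

# Applying the options (assuming a running SparkSession named `spark`):
# reader = spark.readStream.format("cloudFiles")
# for key, value in autoloader_options("json", use_notifications=True).items():
#     reader = reader.option(key, value)
# df = reader.load("/input/path")
```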

Key Features of Databricks Autoloader

Databricks Autoloader - Autoloader Advantages

Using Autoloader can simplify your Data Ingestion process by providing the following benefits:

  • Scalability: Databricks Autoloader can track billions of files by leveraging Cloud Services and RocksDB, without having to list all the files in your directory.
  • Cost-Effective: The notification mode for file detection eliminates the need for a directory list, thereby reducing costs. The cost of file discovery also directly depends on the number of files that are ingested rather than the number of directories in which the files arrive.
  • Ease of Use: Databricks Autoloader sets up the notification mode and message queue services needed to perform Incremental Data Loads. You also don’t need to track files or manage any state information about which files have arrived.
  • Schema Inference and Evolution: When schema drifts occur, such as the addition of new columns, Databricks Autoloader manages them and notifies you whenever the schema changes. Using the Semi-Structured Data Access APIs, you can also rescue data (unexpected data in a column, such as values of a different data type) that might otherwise be lost or ignored.
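Schema inference and evolution are enabled through a handful of `cloudFiles` options. The following is a minimal sketch, assuming a running SparkSession named `spark`; the schema path is an illustrative placeholder:

```python
def build_autoloader_reader(spark):
    """Configure an Autoloader read with schema inference and evolution."""
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            # Persist the inferred schema (and its changes) at this location:
            .option("cloudFiles.schemaLocation", "/mnt/checkpoints/schema")
            # Infer real column types instead of treating everything as string:
            .option("cloudFiles.inferColumnTypes", "true")
            # On new columns: fail the stream, evolve the schema on restart,
            # and collect non-conforming values in the _rescued_data column:
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns"))
```

After `.load(<input-path>)`, rows whose values do not match the inferred column types show up in the `_rescued_data` column rather than being dropped.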

Simplify Databricks ETL and Analysis with Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully-managed solution to set up data integration from 100+ Data Sources (including 40+ Free Data Sources) and will let you directly load data to Databricks or a Data Warehouse/Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its Fault-Tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data.

Its completely automated Data Pipeline delivers data in real-time without any loss from source to destination. Its fault-tolerant and scalable architecture ensures that data is handled in a secure, consistent manner with zero data loss, and supports different forms of data. The solutions provided are consistent and work with different BI tools as well.

Get Started with Hevo for Free

Check out some of the cool features of Hevo:

  • Completely Automated: The Hevo platform can be set up in just a few minutes and requires minimal maintenance.
  • Connectors: Hevo supports 100+ Integrations with SaaS platforms, Files, Databases, BI tools, and Native REST API & Webhooks Connectors. Supported destinations include Data Warehouses such as Google BigQuery, Amazon Redshift, Snowflake, and Firebolt; Data Lakes such as Amazon S3; Databricks; and Databases such as MySQL, SQL Server, TokuDB, DynamoDB, and PostgreSQL, to name a few.
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • 100% Complete & Accurate Data Transfer: Hevo’s robust infrastructure ensures reliable data transfer with zero data loss.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (Including 40+ Free Sources) that can help you scale your data infrastructure as required.
  • 24/7 Live Support: The Hevo team is available round the clock to extend exceptional support to you through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow so you can check where your data is at a particular point in time.
Sign up here for a 14-Day Free Trial!

Prerequisites

  • An active Databricks account.
  • Working knowledge of Python.

How to use Databricks Autoloader: 3 Key Ways

Databricks Autoloader provides a seamless way to load raw data with low latency and less coding effort from your Engineering Team. The basic Python code to start your Autoloader stream writing data to Delta Lake in Directory Listing mode is:

df = (spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>))

(df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>))

Here, 

  • cloudFiles-option: An Autoloader configuration option.
  • schema: The data schema of the files you provide.
  • input-path & output-path: The input path in storage where new files arrive, and the output stream path, respectively.
  • checkpointLocation: The location of the Stream Checkpoint.
  • trigger: An optional parameter to trigger your stream.

To understand the basics of using Databricks Autoloader effectively, let’s go through the following critical aspects:

A) Scheduled Batch & Streaming Loads using Databricks Autoloader

Databricks Autoloader - Batch and Streaming loads into Delta lake Using Autoloader

For Streaming Jobs, you can start using Databricks Autoloader’s functionalities via the following code:

df = (spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load("/input/path"))

In this example, similar to defining a streaming source, Databricks Autoloader creates a cloudFiles source that expects JSON files, with a directory input path that is continuously monitored for new files.

For instance, when data arrives at regular intervals, such as every few hours, you can use Autoloader in a Scheduled Job and reduce your Cluster running costs by using Structured Streaming’s Trigger.Once mode:

df = (spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load("/input/path"))

(df.writeStream.trigger(once=True)
     .format("delta")
     .start("/output/path"))

B) Configure your Databricks Autoloader

The following configuration parameters are common to both file detection methods, i.e., Directory Listing and File Notification mode:

  • cloudFiles.allowOverwrites: With the default value set as false, this decides whether input directory file changes are allowed to overwrite existing data.
  • cloudFiles.format: It specifies the format of the data coming from the source path. For example, it takes json for JSON files, csv for CSV files, etc.
  • cloudFiles.includeExistingFiles: Set to true by default, this decides whether to include existing files in the Stream Processing Input Path or to only handle new files arriving after the initial setup.
  • cloudFiles.inferColumnTypes: With the initial value set as false, this decides whether to infer exact column types when leveraging schema inference.
  • cloudFiles.maxBytesPerTrigger: This sets the maximum number of bytes the Autoloader will process in every trigger.
  • cloudFiles.maxFileAge: Used to set the duration for which the event is tracked for deduplication purposes. This is commonly used when rapidly ingesting millions of files per hour.
  • cloudFiles.resourceTags: Key-value pairs to help identify the right resources.
  • cloudFiles.schemaEvolutionMode: Sets various modes for schema evolution i.e when new columns are detected in the data.
  • cloudFiles.schemaHints: This is the schema information of your data provided by you to the Autoloader.
  • cloudFiles.schemaLocation: This describes the location for storing the inferred schema along with associated changes.
  • cloudFiles.validateOptions: This checks whether the Autoloader options provided are valid.
  • cloudFiles.backfillInterval: File Notification mode does not guarantee 100% delivery of uploaded files. You can use Backfills to ensure that all files eventually get processed. This parameter sets the interval for triggering those Backfills.

C) Ingest CSV, JSON, or Image Data with Databricks Autoloader

Autoloader allows efficient Data Ingestion for various file formats such as JSON, CSV, Parquet, Avro, Text, Binary (binaryFile), and ORC files.

To ingest the CSV data via the Autoloader, you can use the following code:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>"))

The above example sets csv for the cloudFiles.format configuration parameter. You can set it to json to ingest JSON files.
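The CSV-to-JSON switch really is a one-option change. The helper below is a hypothetical sketch that makes this explicit; the schema path and the CSV header option are illustrative assumptions:

```python
def ingestion_options(fmt):
    """Return the same ingestion options with only the format swapped."""
    if fmt not in {"csv", "json"}:
        raise ValueError("expected 'csv' or 'json'")
    opts = {
        "cloudFiles.format": fmt,
        "cloudFiles.schemaLocation": "/mnt/checkpoints/schema",
    }
    if fmt == "csv":
        # Format-specific reader options pass through, e.g. Spark's CSV
        # header option to treat the first line as column names.
        opts["header"] = "true"
    return opts
```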

To write your image data into a Delta Lake Table in an optimised format, use the code given below:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))

All of the above code snippets are in Python; you can also perform the same operations in Scala.

Conclusion

In this article, you have learned the basics of using Databricks Autoloader effectively to ingest data from your Cloud Storage with enhanced Performance, Scalability, Cost-Effectiveness, and Ease of Use. It reduces the time and effort your DevOps teams would otherwise spend manually listing files, setting up Cloud notifications, etc. Depending on your business use cases, you can tune the Autoloader options and ultimately save costs. Autoloader supports Data Ingestion for several file formats and can automatically detect changes in the data, such as the addition of new columns.

Apart from data in Cloud Storage, business data is also stored in various applications used for Marketing, Customer Relationship Management, Accounting, Sales, Human Resources, etc. Collecting data from all these applications is of utmost importance, as they provide a clear and deeper understanding of your business performance. Handling this massive amount of data efficiently can be extremely challenging, as you have to build integrations with every existing platform as well as any new ones that may be added in the future. Your Engineering Team would also need to continuously update the connectors as they evolve with every new release. All of this can be effortlessly automated by a Cloud-Based ETL tool like Hevo Data.

Visit our Website to Explore Hevo

Hevo Data is a No-code Data Pipeline that assists you in seamlessly transferring data from a vast collection of sources into a Data Lake like Databricks, Data Warehouse, or a Destination of your choice to be visualized in a BI Tool. It is a secure, reliable, and fully automated service that doesn’t require you to write any code!

If you are using Databricks as a Data Lakehouse and Analytics platform in your business and searching for a stress-free alternative to Manual Data Integration, then Hevo can effectively automate this for you. Hevo, with its strong integration with 100+ Data Sources & BI tools (including 40+ Free Sources), allows you to not only export & load data but also transform & enrich it to make it analysis-ready.

Want to simplify your Data Integration process using Hevo? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. Check out the pricing details to get a better understanding of which plan suits you the most.

Share with us your experience of using Databricks Autoloader. Let us know in the comments section below!  
