Businesses around the world record data from a plethora of sources such as Relational Databases, SaaS applications, and Flat Files like CSV. Nowadays, organisations globally are gradually shifting from traditional On-premise Storage solutions to Cloud platforms. Cloud solutions such as Data Warehouses provide a structured, scalable central repository, whereas Data Lakes store all data formats, including unstructured data, with better accessibility and ease of use.

In this article, you’ll learn how to effectively use Databricks Autoloader to simplify your Data Ingestion process.

What is Databricks?


Databricks is a flexible Cloud Data Lakehouse platform that allows you to prepare and process data, train models, and manage the entire Machine Learning Lifecycle, from testing to production. Built on top of Apache Spark, a fast and general engine for Large-Scale Data Processing, Databricks delivers reliable, top-notch performance. In addition to being easy to use, it also supports programming languages like Python, R, Java, and SQL. On top of your Data Lake, Databricks provides Delta Lake, an Open Format Storage Layer that offers ACID transactions, Scalable Metadata Handling, and unified Streaming and Batch Data Processing.

What is Databricks Autoloader?

Databricks Autoloader is an Optimized File Source that can automatically perform incremental data loads from your Cloud storage into Delta Lake Tables as new data arrives. Databricks Autoloader provides a new Structured Streaming source called cloudFiles. Given a Databricks File System (DBFS) path or a direct path to the data source as input, it automatically sets up file notifications that subscribe to file events, handling new files as they arrive, with an option to also process the files already present in the directory.

Databricks Autoloader supports two methods to detect new files in your Cloud storage namely:

  • Directory Listing: This approach is useful when only a few files need to be streamed regularly. Here, new files are identified by listing the input directory. With just access to your Cloud Storage data, you can swiftly enable your Databricks Autoloader streams. By default, Databricks Autoloader automatically detects whether the input directory is suitable for Incremental Listing. However, you can explicitly choose between Incremental Listing and Full Directory Listing by setting cloudFiles.useIncrementalListing to true or false.
  • File Notification: As your directory size increases, you may want to switch over to File Notification mode for better scalability and faster performance. Using Cloud services such as Azure Event Grid and Queue Storage, AWS SNS and SQS, or Google Cloud Pub/Sub (via GCS notifications), it subscribes to file events in the input directory (see the sketch after this list).
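To make the two modes concrete, here is a minimal PySpark sketch assuming a hypothetical /input/path and JSON data; cloudFiles.useIncrementalListing and cloudFiles.useNotifications are the options that control the detection behaviour:

# Directory listing mode: explicitly opt in or out of incremental listing
df_listing = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useIncrementalListing", "true")  # "false" forces a full directory listing
    .load("/input/path"))

# File notification mode: Autoloader subscribes to file events via the cloud notification services
df_notify = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .load("/input/path"))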

Key Features of Databricks Autoloader

Using Autoloader can simplify your Data Ingestion process by providing the following benefits:

  • Scalability: Databricks Autoloader can track billions of files by leveraging Cloud Services and RocksDB, without the need to list all the files in your directory.
  • Cost-Effective: The notification mode for file detection eliminates the need to list the directory, thereby reducing costs. The cost of file discovery also depends directly on the number of files that are ingested rather than the number of directories in which the files arrive.
  • Ease of Use: Databricks Autoloader sets up the notification mode and message queue services needed to perform Incremental Data Loads. You also don't need to track files or manage any state information about which files have arrived.
  • Schema Inference and Evolution: For cases when there are schema drifts, such as new columns, Databricks Autoloader will manage them and notify you whenever the schema changes. Using the semi-structured data access APIs, you can also rescue data (unexpected data in a column, such as differing data types) that may otherwise be lost or ignored (a short sketch follows this list).
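As a rough illustration of the last point (the paths here are placeholders), you point Autoloader at a schema location, choose an evolution mode, and read unexpected values back from the _rescued_data column:

df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/schemas/events")  # where the inferred schema is tracked
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")   # restart the stream with new columns added
    .load("/input/path"))

# Values that do not match the inferred schema are captured in the _rescued_data column
rescued = df.select("_rescued_data")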

Prerequisites

  • An active Databricks account.
  • Working knowledge of Python.

How to use Databricks Autoloader: 3 Key Ways

Databricks Autoloader provides a seamless way to load raw data with low latency and less coding effort from your Engineering Team. The fundamental code in Python to start your Autoloader stream for writing data to Delta Lake in directory listing mode is:

df = (spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>))

(df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>))

Here,

  • cloudFiles-option: An Autoloader configuration option.
  • schema: The data schema of the files you provide.
  • input-path & output-path: The input path to the storage where the new files arrive and the output stream path, respectively.
  • checkpointLocation: The stream checkpoint location.
  • trigger: An optional parameter to trigger your stream.
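Purely for illustration, here is the same template filled in with hypothetical values: JSON input, a DDL schema string, a one-minute processing-time trigger, and made-up paths.

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")            # cloudFiles-option and option-value
  .schema("id INT, name STRING, ts TIMESTAMP")    # the data schema of the incoming files
  .load("/mnt/raw/events"))                       # input path monitored for new files

(df.writeStream.format("delta")
  .option("checkpointLocation", "/mnt/checkpoints/events")  # stream checkpoint location
  .trigger(processingTime="1 minute")                       # optional trigger
  .start("/mnt/delta/events"))                              # output Delta path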

To understand the basics of using Databricks Autoloader effectively, let’s go through the following critical aspects:

A) Scheduled Batch & Streaming Loads using Databricks Autoloader

Image: Batch and Streaming loads into Delta Lake using Autoloader

For Streaming Jobs, you can start using Databricks Autoloader’s functionalities via the following code:

df = (spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load("/input/path"))

In this example, similar to defining a streaming source, Databricks Autoloader creates a cloudFiles source that expects JSON files from a directory input path, which is continuously monitored for new files.

For instance, when data arrives at regular intervals, such as every few hours, you can use Autoloader to create a Scheduled Job and reduce your Cluster running costs by using Structured Streaming's Trigger.Once mode:

val df = spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load("/input/path")

df.writeStream.trigger(Trigger.Once)
     .format("delta")
     .start("/output/path")
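If your notebook is in Python rather than Scala, a roughly equivalent sketch (same placeholder paths, with an assumed checkpoint location added for completeness) uses trigger(once=True):

df = (spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load("/input/path"))

# Process everything that has arrived since the last run, then stop
(df.writeStream.trigger(once=True)
     .format("delta")
     .option("checkpointLocation", "/output/checkpoint")  # assumed path, not from the original example
     .start("/output/path"))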

B) Configure your Databricks Autoloader

You can configure your Databricks Autoloader using the following parameters, which are common to both file detection methods, i.e., Directory Listing and File Notification mode (a combined example follows the list):

  • cloudFiles.allowOverwrites: With the default value set to false, this decides whether to permit input directory file changes to overwrite existing data.
  • cloudFiles.format: This specifies the format of the data coming from the source path. For example, it takes json for JSON files, csv for CSV files, etc.
  • cloudFiles.includeExistingFiles: Set to true by default, this determines whether to include existing files in the Stream Processing input path or to handle only the new files arriving after the initial setup.
  • cloudFiles.inferColumnTypes: With the default value set to false, this determines whether to infer exact column types when leveraging schema inference.
  • cloudFiles.maxBytesPerTrigger: This sets the maximum number of bytes the Autoloader will process in every trigger.
  • cloudFiles.maxFileAge: Used to set the duration for which the event is tracked for deduplication purposes. This is commonly used when rapidly ingesting millions of files per hour.
  • cloudFiles.resourceTags: Key-value pairs to help identify the right resources.
  • cloudFiles.schemaEvolutionMode: Sets the mode for schema evolution, i.e., what happens when new columns are detected in the data.
  • cloudFiles.schemaHints: This is the schema information of your data provided by you to the Autoloader.
  • cloudFiles.schemaLocation: This describes the location for storing the inferred schema along with associated changes.
  • cloudFiles.validateOptions: This checks whether the Autoloader options discussed above have valid values.
  • cloudFiles.backfillInterval: The file notification mode does not guarantee 100% delivery of every uploaded file. You can use backfills to ensure that all files eventually get processed. This parameter sets the interval at which backfills are triggered.
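To show how these options fit together, here is a hedged example that combines a few of them on a single CSV stream; all values and paths are chosen purely for illustration:

df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.includeExistingFiles", "true")               # also process files already in the directory
    .option("cloudFiles.inferColumnTypes", "true")                   # infer exact column types
    .option("cloudFiles.schemaLocation", "/tmp/schemas/orders")
    .option("cloudFiles.schemaHints", "price DOUBLE, quantity INT")  # override inference for known columns
    .option("cloudFiles.maxBytesPerTrigger", "1g")                   # cap on bytes processed per micro-batch
    .option("cloudFiles.backfillInterval", "1 day")                  # periodic backfills in file notification mode
    .load("/input/path"))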

C) Ingest CSV, JSON, or Image Data with Databricks Autoloader

Databricks Autoloader allows efficient data ingestion for various file formats such as JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC files.

To ingest the CSV data via the Autoloader, you can use the following code:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>"))

The above example sets csv as the value of the cloudFiles.format configuration parameter. You can set it to json to ingest JSON files instead.
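For example, a JSON variant of the same pattern could look like the sketch below; cloudFiles.inferColumnTypes is optional and included only for illustration, and the placeholder paths follow the CSV example above:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")   # JSON columns are otherwise inferred as strings
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>"))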

To write your image data in an optimised format into a Delta Lake Table, use the code given below:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))

Conclusion

In this article, you have learned the basics of using Databricks Autoloader effectively to ingest data from your Cloud Storage with enhanced Performance, Scalability, Cost-Effectiveness, and Ease of Use. It reduces the time and effort your DevOps teams would otherwise spend manually listing files, setting up Cloud notifications, and so on. Depending on your business use case, you can configure the Autoloader options and eventually save costs. Autoloader supports Data Ingestion for several file formats and can automatically detect changes in the data, such as the addition of new columns.

Share with us your experience of using Databricks Autoloader. Let us know in the comments section below!  

Sanchit Agarwal
Research Analyst, Hevo Data

Sanchit Agarwal is an Engineer turned Data Analyst with a passion for data, software architecture and AI. He leverages his diverse technical background and 2+ years of experience to write content. He has penned over 200 articles on data integration and infrastructures, driven by a desire to empower data practitioners with practical solutions for their everyday challenges.