Databricks is an Enterprise Software company that was founded by the creators of Apache Spark. It is known for combining the best of Data Lakes and Data Warehouses in a Lakehouse Architecture.

This blog talks about the different commands you can use to leverage SQL in Databricks in a seamless fashion. These include commands like ALTER TABLE, CREATE FUNCTION, INSERT, MERGE INTO, and more. It also gives a brief overview of Databricks and SQL before diving into the various Databricks SQL functions that you can leverage for your business.

What is Databricks?


Databricks is a Cloud-based Data platform powered by Apache Spark that primarily focuses on Big Data Analytics and Collaboration. With Databricks’ Machine Learning Runtime, managed MLflow, and Collaborative Notebooks, it provides a complete Data Science Workspace in which Business Analysts, Data Scientists, and Data Engineers can collaborate. Databricks also houses the DataFrame and Spark SQL libraries that allow you to interact with Structured data.

Key Features of Databricks

Databricks has carved a name for itself as an industry-leading solution for Data Analysts and Data Scientists due to its ability to transform and handle large amounts of data. Here are a few key features of Databricks:

  • Delta Lake: Databricks houses an Open-source Transactional Storage Layer meant to be used for the whole Data Lifecycle.
  • Optimized Spark Engine: Databricks gives you access to the most recent versions of Apache Spark, and you can effortlessly integrate various Open-source libraries with it.
  • Machine Learning: Databricks offers one-click access to preconfigured Machine Learning environments built on cutting-edge frameworks like TensorFlow, Scikit-Learn, and PyTorch.
  • Collaborative Notebooks: Armed with the tools and the language of your choice, you can instantly access and analyze your data, build models together, and discover and share new actionable insights.

What is SQL?

SQL is designed for managing data in a Relational Database Management System (RDBMS) and is based on Tuple Relational Calculus and Relational Algebra. You need to install and set up a database before you can run SQL queries.

SQL is divided into several language elements such as:

  • Predicates: The predicates specify the conditions that can be evaluated to SQL three-valued logic (3VL) or Boolean truth values.
  • Expressions: These can produce either scalar values or tables consisting of rows and columns of data.
  • Clauses: These are constituent components of statements and queries.
  • Queries: Queries retrieve data based on specific criteria and are an important part of SQL.
  • Statements: These may have a persistent effect on data and schema, or may be involved in controlling connections, transactions, sessions, program flow, or diagnostics.

How to Use SQL in Databricks?

Here are a few commands that you can leverage to use SQL in Databricks in a seamless fashion:

ALTER TABLE

This command can be used to alter the properties or schema of a table. If the table is cached, then this command clears the cached data of the table and all the dependents referring to this table. The cache will then be lazily filled the next time the table or any of its dependents is accessed. Here is the syntax for this command:

ALTER TABLE table_name
   { RENAME clause |
     ADD COLUMN clause |
     ALTER COLUMN clause |
     ADD CONSTRAINT clause |
     DROP CONSTRAINT clause |
     ADD PARTITION clause |
     DROP PARTITION clause |
     RENAME PARTITION clause |
     RECOVER PARTITIONS clause |
     SET TBLPROPERTIES clause |
     UNSET TBLPROPERTIES clause |
     SET LOCATION clause }
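
For instance, here is a quick illustrative sketch that documents a column and sets a table property, assuming a hypothetical Delta table named events with a column event_ts:

-- Add a column comment and mark the table as append-only (hypothetical table and column names)
ALTER TABLE events ALTER COLUMN event_ts COMMENT 'Event timestamp in UTC';
ALTER TABLE events SET TBLPROPERTIES ('delta.appendOnly' = 'true');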

CREATE DATABASE

This command can be used to create a Database with the specified name. If a Database with the same name already exists, an exception is thrown unless IF NOT EXISTS is specified. Here is the syntax to create a Database:

CREATE { DATABASE | SCHEMA } [ IF NOT EXISTS ] database_name
    [ COMMENT database_comment ]
    [ LOCATION database_directory ]
    [ WITH dbproperties ]
dbproperties
  DBPROPERTIES ( { property_name = property_value } [, ...] )
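
As a minimal sketch, assuming a hypothetical database name sales_db and a hypothetical storage path:

CREATE DATABASE IF NOT EXISTS sales_db
    COMMENT 'Curated sales data'
    LOCATION '/mnt/datalake/sales_db'
    WITH DBPROPERTIES ('owner' = 'data-engineering');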

CREATE TABLE

This command can be used to define a table in an existing Database. There are three primary ways to create a table, each serving a different purpose:

  • CREATE TABLE LIKE: You can use this syntax to create a new table based on the definition of another table, rather than its data.
  • CREATE TABLE [USING]: You can leverage this syntax if the new table is derived from a query, derived from data at an existing storage location, or based on a column definition provided by you.
  • CREATE TABLE CLONE: You can leverage table cloning for Delta Lake tables either to make a complete, independent copy of a table, including its data and definition at a specific version (a DEEP CLONE), or to make a copy of just the table’s definition that refers to the original table’s storage for the initial data at a specific version (a SHALLOW CLONE).

Here is the syntax for the CREATE TABLE LIKE command:

CREATE TABLE [ IF NOT EXISTS ] table_name LIKE source_table_name [table_clauses]
table_clauses
   { USING data_source |
     LOCATION path |
     TBLPROPERTIES ( property_key [ = ] property_val [, ...] ) } [...]
property_key
  { identifier [. ...] | string_literal }
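
For example, here is a brief sketch that copies only the definition of a hypothetical orders table into a new staging table:

-- Creates orders_staging with the same schema as orders, but without its data
CREATE TABLE IF NOT EXISTS orders_staging LIKE orders;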

CREATE VIEW

With this command, you can construct a Virtual Table that has no physical data, based on the result set of a SQL query. By contrast, ALTER VIEW and DROP VIEW only change metadata.

Here is the syntax for this command:

CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
    [ column_list ]
    [ COMMENT view_comment ]
    [ properties ]
    AS query
column_list
   ( { column_alias [ COMMENT column_comment ] } [, ...] )
properties
   TBLPROPERTIES ( { property_key = property_value } [, ...] )
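
Here is an illustrative sketch, assuming a hypothetical orders table with id, total, and order_date columns:

CREATE OR REPLACE VIEW recent_orders
    (order_id COMMENT 'Order identifier', order_total)
    COMMENT 'Orders placed in the last 7 days'
    AS SELECT id, total FROM orders WHERE order_date >= date_sub(current_date(), 7);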

CREATE FUNCTION

This command creates a SQL Scalar Function that takes a set of arguments and returns a Scalar Value. The function body for this command can be any valid SQL expression. Here is the syntax for this command:

CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS]
    function_name ( [function_parameter [, ... ] ] )
    RETURNS data_type
    [characteristic [...] ]
    RETURN { expression | query }
function_parameter
    param_name data_type [COMMENT parameter_comment]
characteristic
  { LANGUAGE SQL |
    [NOT] DETERMINISTIC |
    COMMENT function_comment |
    [CONTAINS SQL | READS SQL DATA] }
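
As a small sketch, here is a hypothetical scalar function that converts Fahrenheit to Celsius:

CREATE OR REPLACE FUNCTION fahrenheit_to_celsius(f DOUBLE COMMENT 'Temperature in Fahrenheit')
    RETURNS DOUBLE
    COMMENT 'Converts a Fahrenheit temperature to Celsius'
    RETURN (f - 32) * 5 / 9;

SELECT fahrenheit_to_celsius(212);  -- returns 100.0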

INSERT

This command inserts new rows into a table and, when used with OVERWRITE, truncates the table or its partitions first. You simply need to specify the inserted rows by the results of a query or by value expressions. Databricks SQL offers support for this command only for Delta Lake tables. Here is the syntax for this command:

INSERT { OVERWRITE | INTO } [ TABLE ] table_name [ PARTITION clause ]  [ ( column_name [, ...] ) ] query
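
For instance, here is a minimal sketch that appends two rows into a hypothetical Delta table named students:

INSERT INTO students (name, address, student_id)
    VALUES ('Amy Smith', '123 Park Ave, San Jose', 111111),
           ('Bob Brown', '456 Taylor St, Cupertino', 222222);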

SHOW DATABASES

This command can be used to list the databases that match an optionally supplied Regular Expression pattern. In case no pattern is supplied, the command lists all the Databases in the system. The keywords DATABASES and SCHEMAS are interchangeable and mean the same thing. Here is the syntax for this command:

SHOW { DATABASES | SCHEMAS } [ LIKE regex_pattern ]
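
For example, to list only the databases whose names start with sales (a hypothetical prefix):

SHOW DATABASES LIKE 'sales*';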

MERGE INTO

With this command, you can merge a set of insertions, updates, and deletions based on a source table into a target Delta Table. Similar to the INSERT command, this command is also only supported for Delta Lake tables. Here is the syntax for this command:

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   [ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
   [ WHEN NOT MATCHED [ AND condition ]  THEN not_matched_action ] [...]
matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column1 = value1 } [, ...] }
not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES (value1 [, ...]) }
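
As an illustrative sketch, assuming hypothetical customers (target) and customer_updates (source) Delta tables that share a customer_id key:

MERGE INTO customers AS t
   USING customer_updates AS s
   ON t.customer_id = s.customer_id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *;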

DROP TABLE

You can use this command to delete a table and remove the directory associated with it from the file system, provided the table is not an EXTERNAL table. An exception is thrown if the table doesn’t exist.

If you are dealing with an External Table, only the associated metadata information is removed from the Metastore Database. If the table is cached, the command uncaches the table and all of its dependents. Here is the syntax for this command:

DROP TABLE [ IF EXISTS ] table_name
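
For example, dropping the hypothetical staging table sketched earlier only if it exists:

DROP TABLE IF EXISTS orders_staging;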

DROP FUNCTION

With this command, you can easily drop a Permanent or Temporary User-Defined Function (UDF). Here is the syntax for this command:

DROP [ TEMPORARY ] FUNCTION [ IF EXISTS ] function_name
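
For instance, dropping the hypothetical fahrenheit_to_celsius function sketched above:

DROP FUNCTION IF EXISTS fahrenheit_to_celsius;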

UPDATE

With this command, you can update the column values for the rows that match a predicate. If no predicate is provided, then all the column values of all rows get updated. This statement is also only supported for Delta Lake Tables. Here is the syntax for this command:

UPDATE table_name [table_alias]
   SET  { { column_name | field_name }  = expr } [, ...]
   [WHERE clause]
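
Here is a short sketch, assuming a hypothetical customers Delta table with email and country columns:

UPDATE customers
   SET email = lower(email)
   WHERE country = 'US';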

TRUNCATE

You can use the TRUNCATE command to remove all the rows from a table or a partition. The table must not be a view, a temporary table, or an external table. If you wish to truncate multiple partitions at the same time, you can specify the partitions in partition_spec. If no partition_spec is specified, all partitions in the table are truncated.

If the table is cached, the TRUNCATE command clears the cached data of the table along with all the dependents that refer to it. The cache gets lazily filled when the table or its dependents are accessed the next time around. Here is the syntax for this command:

TRUNCATE TABLE table_name [ PARTITION clause ]
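
For example, removing all rows from a single partition of a hypothetical partitioned table named web_logs:

TRUNCATE TABLE web_logs PARTITION (log_date = '2021-10-01');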

RESTORE

This command can come in handy if you wish to restore a Delta Table to its earlier state. It restores the table to an earlier version number or timestamp. Here is the syntax for this command:

RESTORE [TABLE] table_name [TO] time_travel_version
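
As a quick sketch, assuming a hypothetical Delta table named orders:

-- Restore to a specific version of the table
RESTORE TABLE orders TO VERSION AS OF 5;
-- Or restore to the state as of a given timestamp
RESTORE TABLE orders TO TIMESTAMP AS OF '2021-10-01 00:00:00';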

CONVERT TO DELTA

This command converts an existing Parquet Table to a Delta Table in place. It lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the Data Schema by reading the footers of all the Parquet files. By default, the conversion process also collects statistics to improve query performance on the converted Delta Table. If you provide a table name, the Metastore is also updated to reflect that the table is now a Delta Table. Here is the syntax for this command:

CONVERT TO DELTA table_name [ NO STATISTICS ] [ PARTITIONED BY clause ]
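
For illustration, assuming a hypothetical Parquet table web_events partitioned by event_date, or raw Parquet files at a hypothetical path:

-- Convert a partitioned Parquet table registered in the Metastore
CONVERT TO DELTA web_events PARTITIONED BY (event_date DATE);
-- Convert Parquet files at a storage path instead of a table name
CONVERT TO DELTA parquet.`/mnt/datalake/raw/events`;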

VACUUM

You can use this command to clean up unused files associated with a table. It recursively vacuums directories associated with the Delta Table and removes data files that are no longer in the latest state of the transaction log for the table and are older than the retention threshold. Here is the syntax for this command:

VACUUM table_name [RETAIN num HOURS] [DRY RUN]
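
For example, with a hypothetical Delta table named orders, you can preview the files that would be deleted and then remove files older than 168 hours (7 days):

VACUUM orders DRY RUN;
VACUUM orders RETAIN 168 HOURS;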

ANALYZE TABLE

This statement can be leveraged to collect statistics about a specific table or all the tables in one specific database. These statistics can then be used by the query optimizer to find a better query execution plan. Like a lot of the commands mentioned above, this statement also only applies to Delta Lake tables. Here is the syntax for this command:

ANALYZE TABLE table_name [ PARTITION clause ]
COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col1 [, ...] | FOR ALL COLUMNS ]
ANALYZE TABLES [ { FROM | IN } database_name ] COMPUTE STATISTICS [ NOSCAN ]
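
As a brief sketch, collecting table-level and column-level statistics for a hypothetical orders table:

ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_total, order_date;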

How to Create a User-Defined Function (UDF)?

User-Defined Scalar Functions or UDFs are defined as user-programmable routines that act on a single row. This section covers the class you need for registering and creating UDFs, along with a helpful example that demonstrates how you can define, register, and invoke UDFs in Spark SQL.

You can use the following methods, defined in the UserDefinedFunction class, to define the properties of a User-Defined Function:

  • asNondeterministic(): UserDefinedFunction: This method can be used to update UserDefinedFunction to nondeterministic.
  • asNonNullable(): UserDefinedFunction: This method can be used to update UserDefinedFunction to non-nullable.
  • withName(name:String): UserDefinedFunction: This method updates the UserDefinedFunction with a given name.

Here is a Java code snippet to help you understand this better:

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.types.DataTypes;
SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL UDF scalar example")
      .getOrCreate();
// Define and register a zero-argument non-deterministic UDF
// UDF is deterministic by default, i.e. produces the same result for the same input.
UserDefinedFunction random = udf(
  () -> Math.random(), DataTypes.DoubleType
);
random = random.asNondeterministic(); // asNondeterministic() returns a new UserDefinedFunction, so reassign it
spark.udf().register("random", random);
spark.sql("SELECT random()").show();
// +-------+
// |UDF()  |
// +-------+
// |xxxxxxx|
// +-------+
// Define and register a one-argument UDF
spark.udf().register("plusOne", new UDF1<Integer, Integer>() {
  @Override
  public Integer call(Integer x) {
    return x + 1;
  }
}, DataTypes.IntegerType);
spark.sql("SELECT plusOne(5)").show();
// +----------+
// |plusOne(5)|
// +----------+
// |         6|
// +----------+
// Define and register a two-argument UDF
UserDefinedFunction strLen = udf(
  (String s, Integer x) -> s.length() + x, DataTypes.IntegerType
);
spark.udf().register("strLen", strLen);
spark.sql("SELECT strLen('test', 1)").show();
// +------------+
// |UDF(test, 1)|
// +------------+
// |           5|
// +------------+
// UDF in a WHERE clause
spark.udf().register("oneArgFilter", new UDF1<Long, Boolean>() {
  @Override
  public Boolean call(Long x) {
    return  x > 5;
  }
}, DataTypes.BooleanType);
spark.range(1, 10).createOrReplaceTempView("test");
spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show();
// +---+
// | id|
// +---+
// |  6|
// |  7|
// |  8|
// |  9|
// +---+

How to Create an Aggregate User-Defined Function (UDF)?

User-Defined Aggregate Functions (UDAFs) are defined as user-programmable routines that act on multiple rows at once and return a single aggregated value as a result. This section covers the classes needed for registering and creating UDAFs, along with an example of how you can define and register UDAFs in Scala and then invoke them in Spark SQL.

Aggregator is a base class for user-defined aggregations, which can then be leveraged in Dataset operations to take all of the elements of a group and reduce them to a single value. Here are a few handy components:

  • BUF: This defines the type of the intermediate value of the reduction.
  • IN: This is defined as the input type for the aggregation.
  • OUT: This defines the type of the final output result.
  • finish(reduction: BUF): OUT: This can be used to transform the output of the reduction.
  • bufferEncoder: Encoder[BUF]: This is the encoder for the intermediate value type.
  • outputEncoder: Encoder[OUT]: This is the encoder for the final output value type.
  • merge(b1: BUF, b2: BUF): BUF: This can be used to merge two intermediate values.
  • zero: BUF: This is the initial value of the intermediate result of this aggregation.
  • reduce(b: BUF, a: IN): BUF: This aggregates the input value ‘a’ into the current intermediate value ‘b’. For performance, the function may modify ‘b’ and return it instead of constructing a new object for ‘b’.

There are two types of User-Defined Aggregate Functions: Type-safe and Untyped. Type-safe User-Defined Aggregations are used for strongly typed Datasets that revolve around the Aggregator abstract class. Typed aggregations can be registered as untyped aggregating UDFs for use with DataFrames. For instance, a user-defined average for untyped DataFrames can be represented through the code snippet as follows:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // The Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // The Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// Register the function to access it
spark.udf.register("myAverage", functions.udaf(MyAverage))
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Conclusion

This blog talks in detail about the different Databricks SQL commands you can leverage to improve the efficiency of your Data Pipeline. It also gives a brief introduction to Databricks before diving into Databricks SQL functions.

Extracting complex data from a diverse set of data sources can be challenging, and this is where Hevo saves the day! Hevo is fully automated and hence does not require you to code.

Content Marketing Manager, Hevo Data

Amit is a Content Marketing Manager at Hevo Data. He is passionate about writing for SaaS products and modern data platforms. His portfolio of more than 200 articles shows his extraordinary talent for crafting engaging content that clearly conveys the advantages and complexity of cutting-edge data technologies. Amit’s extensive knowledge of the SaaS market and modern data solutions enables him to write insightful and informative pieces that engage and educate audiences, making him a thought leader in the sector.
