Snowflake has a very elastic infrastructure, and its Compute and Storage resources scale well to cater to your changing storage and processing needs. Apache Spark is a framework for building and running distributed data applications, and it provides a suite of high-level APIs for application development in Java, Scala, Python, and R.

Spark can make use of Hadoop in two ways: one for Storage and the other for Process Management. However, since Spark has its own Cluster Management, it typically uses Hadoop for storage only. This article will teach you about Snowflake, Apache Spark, and how to link them with the Snowflake Spark Connector to read Snowflake tables into a Spark DataFrame and write a DataFrame into a Snowflake table using Scala code.

What is the Snowflake Spark Connector?

“Spark-Snowflake” is a Snowflake Spark Connector that allows Apache Spark to read data from and write data to Snowflake Databases. When you establish a connection, Spark treats Snowflake as it would any other data source, such as HDFS, S3, or JDBC. In practice, the Snowflake Spark Connector provides the data source “net.snowflake.spark.snowflake” and its short name “snowflake”.

Since each version of Spark has its own Snowflake Spark Connector, make sure you download and use the correct version for your Spark instance. The Snowflake Spark Connector communicates with Snowflake via the JDBC driver and can perform the following actions:

  • Create a Spark DataFrame by reading a table from Snowflake.
  • Create a Snowflake table from a Spark DataFrame.

The Snowflake Spark Connector transfers data between Spark RDDs/DataFrames/Datasets and Snowflake through internal storage (created automatically) or external storage (provided by the user), which is used to hold temporary session data.

When you access Snowflake from Spark, it performs the following actions:

  • A stage is created on the Snowflake schema to provide storage for the session.
  • The stage is kept in place for the duration of the session.
  • The stage holds intermediate data and is dropped when the connection is terminated.

What is the Maven Dependency?

Declaring the Maven dependency automatically downloads the Snowflake JDBC driver library and includes the relevant JAR files in your project.

The code in this section should be included in the Maven configuration file pom.xml inside the <dependencies>…</dependencies> tag.

<dependencies>
  ...
  <dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-jdbc</artifactId>
    <version>3.13.7</version>
  </dependency>
  ...
</dependencies>
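
In addition to the JDBC driver, you typically also need the Snowflake Spark Connector artifact itself, whose version must match your Spark and Scala versions. Here is a minimal sketch, assuming Scala 2.12 and Spark 3.2; the version string shown is illustrative, so check Maven Central for the release that matches your cluster:

<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>spark-snowflake_2.12</artifactId>
  <!-- Illustrative version; pick the one matching your Spark/Scala versions -->
  <version>2.10.0-spark_3.2</version>
</dependency>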

The <version> tag indicates which version of the driver you want to use. Version 3.13.7 is used in this example solely for demonstration purposes; a newer version of the driver may be available.

How to Create a Snowflake Table Using Spark?

You can’t use the read-only default database that comes with your Snowflake account when working with Spark, because the Snowflake Spark Connector needs to create a stage on the schema. Since you can’t modify that default schema, you’ll have to create a new database and table before transferring data between Snowflake and Spark.

To create a database, log in to the Snowflake Web Console, navigate to the Databases Menu, pick “Create a new database,” then fill out the form with the database name and click on the “Finish” button.

You can create the table using either the Snowflake Web Console or the Scala code below, which connects through the Snowflake JDBC driver:

import java.sql.DriverManager
import java.util.Properties

// Connection properties for the Snowflake JDBC driver
val properties = new Properties()
properties.put("user", "user")
properties.put("password", "#########")
properties.put("account", "oea82")
properties.put("warehouse", "mywh")
properties.put("db", "EMP")
properties.put("schema", "public")
properties.put("role", "ACCOUNTADMIN")

// JDBC connection string
val jdbcUrl = "jdbc:snowflake://oea82.us-east-1.snowflakecomputing.com/"
val connection = DriverManager.getConnection(jdbcUrl, properties)
val statement = connection.createStatement()

// Create (or replace) the EMPLOYEE table
statement.executeUpdate("create or replace table EMPLOYEE(name VARCHAR, department VARCHAR, salary number)")

statement.close()
connection.close()
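
Alternatively, the Snowflake Spark Connector itself can execute DDL/DML statements, so you can create the table without opening a separate JDBC connection. Here is a minimal sketch, assuming sfOptions is the same Map of connection options ("sfURL", "sfUser", "sfPassword", "sfDatabase", "sfSchema", and so on) used in the read/write examples later in this article:

import net.snowflake.spark.snowflake.Utils

// Execute the CREATE TABLE statement in Snowflake through the connector,
// using the connection options defined later in this article.
Utils.runQuery(sfOptions, "create or replace table EMPLOYEE(name VARCHAR, department VARCHAR, salary NUMBER)")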


What are the 7 Snowflake Spark Integration Parameters?

You must provide the following Snowflake Spark parameters to read or write data:

  • sfURL: Your account’s URL, for example, https://oea82.us-east-1.snowflakecomputing.com/.
  • sfAccount: Your account name, which you can retrieve from the URL, for example, “oea82”.
  • sfUser: Snowflake user name, which is usually the same as your login user.
  • sfPassword: User password.
  • sfWarehouse: Name of the Snowflake Data Warehouse.
  • sfDatabase: Name of the Snowflake Database.
  • sfSchema: The Database schema to which your table belongs.

How to Write Spark DataFrame into Snowflake Tables?

You can write a Spark DataFrame to a Snowflake table by using the DataFrame’s write() method (which returns a DataFrameWriter object) with the options below:

  • Use the format() method to give either snowflake or net.snowflake.spark.snowflake as the data source name.
  • Use the options() or option() method to specify connection options like URL, account, username, password, database name, schema, role, and more.
  • Use the dbtable option to specify the Snowflake table name that you want to write to.
  • If the table already exists, use mode() to specify whether you want to overwrite it, append to it, or ignore the write.

Here is a sample Snowflake Spark Connector code in Scala:

package com.sparkbyexamples.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
object WriteEmpDataFrameToSnowflake extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._
  val simpleData = Seq(("James","Sales",3000),
    ("Michael","Sales",4600),
    ("Robert","Sales",4100),
    ("Maria","Finance",3000),
    ("Raman","Finance",3000),
    ("Scott","Finance",3300),
    ("Jen","Finance",3900),
    ("Jeff","Marketing",3000),
    ("Kumar","Marketing",2000)
  )
  val df = simpleData.toDF("name","department","salary")
  df.show()
  // Snowflake connection options (add "sfWarehouse" if your user has no default warehouse)
  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "####################",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )
  df.write
    .format("snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .mode(SaveMode.Overwrite)
    .save()

}

How to Read Snowflake Tables into Spark DataFrames?

Use the read method of the SparkSession (which returns a DataFrameReader object), provide the data source name via format(), the connection settings via options(), and the table name via the dbtable option.

Here are two samples of Snowflake Spark Connector code in Scala:

The Snowflake Spark example below uses the dbtable option to read the whole Snowflake table and create a Spark DataFrame:

package com.sparkbyexamples.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
object ReadEmpFromSnowflake extends App{
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();
  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "#############",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )
  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake") // or just use "snowflake"
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()
  df.show(false)
}

Output:

+-------+----------+------+
|NAME   |DEPARTMENT|SALARY|
+-------+----------+------+
|James  |Sales     |3000  |
|Michael|Sales     |4600  |
|Robert |Sales     |4100  |
|Maria  |Finance   |3000  |
|Raman  |Finance   |3000  |
|Scott  |Finance   |3300  |
|Jen    |Finance   |3900  |
|Jeff   |Marketing |3000  |
|Kumar  |Marketing |2000  |
+-------+----------+------+

The example below uses the query option to run a GROUP BY aggregate SQL query:

val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()
df1.show(false)

Output: 

+----------+------------+
|DEPARTMENT|TOTAL_SALARY|
+----------+------------+
|Sales     |11700       |
|Finance   |13200       |
|Marketing |5000        |
+----------+------------+

The Spark Connector can convert between a variety of data types.

From Spark SQL to Snowflake

Spark Data Type     Snowflake Data Type
ArrayType           VARIANT
BinaryType          Not supported
BooleanType         BOOLEAN
ByteType            INTEGER (Snowflake does not support the BYTE type)
DateType            DATE
DecimalType         DECIMAL
DoubleType          DOUBLE
FloatType           FLOAT
IntegerType         INTEGER
LongType            INTEGER
MapType             VARIANT
ShortType           INTEGER
StringType          VARCHAR(N) if length is specified; otherwise VARCHAR
StructType          VARIANT
TimestampType       TIMESTAMP

From Snowflake to Spark SQL

Snowflake Data Type     Spark Data Type
ARRAY                   StringType
BIGINT                  DecimalType(38, 0)
BINARY                  Not supported
BLOB                    Not supported
BOOLEAN                 BooleanType
CHAR                    StringType
CLOB                    StringType
DATE                    DateType
DECIMAL                 DecimalType
DOUBLE                  DoubleType
FLOAT                   DoubleType
INTEGER                 DecimalType(38, 0)
OBJECT                  StringType
TIMESTAMP               TimestampType
TIME                    StringType (Spark Connector Version 2.4.14 or later)
VARIANT                 StringType
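
Note that semi-structured Snowflake types such as VARIANT and OBJECT arrive in Spark as StringType, so you usually parse them yourself on the Spark side. Here is a minimal sketch, assuming a hypothetical table EMPLOYEE_JSON with a VARIANT column PROFILE, and the same spark session and sfOptions map used in the earlier examples:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// VARIANT lands in Spark as a JSON string, so read it and parse it explicitly.
val raw = spark.read
  .format("snowflake")
  .options(sfOptions)
  .option("query", "select NAME, PROFILE from EMPLOYEE_JSON")  // hypothetical table and column
  .load()

// Expected shape of the PROFILE document (hypothetical fields).
val profileSchema = StructType(Seq(
  StructField("city", StringType),
  StructField("years_of_service", IntegerType)
))

val parsed = raw.withColumn("PROFILE_PARSED", from_json(col("PROFILE"), profileSchema))
parsed.printSchema()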

How to Achieve Column Mapping?

When the column names differ between the Spark DataFrame schema and the Snowflake table, use the columnmap option, which takes a single string literal as its value.

Here is a sample Snowflake Spark Column Mapping code in Scala:

.option("columnmap", "Map(col_2 -> col_b, col_3 -> col_a)")

What are the Saving Modes?

The mode() method of the Spark DataFrameWriter specifies the SaveMode; its parameter is either one of the strings below or a constant from the SaveMode class.

  • Overwrite: Overwrites the data if the table already exists; SaveMode.Overwrite can be used as an alternative.
  • Append: Appends data to the existing table; SaveMode.Append can be used as an alternative.
  • Ignore: Skips the write when the table already exists; SaveMode.Ignore can be used as an alternative.
  • errorifexists or error: Returns an error when the table already exists; SaveMode.ErrorIfExists can be used as an alternative.
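
As a quick illustration, the two lines below are equivalent ways to request append behavior, reusing the df and sfOptions from the earlier write example:

// The string form and the SaveMode constant are interchangeable.
df.write.format("snowflake").options(sfOptions).option("dbtable", "EMPLOYEE").mode("append").save()
df.write.format("snowflake").options(sfOptions).option("dbtable", "EMPLOYEE").mode(SaveMode.Append).save()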

Limitations of using the manual method for Snowflake Spark Integration:

  • Time-Consuming: Using the manual method to integrate Snowflake and Spark is tedious and time-consuming. It requires a lot of time to write, debug, and customize code.
  • Error-Prone: Manually written code is prone to errors and is harder to secure. It is difficult to control access and apply data regulations in hand-written code.
  • Limited Scalability: Custom code has limited capacity to handle large volumes of data.

Use Cases of Snowflake Spark Integration

  • Faster Data Exchange: For data transfers that take longer than 36 hours, you can prepare an external location for the files instead of relying on the connector’s internal stage. You can prepare an Azure Blob Storage container or an AWS S3 bucket for exchanging data between Snowflake and Spark (see the sketch after this list).
  • Complex ETL: The Snowflake Spark Connector options make complex ETL easier, as Snowflake has connectors to various data sources. It helps with data extraction, which is the first step of complex ETL.
  • Machine Learning: The Snowflake Spark Connector lets Snowflake feed Machine Learning workloads in Spark, expanding Spark’s capacity to process large amounts of data for algorithm training and testing.
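
As a rough sketch of the external-transfer setup referenced in the first use case, the connector accepts a tempdir option pointing at user-managed storage such as an S3 bucket. The bucket name below is hypothetical, and the credentials for that storage must also be configured as described in the connector documentation for your version:

// Route the connector's temporary data through an external S3 location
// instead of the automatically created internal stage.
val dfExternal = spark.read
  .format("snowflake")
  .options(sfOptions)
  .option("tempdir", "s3a://my-transfer-bucket/spark-snowflake-temp/")  // hypothetical bucket
  .option("dbtable", "EMPLOYEE")
  .load()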

Before we wrap up, let’s cover some basics.

What is Snowflake Database?

Snowflake is a Data Storage and Analytics Data Warehouse offered entirely as Cloud-based Software-as-a-Service (SaaS). Snowflake uses a new SQL Database Engine that is designed to work with Cloud infrastructure.

You don’t need to download and install the Database to use it; instead, you can simply establish an account online, which grants you access to the Web Dashboard, where you can construct the Database, Schema, and Tables. Web Console, ODBC, and JDBC Drivers, as well as Third-Party connectors, can all be used to access the database and tables.

Although the fundamental architecture is new, the ANSI SQL syntax and functionality are the same, so learning Snowflake is simple and quick if you have a background in SQL.

Snowflake provides a modern data architecture, having a host of innovative features and functionalities, which are discussed as follows:

  • Cloud Provider Agnostic
  • Scalability
  • Concurrency & Workload Separability
  • Near-Zero Administration
  • Security

What is Apache Spark?

The Apache Software Foundation released Spark to speed up Hadoop’s computational processing. Apache Spark is an Open-Source, Scalable, and Distributed General-Purpose Computing Engine used to process and analyze large data files from many sources such as HDFS, S3, Azure, and others.

Iterative algorithms, which visit their data set several times in a loop, and interactive/exploratory Data Analysis, i.e., repetitive database-style querying of data, are made easier using Spark. When compared to Apache Hadoop MapReduce, the latency of such applications could be reduced by several orders of magnitude. The training algorithms for Machine Learning systems, which provided the initial impetus for the development of Apache Spark, are an example of iterative algorithms.

The features that make Apache Spark one of the most widely used Big Data platforms are:

  • Lightning-fast processing speed.
  • Ease of use.
  • Support for sophisticated analytics.
  • Flexible real-time stream processing.
  • Active and expanding community.
  • Spark for Machine Learning.
  • Spark for Fog Computing.

Conclusion

As organizations expand their businesses, managing large data volumes becomes crucial for achieving the desired efficiency. Snowflake Spark Integration empowers stakeholders and management to collaborate on their workflow and build a quality product that meets requirements with ease. This article also covered the use cases of the Snowflake Spark Connector. In case you want to export data from a source of your choice into your desired Database/destination like Snowflake, then Hevo Data is the right choice for you!


Hevo Data provides its users with a simpler platform for integrating data from 150+ sources for Analysis. It is a No-code Data Pipeline that can help you combine data from multiple sources. You can use it to transfer data from multiple data sources into Data Warehouses such as Snowflake, Databases, or a destination of your choice. It provides you with a consistent and reliable solution to managing data in real-time, ensuring that you always have Analysis-ready data in your desired destination.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at our unbeatable pricing that will help you choose the right plan for your business needs!

Share your experience of learning about Snowflake Spark Integration! Let us know in the comments section below!

Former Research Analyst, Hevo Data

Harsh is a former research analyst with a passion for data, software architecture, and writing technical content. He has written more than 100 articles on data integration and infrastructure.
