Snowflake is a Software-as-a-Service (SaaS) platform that helps businesses create Data Warehouses. It gives its users the option of storing their data in the Cloud. Snowflake has a very elastic infrastructure, and its Compute and Storage resources scale independently to cater to your changing needs.

Apache Spark is a Cluster Computing system that runs at breakneck speed. It provides a suite of high-level APIs for application development in Java, Scala, Python, and R. Spark can make use of Hadoop in two ways: one for Storage and the other for Process Management. Since Spark has its own Cluster Management, it typically uses Hadoop only for storage.

This article will teach you about Snowflake, Apache Spark, and how to link them using the Snowflake Spark Connector to read Snowflake tables into a Spark DataFrame and write a DataFrame into Snowflake tables using Scala code.

What is Snowflake Database?

Snowflake Database is a Software-as-a-Service (SaaS) data Storage and Analytics platform, i.e., a Data Warehouse that is entirely Cloud-based. Snowflake Database uses a completely new SQL Database Engine that is designed to interact with Cloud infrastructure.

You don’t need to download and install the Database to use it; instead, you can simply establish an account online, which grants you access to the Web Dashboard, where you can construct the Database, Schema, and Tables. Web Console, ODBC, and JDBC Drivers, as well as Third-Party connectors, can all be used to access the database and tables.

Although the underlying architecture is new, the ANSI SQL syntax and functionality remain the same, so learning Snowflake is simple and quick if you have a background in SQL.

Snowflake provides a modern data architecture with a host of innovative features and functionalities, including:

  • Cloud Power Agnostic
  • Scalability
  • Concurrency & Workload Separability
  • Near-Zero Administration
  • Security

What is Apache Spark?

The Apache Software Foundation released Spark to speed up computation in the Hadoop ecosystem. Apache Spark is an Open-Source, Scalable, and Distributed General-Purpose Computing Engine that is used to process and analyze large data files from many sources such as HDFS, S3, Azure, and others.

Spark makes two kinds of workloads easier: iterative algorithms, which visit their data set several times in a loop, and interactive/exploratory Data Analysis, i.e., repetitive database-style querying of data. Compared to Apache Hadoop MapReduce, the latency of such applications can be reduced by several orders of magnitude. The training algorithms for Machine Learning systems, which provided the initial impetus for the development of Apache Spark, are an example of iterative algorithms.
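To make this concrete, here is a minimal, hypothetical Scala sketch (not from the original article) showing why iterative workloads benefit: the data set is cached in memory once and then reused across passes instead of being re-read or recomputed on every iteration.

import org.apache.spark.sql.SparkSession

object IterativeSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("IterativeSketch").getOrCreate()
  import spark.implicits._

  // Hypothetical data set of (id, score) pairs.
  val scores = Seq((1, 10), (2, 25), (3, 40), (4, 55)).toDF("id", "score")
  scores.cache() // keep the data in memory so repeated passes avoid recomputation

  // Each pass reuses the cached data, which is where the speed-up over
  // MapReduce-style jobs comes from for iterative algorithms.
  (10 to 50 by 20).foreach { threshold =>
    println(s"scores above $threshold: ${scores.filter($"score" > threshold).count()}")
  }

  spark.stop()
}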

The features that make Apache Spark one of the most widely used Big Data platforms are:

  • Lightning-fast processing speed.
  • Ease of use.
  • It offers support for sophisticated analytics.
  • Flexible real-time stream processing.
  • Active and expanding community.
  • Spark for Machine Learning.
  • Spark for Fog Computing.

Simplify ETL and Data Integration using Hevo’s No-code Data Pipeline

A fully managed No-code Data Pipeline platform like Hevo Data helps you integrate data from 150+ Data Sources (including 40+ Free Data Sources) and will let you directly load data to a Data Warehouse or the destination of your choice like Snowflake. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data.

Get Started with Hevo for free

Let’s look at some of the salient features of Hevo:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100s of sources that can help you scale your data infrastructure as required.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within Data Pipelines.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Sign up here for a 14-day Free Trial!

What is Snowflake Spark Connector?

“Spark-Snowflake” is a Snowflake Spark Connector that allows Apache Spark to read and write data to Snowflake Databases. When you establish a connection, Spark treats Snowflake as if it were any other data source, such as HDFS, S3, JDBC, and so on. In actuality, the Snowflake Spark Connector provides the data source “net.snowflake.spark.snowflake” and its short form “snowflake”.

Since each version of Spark has its own Snowflake Spark Connector, make sure you download and use the version that matches your Spark instance. The Snowflake Spark Connector communicates with Snowflake via the JDBC driver and performs the following actions:

  • Read a table from Snowflake to create a Spark DataFrame.
  • Create a Snowflake table from a Spark DataFrame.

The data is transferred between Spark RDD/DataFrame/Dataset and Snowflake via internal storage (generated automatically) or external storage (provided by the user), which is used by the Snowflake Spark connector to store temporary session data.

When you access Snowflake from Spark, it performs the following actions:

  • It creates a stage (temporary storage) on the Snowflake schema for the session.
  • It keeps the stage in place for the duration of the session.
  • The stage holds intermediate data and is dropped when the connection is terminated.

What is Maven Dependency?

The Maven dependency automatically downloads the Snowflake JDBC driver library and includes the relevant jar files in the project.

The code in this section should be included in the Maven configuration file pom.xml inside the <dependencies> … </dependencies> tag.

<dependencies>
  ...
  <dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-jdbc</artifactId>
    <version>3.13.7</version>
  </dependency>
  ...
</dependencies>

The <version> tag indicates which version of the driver you want to use. Version 3.13.7 is used in this example solely for demonstration purposes. It’s possible that the most recent version of the driver is higher.
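If your project uses sbt instead of Maven, a roughly equivalent declaration is sketched below; the connector artifact (spark-snowflake built for Scala 2.12 and a Spark 3.1 release) and the versions shown are assumptions, so pick the build that matches your own Spark and Scala versions.

// build.sbt sketch (artifact names and versions are assumptions; adjust to your Spark/Scala setup)
libraryDependencies ++= Seq(
  "net.snowflake" % "spark-snowflake_2.12" % "2.9.3-spark_3.1",
  "net.snowflake" % "snowflake-jdbc"       % "3.13.7"
)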

How to Create a Snowflake Table Using Spark?

When working with Spark, you can’t use the default database that comes with your Snowflake account, because the Snowflake Spark Connector needs to create a stage on the schema and the default schema can’t be modified for Snowflake Spark data transfer. You’ll therefore have to create a new database and table.

To create a database, log in to the Snowflake Web Console, navigate to the Databases Menu, pick “Create a new database,” then fill out the form with the database name and click on the “Finish” button.

You can construct a table using either the Snowflake Online Console or the dedicated Snowflake Spark code below:

import java.sql.DriverManager
import java.util.Properties

// Connection properties for the Snowflake JDBC driver
val properties = new Properties()
properties.put("user", "user")
properties.put("password", "#########")
properties.put("account", "oea82")
properties.put("warehouse", "mywh")
properties.put("db", "EMP")
properties.put("schema", "public")
properties.put("role", "ACCOUNTADMIN")

// JDBC connection string
val jdbcUrl = "jdbc:snowflake://oea82.us-east-1.snowflakecomputing.com/"
val connection = DriverManager.getConnection(jdbcUrl, properties)
val statement = connection.createStatement()
statement.executeUpdate("create or replace table EMPLOYEE(name VARCHAR, department VARCHAR, salary number)")
statement.close()
connection.close()


What are the 7 Snowflake Spark Integration Parameters?

You must provide the following Snowflake Spark parameters to read or write; they are collected into an options Map in the sketch after this list:

  • sfURL: Your account’s URL, such as https://oea82.us-east-1.snowflakecomputing.com/.
  • sfAccount: Your account name, which you can retrieve from the URL, for example, “oea82”.
  • sfUser: Snowflake user name, which is usually the same as your login user.
  • sfPassword: User password.
  • sfWarehouse: Name of the Snowflake Data Warehouse.
  • sfDatabase: Name of the Snowflake Database.
  • sfSchema: The Database Schema to which your table belongs.
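For illustration, these parameters are typically collected into a Scala Map and passed to options(), as the later examples do. The values below are placeholders, and this sketch also includes sfWarehouse, which the later examples omit.

// Placeholder values -- replace with your own account details.
val sfOptions = Map(
  "sfURL"       -> "https://oea82.us-east-1.snowflakecomputing.com/",
  "sfAccount"   -> "oea82",
  "sfUser"      -> "user",
  "sfPassword"  -> "#########",
  "sfWarehouse" -> "mywh",
  "sfDatabase"  -> "EMP",
  "sfSchema"    -> "PUBLIC"
)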

How to Write Spark DataFrame into Snowflake Tables?

  • You can write a Spark DataFrame to a Snowflake table by using the DataFrame’s write() method (which returns a DataFrameWriter object) with the values below.
  • Use the format() method to give either snowflake or net.snowflake.spark.snowflake as the data source name.
  • Use the option() method to specify connection options like URL, account, username, password, database name, schema, role, and more.
  • Use the dbtable option to specify the Snowflake table name that you want to write to.
  • If the table already exists, use mode() to specify whether you want to overwrite it, append to it, or ignore the write.

Here is a sample Snowflake Spark Connector code in Scala:

package com.sparkbyexamples.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
object WriteEmpDataFrameToSnowflake extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._
  val simpleData = Seq(("James","Sales",3000),
    ("Michael","Sales",4600),
    ("Robert","Sales",4100),
    ("Maria","Finance",3000),
    ("Raman","Finance",3000),
    ("Scott","Finance",3300),
    ("Jen","Finance",3900),
    ("Jeff","Marketing",3000),
    ("Kumar","Marketing",2000)
  )
  val df = simpleData.toDF("name","department","salary")
  df.show()
  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "####################",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )
  df.write
    .format("snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .mode(SaveMode.Overwrite)
    .save()

}

How to Read Snowflake Tables into Spark DataFrames?

Use the read method of the SparkSession (which returns a DataFrameReader object), provide the data source name via format(), the connection settings via options(), and the table name via the dbtable option.

Here are two samples of Snowflake Spark Connector code in Scala:

The Snowflake Spark example below uses the dbtable option to read the whole Snowflake table and create a Spark DataFrame:

package com.sparkbyexamples.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
object ReadEmpFromSnowflake extends App{
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate();
  val sfOptions = Map(
    "sfURL" -> "https://oea82.us-east-1.snowflakecomputing.com/",
    "sfAccount" -> "oea82",
    "sfUser" -> "user",
    "sfPassword" -> "#############",
    "sfDatabase" -> "EMP",
    "sfSchema" -> "PUBLIC",
    "sfRole" -> "ACCOUNTADMIN"
  )
  val df: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake") // or just use "snowflake"
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .load()
  df.show(false)
}

Output:

+-------+----------+------+
|NAME   |DEPARTMENT|SALARY|
+-------+----------+------+
|James  |Sales     |3000  |
|Michael|Sales     |4600  |
|Robert |Sales     |4100  |
|Maria  |Finance   |3000  |
|Raman  |Finance   |3000  |
|Scott  |Finance   |3300  |
|Jen    |Finance   |3900  |
|Jeff   |Marketing |3000  |
|Kumar  |Marketing |2000  |
+-------+----------+------+

The example below uses the query option to conduct a group by aggregate SQL query.

val df1: DataFrame = spark.read
    .format("net.snowflake.spark.snowflake")
    .options(sfOptions)
    .option("query", "select department, sum(salary) as total_salary from EMPLOYEE group by department")
    .load()
df1.show(false)

Output: 

+----------+------------+
|DEPARTMENT|TOTAL_SALARY|
+----------+------------+
|Sales     |11700       |
|Finance   |13200       |
|Marketing |5000        |
+----------+------------+

The Spark Connector can convert between a variety of data types.

From Spark SQL to Snowflake

Spark Data Type | Snowflake Data Type
----------------|--------------------
ArrayType       | VARIANT
BinaryType      | Not supported
BooleanType     | BOOLEAN
ByteType        | INTEGER (Snowflake does not support the BYTE type)
DateType        | DATE
DecimalType     | DECIMAL
DoubleType      | DOUBLE
FloatType       | FLOAT
IntegerType     | INTEGER
LongType        | INTEGER
MapType         | VARIANT
ShortType       | INTEGER
StringType      | VARCHAR(N) if a length is specified; otherwise VARCHAR
StructType      | VARIANT
TimestampType   | TIMESTAMP

From Snowflake to Spark SQL

Snowflake Data Type | Spark Data Type
--------------------|----------------
ARRAY               | StringType
BIGINT              | DecimalType(38, 0)
BINARY              | Not supported
BLOB                | Not supported
BOOLEAN             | BooleanType
CHAR                | StringType
CLOB                | StringType
DATE                | DateType
DECIMAL             | DecimalType
DOUBLE              | DoubleType
FLOAT               | DoubleType
INTEGER             | DecimalType(38, 0)
OBJECT              | StringType
TIMESTAMP           | TimestampType
TIME                | StringType (Spark Connector version 2.4.14 or later)
VARIANT             | StringType
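As a quick way to see these mappings in practice, you can inspect the schema of a DataFrame read from Snowflake. The sketch below assumes the SparkSession, sfOptions, and EMPLOYEE table from the earlier examples.

// Assumes spark, sfOptions, and the EMPLOYEE table from the earlier examples.
val mapped = spark.read
  .format("snowflake")
  .options(sfOptions)
  .option("dbtable", "EMPLOYEE")
  .load()

mapped.printSchema()
// NAME and DEPARTMENT (VARCHAR) come back as StringType,
// while SALARY (NUMBER) comes back as DecimalType(38, 0).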

How to Achieve Column Mapping?

When the names of your columns differ between the Spark DataFrame schema and the Snowflake table, use the columnmap option with a single string literal as a parameter.

Here is a sample Snowflake Spark Column Mapping code in Scala:

.option("columnmap", "Map(col_2 -> col_b, col_3 -> col_a)")

What are the Saving Modes?

  • DataFrameWriter: The mode() method of the Spark DataFrameWriter specifies the SaveMode; the parameter to this method is either one of the strings below or a constant from the SaveMode class. A short sketch showing both forms follows this list.
  • Overwrite: The Overwrite mode is used to overwrite an existing table; SaveMode.Overwrite can be used as an alternative.
  • Append: The Append mode adds data to an existing table; SaveMode.Append can be used as an alternative.
  • Ignore: The Ignore mode skips the write when the table already exists; SaveMode.Ignore can be used as an alternative.
  • errorifexists or error: This mode returns an error when the table already exists; SaveMode.ErrorIfExists can be used as an alternative.
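As a brief illustration, reusing df and sfOptions from the earlier write example, the save mode can be set with either the string form or the SaveMode constant.

// String form: append rows to the existing EMPLOYEE table.
df.write
  .format("snowflake")
  .options(sfOptions)
  .option("dbtable", "EMPLOYEE")
  .mode("append")
  .save()

// Constant form: skip the write entirely if EMPLOYEE already exists.
df.write
  .format("snowflake")
  .options(sfOptions)
  .option("dbtable", "EMPLOYEE")
  .mode(SaveMode.Ignore)
  .save()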

Conclusion

As organizations expand their businesses, managing large volumes of data becomes crucial for achieving the desired efficiency. Snowflake Spark Integration empowers stakeholders and management to collaborate on their workflow and easily build a quality product that meets the requirements. In case you want to export data from a source of your choice into your desired Database/destination like Snowflake, then Hevo Data is the right choice for you!

Visit our Website to Explore Hevo

Hevo Data provides its users with a simpler platform for integrating data from 100+ sources for Analysis. It is a No-code Data Pipeline that can help you combine data from multiple sources. You can use it to transfer data from multiple data sources into your Data Warehouse such as Snowflake, a Database, or a destination of your choice. It provides you with a consistent and reliable solution to managing data in real-time, ensuring that you always have Analysis-ready data in your desired destination.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at our unbeatable pricing that will help you choose the right plan for your business needs!

Share your experience of learning about Snowflake Spark Integration! Let us know in the comments section below!
