
Salesforce to Snowflake: Steps to Migrate Data


Salesforce is a leading CRM system and one of the most common source systems to integrate when building a data warehouse or an analytics platform. Snowflake is a Software-as-a-Service (SaaS) offering that provides a ready-to-use cloud data warehouse, with enough connectivity options to connect any reporting suite using JDBC or the provided libraries.

This article uses Salesforce APIs, UNIX command-line tools, and Snowflake's web client to set up data ingestion from Salesforce to Snowflake. It also focuses on high data volumes and performance: these steps can be used to load millions of records from Salesforce to Snowflake.

Salesforce and Snowflake – Overview

Salesforce is a leading cloud-based CRM platform. As a Platform as a Service (PaaS), Salesforce is known for its CRM applications for sales, marketing, service, community, and analytics, and it is highly scalable and flexible. Since Salesforce contains CRM data, including sales data, it is one of the most important sources for ingestion into analytical tools or databases like Snowflake.

Snowflake is a fully relational, ANSI SQL data warehouse provided as Software-as-a-Service (SaaS). It offers a ready-to-use cloud data warehouse with zero management or administration, and it uses cloud-based persistent storage and virtual compute instances for computation.

Key features of Snowflake include Time Travel, Fail-safe, a web-based GUI client for administration and querying, SnowSQL (a command-line client), and an extensive set of connectors and drivers for major programming languages.
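Time Travel, for instance, lets you query a table as it existed at a past point in time. A minimal illustration (my_table is a placeholder name):

select * from my_table at (offset => -60 * 5); -- table contents as of 5 minutes ago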

How to move data from Salesforce to Snowflake?

There are two methods that can help you migrate data:

Method 1: Use a ready-to-use official Snowflake ETL partner like Hevo (7-day free trial).

Method 2: Build Custom ETL Scripts to move data from Salesforce to Snowflake.

For the scope of this article, let us see how to replicate data from Salesforce to Snowflake through custom code.

Towards the end of the article, you will also find a brief overview of an easier alternative to this approach.

Salesforce Data APIs

Since we will be loading data from Salesforce to Snowflake, extracting the data from Salesforce is the first step. Salesforce provides several general-purpose APIs that can be used to access Salesforce data:

  1. REST API
  2. SOAP API
  3. Bulk API
  4. Streaming API

Along with these, Salesforce provides various special-purpose APIs such as the Apex API, Chatter API, and Metadata API, which are beyond the scope of this post.

The following table gives a high-level overview of the general-purpose APIs (the Streaming API is omitted, as it is a push-based subscription mechanism rather than a request/response API):

API        Protocol   Formats Supported   Request Type
REST API   REST       JSON, XML           Synchronous
SOAP API   SOAP       XML                 Synchronous
Bulk API   REST       CSV, JSON, XML      Asynchronous

Synchronous API: A synchronous request blocks the application/client until the operation completes and a response is received.

Asynchronous API: An asynchronous request doesn't block the application/client making the request. In Salesforce, this API type can be used to process or query large amounts of data, since Salesforce processes the batches/jobs in the background.

Understanding the differences between the Salesforce APIs is important, as the best option depends on the use case.

APIs are enabled by default for the Salesforce Enterprise edition; if yours is not, you can create a developer account and get the token required for API access. In this post, we will use the Bulk API to access the data.

Read the introduction to the Bulk API. If you're new to Salesforce, read the steps to set up the Bulk API access required for the next steps.

Advantages of using Bulk API:

  1. REST-based Asynchronous API.
  2. Can be used to query a large number of records from a few thousand to millions in the background.
  3. Supports all three data file formats available in Salesforce, i.e. CSV, JSON, and XML.
  4. Supports parallel execution of batches.

Required for next steps:

  1. Bulk API access (Salesforce Developer access).
  2. The cURL command-line tool (available on most UNIX systems).
  3. The xmllint command-line tool (available on most UNIX systems).
  4. Snowflake web client access, to run queries.
  5. A UNIX system.

A detailed guide to using Bulk API for moving data from Salesforce to Snowflake

[Image: process flow for querying Salesforce data using the Bulk API]

The steps to get data from Salesforce using the Bulk API on a Unix-based machine are given below, each explained in detail.

Log in to the Salesforce API

The Bulk API uses the SOAP API for login, as the Bulk API itself doesn't provide a login operation.

Save the XML below as login.xml, and replace username and password with your Salesforce account username and password. The password is the concatenation of your account password and your security token.

<?xml version="1.0" encoding="utf-8" ?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
  <env:Body>
    <n1:login xmlns:n1="urn:partner.soap.sforce.com">
      <n1:username>username</n1:username>
      <n1:password>password</n1:password>
    </n1:login>
  </env:Body>
</env:Envelope>

From a terminal, execute the following command:

curl https://login.salesforce.com/services/Soap/u/41.0 \
  -H "Content-Type: text/xml; charset=UTF-8" \
  -H "SOAPAction: login" -d @login.xml > login_response.xml

If executed successfully, the above command returns an XML loginResponse containing <sessionId> and <serverUrl>, which will be used in subsequent API calls to download data.

login_response.xml will look as shown below:

<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns="urn:partner.soap.sforce.com"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <soapenv:Body>
    <loginResponse>
      <result>
        <metadataServerUrl>https://organization.my.salesforce.com/services/Soap/m/41.0/00AB0000000AB</metadataServerUrl>
        <passwordExpired>false</passwordExpired>
        <sandbox>false</sandbox>
        <serverUrl>https://organization.my.salesforce.com/services/Soap/u/41.0/00AB0000000ABc1</serverUrl>
        <sessionId>00Dj00001234ABCD5!AQcAQBgaabcded12XS7C6i3FNE0TMf6EBwOasndsT4O</sessionId>
        <userId>0010a00000ABCDefgh</userId>
        <userInfo>
          <currencySymbol>$</currencySymbol>
          <organizationId>00XYZABCDEF123</organizationId>
          <organizationName>ABCDEFGH</organizationName>
          <sessionSecondsValid>43200</sessionSecondsValid>
          <userDefaultCurrencyIsoCode xsi:nil="true"/>
          <userEmail>user@organization</userEmail>
          <userFullName>USERNAME</userFullName>
          <userLanguage>en_US</userLanguage>
          <userName>user@organization</userName>
          <userTimeZone>America/Los_Angeles</userTimeZone>
        </userInfo>
      </result>
    </loginResponse>
  </soapenv:Body>
</soapenv:Envelope>

From the above XML, we need to initialize three variables: serverUrl, sessionId, and instance. The first two are available directly in the response XML; instance is the first part of the hostname in serverUrl.

The shell script snippet given below can extract these three variables from the login_response.xml file:

sessionId=$(xmllint --xpath "/*[name()='soapenv:Envelope']/*[name()='soapenv:Body']/*[name()='loginResponse']/*[name()='result']/*[name()='sessionId']/text()" login_response.xml)

serverUrl=$(xmllint --xpath "/*[name()='soapenv:Envelope']/*[name()='soapenv:Body']/*[name()='loginResponse']/*[name()='result']/*[name()='serverUrl']/text()" login_response.xml)

instance=$(echo ${serverUrl/\.salesforce.com*/} | sed 's|https://||')

For the sample response above, this yields:

sessionId = 00Dj00001234ABCD5!AQcAQBgaabcded12XS7C6i3FNE0TMf6EBwOasndsT4O
serverUrl = https://organization.my.salesforce.com/services/Soap/u/41.0/00AB0000000ABc1
instance  = organization.my

Create a Job

Save the XML given below as job_account.xml. It is used to create a job that downloads Account object data from Salesforce in JSON format. Edit the object and contentType elements to download different objects or to change the content type (CSV or XML) as required. We are using JSON here.

job_account.xml:

<?xml version="1.0" encoding="UTF-8"?>
 <jobInfo
     xmlns="http://www.force.com/2009/06/asyncapi/dataload">
   <operation>query</operation>
   <object>Account</object>
   <concurrencyMode>Parallel</concurrencyMode>
   <contentType>JSON</contentType>
 </jobInfo>

Execute the command given below to create the job; from the XML response received (account_job_response.xml), we then extract the jobId variable.

curl -s -H "X-SFDC-Session: ${sessionId}" -H "Content-Type: application/xml; charset=UTF-8" \
  -d @job_account.xml https://${instance}.salesforce.com/services/async/41.0/job \
  > account_job_response.xml

jobId=$(xmllint --xpath "/*[name()='jobInfo']/*[name()='id']/text()" account_job_response.xml)

account_job_response.xml:

<?xml version="1.0" encoding="UTF-8"?>
<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">
  <id>1200a000001aABCD1</id>
  <operation>query</operation>
  <object>Account</object>
  <createdById>00580000003KrL0AAK</createdById>
  <createdDate>2018-05-22T06:09:45.000Z</createdDate>
  <systemModstamp>2018-05-22T06:09:45.000Z</systemModstamp>
  <state>Open</state>
  <concurrencyMode>Parallel</concurrencyMode>
  <contentType>JSON</contentType>
  <numberBatchesQueued>0</numberBatchesQueued>
  <numberBatchesInProgress>0</numberBatchesInProgress>
  <numberBatchesCompleted>0</numberBatchesCompleted>
  <numberBatchesFailed>0</numberBatchesFailed>
  <numberBatchesTotal>0</numberBatchesTotal>
  <numberRecordsProcessed>0</numberRecordsProcessed>
  <numberRetries>0</numberRetries>
  <apiVersion>41.0</apiVersion>
  <numberRecordsFailed>0</numberRecordsFailed>
  <totalProcessingTime>0</totalProcessingTime>
  <apiActiveProcessingTime>0</apiActiveProcessingTime>
  <apexProcessingTime>0</apexProcessingTime>
</jobInfo>

The extracted value: jobId = 1200a000001aABCD1

Add a Batch to the Job

The next step is to add a batch to the job created in the previous step. A batch contains a SOQL query used to get the data from Salesforce. After submitting the batch, we extract the batchId from the JSON response received.

query='select ID,NAME,PARENTID,PHONE,ACCOUNT_STATUS from ACCOUNT'

curl -d "${query}" -H "X-SFDC-Session: ${sessionId}" -H "Content-Type: application/json; charset=UTF-8" \
  https://${instance}.salesforce.com/services/async/41.0/job/${jobId}/batch | \
  python -m json.tool > account_batch_response.json

batchId=$(grep '"id":' account_batch_response.json | awk -F':' '{print $2}' | tr -d ' ,"')

account_batch_response.json:

{
    "apexProcessingTime": 0,
    "apiActiveProcessingTime": 0,
    "createdDate": "2018-11-30T06:52:22.000+0000",
    "id": "1230a00000A1zABCDE",
    "jobId": "1200a000001aABCD1",
    "numberRecordsFailed": 0,
    "numberRecordsProcessed": 0,
    "state": "Queued",
    "stateMessage": null,
    "systemModstamp": "2018-11-30T06:52:22.000+0000",
    "totalProcessingTime": 0
}

The extracted value: batchId = 1230a00000A1zABCDE

Check the Batch Status

Since the Bulk API is asynchronous, the batch runs on the Salesforce side and its state changes to Completed or Failed once the results are ready to download. We need to poll the batch status until it changes to either Completed or Failed.

status=""
while [ "$status" != "Completed" ] && [ "$status" != "Failed" ]
do
  sleep 10  # check the status every 10 seconds
  curl -H "X-SFDC-Session: ${sessionId}" \
    https://${instance}.salesforce.com/services/async/41.0/job/${jobId}/batch/${batchId} | \
    python -m json.tool > account_batchstatus_response.json
  status=$(grep -i '"state":' account_batchstatus_response.json | awk -F':' '{print $2}' | tr -d ' ,"')
done

account_batchstatus_response.json:

{
    "apexProcessingTime": 0,
    "apiActiveProcessingTime": 0,
    "createdDate": "2018-11-30T06:52:22.000+0000",
    "id": "7510a00000J6zNEAAZ",
    "jobId": "7500a00000Igq5YAAR",
    "numberRecordsFailed": 0,
    "numberRecordsProcessed": 33917,
    "state": "Completed",
    "stateMessage": null,
    "systemModstamp": "2018-11-30T06:52:53.000+0000",
    "totalProcessingTime": 0
}

Retrieve the Results

Once the state changes to Completed, we can download the result dataset, which will be in JSON format. The code snippet given below extracts the resultId from the JSON response and then downloads the data using that resultId.

if [ "$status" == "Completed" ]; then

  curl -H "X-SFDC-Session: ${sessionId}" \
    https://${instance}.salesforce.com/services/async/41.0/job/${jobId}/batch/${batchId}/result | \
    python -m json.tool > account_result_response.json

  resultId=$(grep '"' account_result_response.json | tr -d ' ,"')

  curl -H "X-SFDC-Session: ${sessionId}" \
    https://${instance}.salesforce.com/services/async/41.0/job/${jobId}/batch/${batchId}/result/${resultId} \
    > account.json

fi

account_result_response.json:

[
    "7110x000008jb3a"
]

The extracted value: resultId = 7110x000008jb3a

Close the Job

Once the results have been retrieved, we can close the job. Save the XML below as close-job.xml.

<?xml version="1.0" encoding="UTF-8"?>
<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">
  <state>Closed</state>
</jobInfo>

Use the command given below to close the job, appending the jobId to the request URL.

curl -s -H "X-SFDC-Session: ${sessionId}" -H "Content-Type: application/xml; charset=UTF-8" \
  -d @close-job.xml https://${instance}.salesforce.com/services/async/41.0/job/${jobId}

After running all the above steps, account.json will be generated in the current working directory. It contains the Account data downloaded from Salesforce in JSON format, which we will load into Snowflake in the next steps.

Downloaded data file:

$ cat ./account.json

[ {
  "attributes" : {
    "type" : "Account",
    "url" : "/services/data/v41.0/sobjects/Account/2x234abcdedg5j"
  },
 "Id": "2x234abcdedg5j",
 "Name": "Some User",
 "ParentId": "2x234abcdedgha",
 "Phone": 124567890,
 "Account_Status": "Active"
}, {
  "attributes" : {
    "type" : "Account",
    "url" : "/services/data/v41.0/sobjects/Account/1x234abcdedg5j"
  },
 "Id": "1x234abcdedg5j",
 "Name": "Some OtherUser",
 "ParentId": "1x234abcdedgha",
 "Phone": null,
 "Account_Status": "Active"
} ]

Loading Data to Snowflake

Now that we have the JSON file downloaded from Salesforce, we can use it to load the data into a Snowflake table. The file extracted from Salesforce has to be uploaded either to Snowflake's internal stage or to an external stage such as a Microsoft Azure or AWS S3 location. We can then load the Snowflake table using the created stage.
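For an internal stage, the upload can be done from the command line with SnowSQL's PUT command. A minimal sketch, assuming a stage named STAGE.salesforce_stage (created in the next step); external stages such as S3 are instead populated with your cloud provider's upload tooling, as shown later:

-- Run from SnowSQL; uploads the local file to the named internal stage.
put file:///path/to/account.json @STAGE.salesforce_stage auto_compress = false;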

Creating Snowflake Stage

A stage in Snowflake is a location, accessible to Snowflake, where data files are stored. Once a stage is created, its name can be used to access the files in Snowflake or to load a table.

We can create a new stage through the web UI by following the steps below:

  1. Login to the Snowflake Web Client UI.
  2. Select the desired database from the Databases tab.
  3. Click the Stages tab.
  4. Click Create, and select the desired location (Internal, Azure, or S3).
  5. Click Next.

[Image: creating a Snowflake stage]

  6. Fill in the form that appears in the next window (shown below): the stage name, the stage schema, the bucket URL, and the access keys required to access the stage location, such as AWS keys for an AWS S3 bucket.

[Image: create stage in Snowflake]

  7. Click Finish.
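A stage can also be created with SQL. A minimal sketch for an S3 stage, using the stage schema and name referenced later in this post; the bucket URL and credentials are placeholders:

create or replace stage STAGE.salesforce_stage
  url = 's3://your-bucket/salesforce/'
  credentials = (aws_key_id = '<aws_key_id>' aws_secret_key = '<aws_secret_key>');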

Creating Snowflake File Format

Once the stage is created, we are all set with the file location. The next step is to create a file format in Snowflake. The File Formats menu can be used to create a named file format, which is then used when bulk loading data into Snowflake.

Since the extracted Salesforce file is in JSON format, we will create a file format that reads JSON.

Steps to create File Format:

  1. Login to Snowflake Web Client UI.
  2. Select the Databases tab.
  3. Click the File Formats tab.
  4. Click Create.

[Image: create file format in Snowflake]

This will open a new window where we can specify the file format properties.

We have selected JSON as the type, and FORMAT as the schema that stores all our file formats. We have also enabled the Strip Outer Array option; this is required to strip the outer array (the square brackets enclosing the entire JSON) that Salesforce adds to the file.

[Image: file format in Snowflake]

The file format can also be created using SQL in Snowflake. Grants then have to be given to allow other roles to access the format and stage we have created.

create or replace file format format.JSON_STRIP_OUTER
  type = 'json'
  STRIP_OUTER_ARRAY = TRUE;

grant USAGE on FILE FORMAT FORMAT.JSON_STRIP_OUTER to role developer_role;

Loading Salesforce JSON data to Snowflake table

Now that we have created the required stage and file format in Snowflake, we can use them to bulk load the generated Salesforce JSON file into Snowflake.

The advantage of the JSON type in Snowflake:
Snowflake can access semi-structured types like JSON or XML as schemaless objects and can directly query/parse the required fields without loading the data into a staging table. See Snowflake's documentation on semi-structured data to learn more.
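If a raw staging table is preferred instead, the same stage and file format also work with Snowflake's COPY INTO bulk-load command. A minimal sketch, assuming a hypothetical one-column VARIANT table named salesforce_account_raw:

-- Hypothetical staging table; each Salesforce record lands as one VARIANT row.
create or replace table salesforce_account_raw (v variant);

copy into salesforce_account_raw
  from @STAGE.salesforce_stage/account.json
  file_format = (format_name = 'format.JSON_STRIP_OUTER');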

Parsing the JSON file in Snowflake:

Using the PARSE_JSON function, we can interpret JSON in Snowflake; the query given below parses the JSON file into a tabular format. Explicit type casting is required when using parse_json, since it returns a VARIANT value.

SELECT
  parse_json($1):Id::string,
  parse_json($1):Name::string,
  parse_json($1):ParentId::string,
  parse_json($1):Phone::int,
  parse_json($1):Account_Status::string
from @STAGE.salesforce_stage/account.json
  (file_format => 'format.JSON_STRIP_OUTER') t;

We will create a table in Snowflake and use the above query to insert data into it; a sketch of the equivalent SQL is given after the screenshots below. We are using Snowflake's web client UI to run these queries.

Upload file to S3:

[Image: upload file to S3]

Table creation and insert query:

[Image: table creation and insert query]
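A minimal sketch of the table creation and insert shown above, assuming a hypothetical target table named salesforce_account:

-- Hypothetical target table matching the queried Account fields.
create or replace table salesforce_account (
  id             string,
  name           string,
  parent_id      string,
  phone          int,
  account_status string
);

insert into salesforce_account
select
  parse_json($1):Id::string,
  parse_json($1):Name::string,
  parse_json($1):ParentId::string,
  parse_json($1):Phone::int,
  parse_json($1):Account_Status::string
from @STAGE.salesforce_stage/account.json
  (file_format => 'format.JSON_STRIP_OUTER') t;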

Data inserted into the Snowflake target table:

[Image: Salesforce data in the Snowflake target table]

Limitations of Bulk API

  1. The maximum single file size is 1 GB (data larger than 1 GB is broken into multiple parts when retrieving results).
  2. Bulk API queries don't support the following in SOQL:
    COUNT, ROLLUP, SUM, GROUP BY CUBE, OFFSET, and nested SOQL queries.
  3. Bulk API doesn’t support base64 data type fields.

Easier Way to move data from Salesforce to Snowflake

Using Hevo, an official Snowflake ETL partner, you can easily load data from Salesforce to Snowflake with just 2 simple steps. This does not need you to write any code and will provide you with an error-free, fully managed setup to move data in minutes.

  • Connect and configure your Salesforce account.
  • Select the Destination where you want to load the data (in this case, Snowflake).

It is that simple. While you relax, Hevo will take care of fetching the data and sending it to your destination warehouse.

In addition to this, Hevo lets you bring data from a wide array of sources – Cloud Apps, Databases, SDKs, and more. You can explore the complete list here.

Conclusion

This blog has covered all the steps required to extract data from Salesforce using the Bulk API and load it into Snowflake. Additionally, an easier alternative using Hevo has also been discussed.

Do leave a comment on your experience of replicating data from Salesforce to Snowflake and let us know what worked for you. 
