Do you want to transfer your Salesforce data using Kafka? Are you finding it difficult to connect Kafka to Salesforce?

  1. Follow our easy step-by-step guide to master efficiently transferring your data from Salesforce using Kafka.
  2. It will help you take charge of the process in a hassle-free way without compromising efficiency, making the data export as smooth as possible.
  3. It will also help you build a customized ETL pipeline for your organization. Through this article, you will gain a solid understanding of the tools and techniques involved and hone your skills further.

Introduction to Kafka

  • Apache Kafka is an open-source distributed event streaming platform.
  • It allows real-time transfer of data from one system to another.
  • Kafka, written in Scala and Java, organizes data into topics managed by brokers, which lets it transfer data in a fault-tolerant manner, subject by subject.
  • Kafka provides a high-throughput, low-latency distributed commit log and a robust queue that can handle high volumes of data.

Introduction to Salesforce

  • Salesforce is a cloud-based CRM tool that helps you maintain and manage your organization’s interactions with its customer base. Salesforce generates a lot of data from managing these interactions. It also offers cloud-based tools such as data analytics and IoT products.

Prerequisites

  • Working knowledge of Salesforce.
  • Working knowledge of Kafka.
  • A general idea about Apex programming language.
  • A general idea about databases and their operations.

Methods to Move Data From Kafka to Salesforce

  1. Salesforce Streaming APIs allow Kafka to capture real-time events from Salesforce over an HTTP connection.
  2. The data is delivered in the form of a Salesforce Object (sObject); an illustrative event envelope is sketched after this list.
  3. Users can easily modify the data by performing operations such as deleting, updating, and creating new events. You can use a query written in Salesforce Object Query Language (SOQL) to retrieve information about the data events you need.
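
For illustration, a PushTopic event for the Contact object arrives wrapped in an envelope roughly like the object below. The values are made up, and the actual fields are whatever the PushTopic's SOQL query selects:

// Illustrative shape of a PushTopic streaming event; values are invented.
const sampleEvent = {
    channel: "/topic/ContactUpdates",
    data: {
        event: { createdDate: "2024-01-01T12:00:00.000Z", replayId: 1, type: "updated" },
        sobject: { Id: "0035g00000XXXXXAAA", Name: "Jane Doe" }
    }
};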

This method can be implemented using the following steps:

Step 1: Creating a PushTopic record in Salesforce

The easiest way to create a new PushTopic record is by using the Salesforce Developer Console, an integrated development environment that you can use to create, debug, and test applications in Salesforce.
From the Debug menu in the Developer Console, select Open Execute Anonymous Window and paste the following Apex code to create a PushTopic record:

// Create a PushTopic that streams Contact events to the /topic/ContactUpdates channel.
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'ContactUpdates';
pushTopic.Query = 'SELECT Id, Name FROM Contact';
pushTopic.ApiVersion = 36.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
// 'Referenced' sends a notification whenever a field referenced in the query changes.
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

Using the above Apex code, you will create a PushTopic called ContactUpdates that generates a new event every time a user creates, updates, undeletes, or deletes a contact. Each event carries the Id and Name of the contact involved.
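
Before installing the Kafka connector, you can optionally confirm that the PushTopic fires. Below is a minimal sketch using the separate jsforce Node.js client; jsforce is an assumption here, not something this tutorial's connector requires, and you would install it with npm install jsforce:

// Optional sanity check with jsforce (a separate Salesforce client for Node.js):
// subscribe to the ContactUpdates PushTopic and log each streaming event.
// Replace the credentials with your own; append your security token to the
// password if your org requires it.
const jsforce = require("jsforce");

const conn = new jsforce.Connection({ loginUrl: "https://login.salesforce.com" });

conn.login("user@example.com", "passwordPlusSecurityToken")
    .then(() => {
        conn.streaming.topic("ContactUpdates").subscribe(message => {
            // message.sobject carries the Id and Name selected in the PushTopic query.
            console.log(message.event.type, message.sobject);
        });
    })
    .catch(err => console.error(err));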

Step 2: Installing the Kafka connector for Salesforce

To install the Kafka connector for Salesforce, start your Kafka server and use the following command:

sudo npm install -g salesforce-kafka-connect

This command will download and install the Kafka connector for Salesforce on your system.

Step 3: Configuring the Salesforce Streaming Events

To configure the Kafka connector, begin by using the following command:

sudo nano /usr/lib/node_modules/salesforce-kafka-connect/config/default.js

The configuration file will now open up as follows. Replace the username and password fields with your credentials to allow Kafka to access the data associated with your account.

"use strict";
const path = require("path");
const config = {
    kafka: {
        zkConStr: "localhost:2181/",
        logger: null,
        groupId: "kc-salesforce-group",
        clientName: "kc-salesforce-client",
        workerPerPartition: 1,
        options: {
            sessionTimeout: 8000,
            protocol: ["roundrobin"],
            fromOffset: "earliest", //latest
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 10,
            heartbeatInterval: 250,
            retryMinTimeout: 250,
            autoCommit: true,
            autoCommitIntervalMs: 1000,
            requireAcks: 1,
            //ackTimeoutMs: 100,
            //partitionerType: 3
        }
    },
    topic: "sf-test-topic",
    partitions: 1,
    maxTasks: 1,
    maxPollCount: 5,
    pollInterval: 250,
    produceKeyed: true,
    produceCompressionType: 0,
    connector: {
        username: "user",
        password: "password",
        loginUrl: "https://user.salesforce.com",
        streamingSource: {
            batchSize: 5,
            topic: "StreamingTopic",
            kafkaTopic: "sf-test-topic",
            idProperty: "id"
        },
        restSink: {
            sObject: "sobject",
            idProperty: "id",
            batchSize: 500
        }
    },
    http: {
        port: 3149,
        middlewares: []
    },
    enableMetrics: true
};
module.exports = config;

Perform the same operation and replace the username and password parameters with your credentials in the sink-config and source-config files. You can access the files at the following paths:

/usr/lib/node_modules/salesforce-kafka-connect/test/sink-config.js
/usr/lib/node_modules/salesforce-kafka-connect/test/source-config.js

Once you have configured your Salesforce data source to create events whenever the Salesforce Streaming APIs produce a request, you need to set up the source connector that retrieves the data from Salesforce and transfers it to Kafka. You can use the salesforce-kafka-connect package to implement this. You can open index.js at the following file path:

/usr/lib/node_modules/salesforce-kafka-connect/index.js

The streaming of Salesforce data into Kafka is handled by the runSourceConnector function.

Function to run source connector.
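
A minimal sketch of starting the source connector from code, assuming the package exposes runSourceConnector(config, [], onError) from its main module in the style of the nodefluent connect framework it is built on (check index.js above if your installed version differs), might look like this:

// Minimal sketch: start the source connector programmatically.
// The export name and signature are assumed from the nodefluent connect framework;
// verify them in index.js before relying on this.
const { runSourceConnector } = require("salesforce-kafka-connect");
const config = require("/usr/lib/node_modules/salesforce-kafka-connect/config/default.js");

runSourceConnector(config, [], error => console.error("source error:", error))
    .then(() => {
        // The connector now subscribes to the PushTopic named in the config
        // (streamingSource.topic, "StreamingTopic" by default; presumably set it
        // to "ContactUpdates" to match Step 1) and produces each event to the
        // Kafka topic "sf-test-topic".
    });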

Once you have called the function, you can transfer data from Salesforce by using the following command in your command-line interface:

nkc-salesforce-source --help


Step 4: Configuring the ETL Pipeline for connecting Kafka to Salesforce

To fetch data from the Salesforce Streaming APIs, store it in Kafka, and then transform it into an outgoing Salesforce ProducerRecord, you need to set up the Kafka sink. The runSinkConnector function performs this operation.

Function to run sink connector.
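
A minimal sketch of starting the sink connector, assuming the package exposes runSinkConnector from its main module with the same calling convention as runSourceConnector above (again, verify against index.js), might look like this:

// Minimal sketch: start the sink connector programmatically.
// The export name and signature are assumed to mirror runSourceConnector;
// check /usr/lib/node_modules/salesforce-kafka-connect/index.js to confirm.
const { runSinkConnector } = require("salesforce-kafka-connect");
const config = require("/usr/lib/node_modules/salesforce-kafka-connect/config/default.js");

runSinkConnector(config, [], error => console.error("sink error:", error))
    .then(() => {
        // The connector now consumes records from the Kafka topic ("sf-test-topic")
        // and writes them to Salesforce via the REST sink configured under "restSink".
    });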

You can use the following command in your command-line interface to call this function:

nkc-salesforce-sink --help

Once you have called the function, you need to transfer your Salesforce data into the Kafka sink. To do this, you can create a function called messageToProducerRecord as follows:

Passing message from Kafka to Salesforce.
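
Purely as a hypothetical illustration, such a mapping function could look like the sketch below; the field names and record shape are assumptions for this example, not the package's documented API:

// Hypothetical illustration only: turn a raw Kafka message into a keyed record
// describing the Salesforce object to write. Shapes and field names are assumed.
const messageToProducerRecord = (message) => {
    // Kafka message values arrive as Buffers; parse the JSON payload.
    const payload = JSON.parse(message.value.toString());

    return {
        key: payload.Id,            // key records by the Salesforce record Id
        value: {
            sObject: "Contact",     // target sObject for the REST sink
            Id: payload.Id,
            Name: payload.Name
        }
    };
};

module.exports = messageToProducerRecord;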

This is how you can use the Salesforce Streaming APIs to connect Kafka to Salesforce and transfer your data in real time.

Talha
Software Developer, Hevo Data

Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.
