To stay relevant in today’s industry, enterprises must process data in real-time and respond to it quickly. Processing real-time data can surface insights that help tech-savvy companies stay ahead in the market, capitalize on customer demands, and improve operational efficiency. You can leverage an AWS Lambda function to process records in a data stream. In streaming applications, data arrives continually, frequently from several sources, and is handled progressively. Pairing AWS Kinesis with Lambda lets you capture these data streams and process the records they carry effectively.

Prerequisites

  • Understanding of Cloud Computing.

What is AWS Lambda?

Introduced in 2014, AWS Lambda allows you to run code as functions without provisioning or managing servers. This serverless, Function-as-a-Service (FaaS) offering runs your code on a high-availability computing infrastructure while handling all compute resource administration, such as server and operating system maintenance, capacity provisioning, automatic scaling, code monitoring, and logging. You can use Lambda to run code for almost any form of application or backend service. All you have to do is write your code in one of Lambda’s supported languages.

The key benefit of using Lambda within AWS is that it consistently executes application events within milliseconds. Application developers don’t have to worry about managing the computing environment, since AWS Lambda runs the back-end code on managed infrastructure. Developers can rely on Lambda to handle transactions in near real-time, with high dependability and without processing hiccups.

What are Lambda Functions?

The key characteristics of Lambda functions are:

  • They are made up of code and any dependencies that come with it.
  • The function is linked to configuration information.
  • When you construct the function, you specify the configuration information. You can update configuration data via the API.
  • They can access: 
    • AWS services or non-AWS services.
    • AWS services running inside a VPC (e.g., Amazon Redshift, ElastiCache, or RDS instances).
    • Non-AWS services running on EC2 instances within an AWS VPC.

You can invoke Lambda functions directly using the Lambda console, the Lambda API, an AWS SDK, the AWS CLI, or AWS toolkits. To let your Lambda function access resources inside your private VPC, you must supply additional VPC-specific configuration information, including VPC subnet IDs and security group IDs. AWS Lambda uses this information to create the Elastic Network Interfaces (ENIs) that allow your function to reach resources inside the VPC.
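
For illustration, here is a minimal sketch of a VPC-connected function defined with the AWS CDK (the same tool used later in this article). The VPC, security group, and function names are assumptions for the example, not part of any existing stack.

import * as cdk from '@aws-cdk/core';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as lambda from '@aws-cdk/aws-lambda';

export class VpcConnectedLambdaStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // VPC and security group that host the private resources the function needs.
    const vpc = new ec2.Vpc(this, 'AppVpc', { maxAzs: 2 });
    const securityGroup = new ec2.SecurityGroup(this, 'FnSecurityGroup', { vpc });

    // Supplying vpc, vpcSubnets, and securityGroups is what lets Lambda create the
    // ENIs it needs to reach resources (RDS, ElastiCache, etc.) inside the VPC.
    new lambda.Function(this, 'VpcConnectedFn', {
      runtime: lambda.Runtime.NODEJS_12_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('src'),
      vpc,
      vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE },
      securityGroups: [securityGroup],
    });
  }
}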

What is Amazon Kinesis?

AWS Kinesis is a fully managed and highly scalable platform for collecting, processing, and analyzing real-time data, allowing you to gain insights and respond to new information quickly. AWS Kinesis offers a number of capabilities for cost-effectively processing streaming data at the necessary scale, and it gives you the freedom to choose the tools that best match the needs of your applications. For analytics, Machine Learning, and a variety of other applications, Amazon Kinesis helps ingest real-time data such as video, audio, application logs, IoT telemetry data, and website clickstreams.

Key Components of Amazon Kinesis

1) Amazon Kinesis Firehose 

Firehose allows customers to load and transform their data streams into Amazon Web Services destinations, which they can then use for additional functions such as analysis or storage. It is fully managed and scales automatically in response to data volume, so it does not require ongoing maintenance. Firehose can ingest, transform, and deliver real-time data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, third-party service providers, and generic HTTP endpoints.

2) Amazon Kinesis Data Streams  

Amazon Kinesis Data Streams provides a proven solution for highly scalable and durable real-time data streaming, continually capturing, processing, and storing data streams. What makes Kinesis Data Streams so compelling is their capacity to gather gigabytes of data per second from many sources.

3) Amazon Kinesis Video Streams 

Amazon Kinesis Video Streams is a video streaming service that is similar to AWS Kinesis Data Streams. It enables you to securely stream video from a variety of devices and offers the data for playback, machine learning, analytics, and other processing. It can capture data from almost any video device you can think of, including security cameras, smartphone video, drones, RADARs, LIDARs, satellites, and more. It can help you create apps with real-time computer vision capabilities and video analytics utilizing popular open-source machine learning frameworks by integrating with Amazon Rekognition Video.

4) Amazon Kinesis Data Analytics 

Kinesis Data Analytics uses the Apache Flink open-source framework and engine to process and analyze streaming data in real-time. It is designed to make developing, operating, and connecting Flink applications with other AWS services easier. It also integrates with Kinesis Data Streams (KDS), Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Firehose, Amazon Elasticsearch Service, and other AWS services.

Replicate Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a fully managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from Amazon S3, Elasticsearch, and 100+ Data Sources straight into your Data Warehouse (like AWS Redshift) or any database. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

GET STARTED WITH HEVO FOR FREE

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full-access free trial today to experience entirely automated, hassle-free Data Replication!

Getting Started with AWS Kinesis Lambda

Before we proceed with using AWS Kinesis Lambda, do the following:

Step 1: Install the Kinesis CDK package.

npm i @aws-cdk/aws-kinesis

Step 2: Open lib/how-to-trigger-lambda-from-kinesis-stack.ts, add a new Kinesis stream, and deploy the stack with npm run cdk deploy.

import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';

export class HowToTriggerLambdaFromKinesisStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new kinesis.Stream(this, 'MyKinesisStream', {
      streamName: 'MyKinesisStream',
    });
  }
}

Step 3: Install the Lambda CDK package.

npm i @aws-cdk/aws-lambda

Deploying a Lambda function requires bootstrapping your CDK app, which provisions an S3 bucket where the Lambda source code can be stored. This is a one-time procedure.

npm run cdk bootstrap

Step 4: Add a Lambda function to the stack. Create a src directory containing an index.js handler file (the handler shown in the “Create a Function” section later in this article works here), then update lib/how-to-trigger-lambda-from-kinesis-stack.ts as follows:

import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';

export class HowToTriggerLambdaFromKinesisStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new kinesis.Stream(this, 'MyKinesisStream', {
      streamName: 'MyKinesisStream',
    });

    const lambdaFunction = new lambda.Function(this, 'Function', {
      code: lambda.Code.fromAsset('src'),
      handler: 'index.handler',
      functionName: 'KinesisMessageHandler',
      runtime: lambda.Runtime.NODEJS_12_X,
    });
  }
}
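
Note that the stack above creates the stream and the function but does not yet connect them. One way to wire the trigger in code is to add an event source inside the same constructor and then deploy again; this sketch assumes you also install the @aws-cdk/aws-lambda-event-sources package (npm i @aws-cdk/aws-lambda-event-sources):

import { KinesisEventSource } from '@aws-cdk/aws-lambda-event-sources';

// Inside the constructor, after `stream` and `lambdaFunction` are created:
lambdaFunction.addEventSource(new KinesisEventSource(stream, {
  startingPosition: lambda.StartingPosition.LATEST, // only read records added from now on
  batchSize: 100,                                   // records delivered per invocation
}));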

Step 5: Deploy again.

npm run cdk deploy

You might need to confirm some IAM changes during deployment.

How to use AWS Kinesis Lambda Functions?

You can use an AWS Lambda function to process records in an Amazon Kinesis data stream.

A Kinesis data stream is a collection of shards, where each shard is made up of a sequence of data records. A Lambda function can act as either a shared-throughput consumer or a dedicated-throughput consumer with enhanced fan-out. Here, a consumer is a program that reads data from a Kinesis data stream and processes it.

For standard (shared-throughput) consumers, Lambda polls each shard in your Kinesis stream for records over HTTP. The event source mapping shares read throughput with the shard’s other consumers.

Step 1: Create a Trigger

You can create a Kinesis trigger in the AWS Kinesis Lambda console to set up your function to read from Kinesis.

  • Open the Lambda console’s Functions page.
  • Choose the name of the function you want to configure.
  • In the Function overview pane, choose Add trigger.
  • Choose Kinesis as the trigger type.
  • Configure the required settings, then choose Add.

For Kinesis event sources, Lambda provides the following configuration options (a CDK sketch that maps these options to code follows the list).

  • Kinesis Stream: The Kinesis stream from which you can read records.
  • Consumer (optional): Read from the stream using a stream consumer via a dedicated connection.
  • Batch Size: The maximum number of records to deliver to the function in a single batch, up to 10,000.
  • Batch Window: In seconds, specify the maximum length of time to gather records before calling the function.
  • Starting Position: Process only new records, all existing records, or records created after a specific date.
    • Latest: Process only records that are newly added to the stream.
    • Trim Horizon: Process all of the records in the stream.
    • At Timestamp: Begin processing records from a specific point in time.
  • On-failure Destination: An SQS queue or SNS topic for failed records. When Lambda discards a batch of records because it is too old or has exhausted all retries, it sends details about the batch to the queue or topic.
  • Retry Attempts: The maximum number of times Lambda retries a batch when the function returns an error.
  • Maximum Record Age: The maximum age of a record that Lambda sends to your function.
  • Split Batch on Error: If the function produces an error, split the batch in half before retrying.
  • Concurrent Batches per Shard: Process several batches from the same shard at the same time.
  • Enabled: When set to true, the event source mapping is enabled. To stop processing records, set false.
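
If you prefer to configure the same event source in code rather than in the console, these settings map to event source mapping properties. Below is a minimal CDK sketch, assuming the @aws-cdk/aws-lambda-event-sources and @aws-cdk/aws-sqs packages; the construct names are illustrative.

import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { KinesisEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources';

export class KinesisTriggerOptionsStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new kinesis.Stream(this, 'Stream');
    const failureQueue = new sqs.Queue(this, 'FailedBatchQueue'); // on-failure destination

    const consumerFn = new lambda.Function(this, 'Consumer', {
      runtime: lambda.Runtime.NODEJS_12_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('src'),
    });

    consumerFn.addEventSource(new KinesisEventSource(stream, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON, // or LATEST
      batchSize: 400,                              // batch size, up to 10,000
      maxBatchingWindow: cdk.Duration.seconds(10), // batch window
      retryAttempts: 2,                            // retry attempts
      maxRecordAge: cdk.Duration.hours(1),         // maximum record age
      bisectBatchOnError: true,                    // split batch on error
      parallelizationFactor: 1,                    // concurrent batches per shard (1-10)
      onFailure: new SqsDlq(failureQueue),         // on-failure destination
      enabled: true,
    }));
  }
}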

Step 2: Create the Execution Role

Next, create the execution role that grants your function permission to access AWS resources. To create an execution role for AWS Kinesis Lambda:

  • In the IAM console, navigate to the roles page.
  • Select the Create role option.
  • Create a role using the attributes listed below.
    • Trusted Entity: AWS Lambda.
    • Permissions: AWSLambdaKinesisExecutionRole.
    • Role Name: lambda-kinesis-role.

The AWSLambdaKinesisExecutionRole policy grants the function the permissions it needs to read records from Amazon Kinesis and publish logs to CloudWatch Logs.

Step 3: Create a Function

The following code receives a Kinesis event input and handles the messages contained inside it. To create the function for AWS Kinesis Lambda:

  • Copy the sample code into a file called index.js. 
console.log('Loading function');

exports.handler = function(event, context) {
    //console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
    });
};
  • Create a deployment package for AWS Kinesis Lambda.
zip function.zip index.js
  • The create-function command is used to build a Lambda function for AWS Kinesis Lambda.
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \
--role arn:aws:iam::123456789012:role/lambda-kinesis-role

Step 4: Test the Lambda Function

In this section, you can manually execute your Lambda function by using the invoke AWS Lambda CLI command and a sample AWS Kinesis Lambda event.

  • Copy the JSON below into a text file and save it as input.txt for AWS Kinesis Lambda.
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
  • To send the event to the function, use the ‘invoke’ command for AWS Kinesis Lambda.
aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt
  • The invocation response is saved to out.txt, and the function logs the decoded payload (“Hello, this is a test.”) to CloudWatch Logs.

Step 5: Create a Kinesis stream

  • Use the ‘create-stream’ command to create a stream using AWS Kinesis Lambda.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
  • Run the ‘describe-stream’ command to get the Stream ARN, which you will need when you configure the Kinesis trigger (event source mapping) for your function. You can then put a test record on the stream, as sketched after this step.
aws kinesis describe-stream --stream-name lambda-stream
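
Once a trigger connects this stream to the ProcessKinesisRecords function, you can put a test record on the stream to verify the flow end to end. The sketch below uses the AWS SDK for JavaScript v3 (@aws-sdk/client-kinesis), which is an assumption for illustration rather than part of the CLI walkthrough above.

import { KinesisClient, PutRecordCommand } from '@aws-sdk/client-kinesis';

const client = new KinesisClient({ region: 'us-east-2' });

async function putTestRecord(): Promise<void> {
  const response = await client.send(new PutRecordCommand({
    StreamName: 'lambda-stream',
    PartitionKey: '1',                          // records sharing a key keep their order
    Data: Buffer.from('Hello, this is a test.'),
  }));
  // Kinesis hands this payload to Lambda base64-encoded, which is why the
  // handler decodes record.kinesis.data before logging it.
  console.log('Wrote record to', response.ShardId);
}

putTestRecord().catch(console.error);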

Step 6: Delete the Lambda Function

  1. Navigate to the Lambda console’s Functions page.
  2. Choose the function you created.
  3. Choose Delete from the Actions menu.
  4. Select Delete.

Step 7: Delete the Kinesis Stream

  1. Sign in to the AWS Management Console and navigate to https://console.aws.amazon.com/kinesis.
  2. Choose the stream you made.
  3. Select Actions, then Delete.
  4. In the text box, type delete.
  5. Select Delete.

Best Practices for using AWS Kinesis Lambda

1) Enhanced shard-level Metrics

Enabling enhanced shard-level metrics on Kinesis Data Streams is a recommended practice for AWS Kinesis Lambda functions. With this setting, Kinesis Data Streams sends additional shard-level metrics to CloudWatch every minute. This can help you find hot shards and locate failed consumers for a given record or shard.

2) IteratorAge

You must pay particular attention to the IteratorAge (GetRecords.IteratorAgeMilliseconds) metric. IteratorAge measures the gap between the current time and when the last record of the GetRecords call was written to the stream; if this number increases, data from the stream is getting delayed. If the iterator age exceeds your retention period, the expired records are permanently lost. Use CloudWatch alarms on the Maximum statistic to be notified before this loss becomes imminent. CloudWatch alarms can also notify you when function metrics such as ConcurrentExecutions or Invocations exceed your threshold.
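
As one illustration of such an alarm, here is a CDK sketch that assumes the @aws-cdk/aws-cloudwatch package and an existing Kinesis-triggered lambda.Function; the threshold and names are placeholders.

import * as cdk from '@aws-cdk/core';
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as lambda from '@aws-cdk/aws-lambda';

// Alarm when the consumer falls more than five minutes behind the stream.
export function addIteratorAgeAlarm(scope: cdk.Construct, fn: lambda.Function): cloudwatch.Alarm {
  return fn
    .metric('IteratorAge', { statistic: 'Maximum', period: cdk.Duration.minutes(1) })
    .createAlarm(scope, 'IteratorAgeAlarm', {
      threshold: 5 * 60 * 1000, // IteratorAge is reported in milliseconds
      evaluationPeriods: 3,
      alarmDescription: 'Kinesis consumer is falling behind; records risk expiring.',
    });
}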

3) Poison Message

A Lambda function is invoked with a batch of records from a shard and checkpoints its progress per batch: either the batch is processed successfully, or the whole batch is retried until processing succeeds or the records expire from the stream based on the retention period. A single poison message therefore causes the batch to fail repeatedly, resulting in either duplicate results or delayed processing and data loss.

There are two approaches to dealing with failure:

  • The first is to include logic in the Lambda function code to catch errors, log them for offline analysis, and continue to the next batch (a sketch follows this list).
  • The second (and recommended) approach is to consume the Kinesis data stream through an event source mapping and configure its retry and failure behavior: On-failure destination, Retry attempts, Maximum record age, and Split batch on error.
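
Here is a minimal sketch of the first approach, written as a Node.js handler in TypeScript; the aws-lambda type definitions and the logError helper are assumptions for illustration.

import { KinesisStreamEvent, KinesisStreamRecord } from 'aws-lambda';

// Illustrative stand-in for shipping a bad record somewhere for offline analysis.
async function logError(record: KinesisStreamRecord, err: unknown): Promise<void> {
  console.error('Poison record', record.kinesis.sequenceNumber, err);
}

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    try {
      const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
      // ... per-record business logic goes here ...
      console.log('Processed payload:', payload);
    } catch (err) {
      // Catch and log instead of throwing, so one bad record does not block the shard.
      await logError(record, err);
    }
  }
};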

4) ReadProvisionedThroughputExceeded

The ReadProvisionedThroughputExceeded metric shows the number of GetRecords calls throttled over a given period. Use this indicator to check whether your reads are being throttled because you have exceeded your read throughput limits. If the Average statistic has a value other than 0, some of your consumers are being throttled. To increase throughput, add shards to the stream or use an enhanced fan-out (EFO) consumer to trigger your Lambda function.

Common Pitfalls and How to Fix Them?

1) No Error Handling

By default, a Kinesis-triggered Lambda function processes up to 100 records per invocation. Everything works smoothly until a faulty message arrives and the function starts to fail.

By default, Lambda keeps retrying the failed batch until processing succeeds or the records expire from the stream; the retry limit can be configured up to 10,000 attempts. Until the issue is fixed, the function will not process any further records from that shard, because of Kinesis’ in-order processing guarantee.

Rather than retrying the same batch thousands of times, you may want to bypass the troublesome message and move on to the next one, keeping your systems from being bogged down with old data.

To address this, you can use the following parameters:

  • maximumRetryAttempts: Set maximumRetryAttempts to a small value to cap how many times AWS Kinesis Lambda retries a failing batch.
  • destinations.onFailure: While Lambda metrics can alert you to problems, it’s typically a good idea to send information about unprocessed records to a Dead Letter Queue. The DLQ can be an SQS queue or SNS topic that you specify in destinations.onFailure. You can re-ingest the lost records after you locate and fix the flawed Lambda logic that caused the problem.
  • bisectBatchOnFunctionError: When set to true, the batch is split in two and each half is retried separately whenever the AWS Kinesis Lambda execution fails. Depending on batch size and number of retries, you can eventually isolate the single faulty record. After discarding the faulty record, all others can be processed successfully.

What Makes Hevo’s AWS ETL Process Best-In-Class

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s automated, No-code platform empowers you with everything you need to have the most holistic AWS Data Integration experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for Amazon S3, Elasticsearch, and 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!

2) Inadequate Parallelization

A Kinesis data stream is made up of shards, and you pay per shard. Each shard can ingest up to 1 MB of data per second, or 1,000 records per second. However, having enough throughput to ingest messages into a shard does not mean you can read and process them at the same rate.

If your Lambda function takes longer to process the data, you risk falling behind the stream more and more.

There are two ways to counter this. 

  • The first is to raise the number of shards on the Kinesis side, which incurs additional cost.
  • The second is the parallelizationFactor setting, which allows you to process messages from a single shard in up to ten parallel Lambda executions. Despite the concurrent reads from a shard, the order of records with the same partition key is preserved. Increased parallelization therefore enables safe processing of a larger data volume without increasing Kinesis costs.

3) Incorrect Starting Position

This is a common issue when creating a new Kinesis trigger for a Lambda function. It can also surface during disaster recovery, so it is best to plan for it ahead of time.

By default, when you deploy a new Lambda function with Kinesis as a trigger, it begins by reading all existing records from the stream. Depending on the stream’s retention period, this could mean every message from up to the previous 365 days.

As a result, if you wish to handle only new messages that arrive after you deploy your function, you must explicitly set the startingPosition. The default value is TRIM_HORIZON, which starts reading from the oldest available record. To begin with the most recent record at the time of function deployment, change it to LATEST.

If you wish to handle records from a certain moment in time, another option is to specify a timestamp.

Conclusion

In this article, you learned how to perform basic data-stream operations using AWS Kinesis Lambda. The article also covered best practices and Kinesis Lambda examples you can follow while leveraging the data streaming and analytics capabilities of Kinesis. In addition, it highlighted some common pitfalls when using both AWS solutions together so that you can get the most out of them.

There are various Data Sources that organizations leverage to capture a variety of valuable data points. However, transferring data from these sources into a Data Warehouse for holistic analysis is a hectic task. It requires you to write and maintain complex functions that help achieve a smooth flow of data. An Automated Data Pipeline helps solve this issue, and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline with 100+ pre-built Integrations that you can choose from.

visit our website to explore hevo

Hevo can help you integrate data from 100+ data sources and load them into a destination like AWS Redshift to analyze real-time data at an affordable price. It will make your life easier and Data Migration hassle-free. It is user-friendly, reliable, and secure.

SIGN UP for a 14-day free trial and see the difference!

Share your experience of learning about AWS Kinesis Lambda Functions in the comments section below.

Preetipadma Khandavilli
Freelance Technical Content Writer, Hevo Data

Preetipadma is passionate about freelance writing within the data industry, expertly delivering informative and engaging content on data science by incorporating her problem-solving skills.
