To stay relevant in today's industry, enterprises must process data in real time and respond to it quickly. Real-time processing can offer insights that help companies stay ahead of the market, capitalize on customer demand, and improve operational efficiency. You can leverage an AWS Lambda function to process records in a data stream. In streaming applications, data arrives continually, frequently from several sources, and is handled incrementally. Lambda can consume these Kinesis data streams and help you process the data effectively.

Prerequisites

  • Understanding of Cloud Computing.

What is AWS Lambda?


Introduced in 2014, AWS Lambda allows you to run code as functions without deploying or managing servers. This serverless, Function-as-a-Service (FaaS) offering enables you to run your code on a high-availability computing infrastructure while AWS handles all compute resource administration, such as server and operating system maintenance, capacity provisioning and automatic scaling, code monitoring, and logging. You can use Lambda to run code for almost any type of application or backend service. All you have to do is write your code in one of Lambda's supported languages.

The key benefit of Lambda within AWS is that it executes your code in response to application events within milliseconds. Developers don't have to worry about managing the computing environment, since AWS Lambda runs back-end code on their behalf and scales it automatically. You can rely on Lambda to handle events in near real time, with high dependability, and without provisioning or maintaining servers.

What are Lambda Functions?

The key characteristics of Lambda functions are:

  • They are made up of code and any dependencies that come with it.
  • The function is linked to configuration information.
  • When you construct the function, you specify the configuration information. You can update configuration data via the API.
  • They can access:
    • AWS services or non-AWS services.
    • VPCs hosting AWS services (e.g., Redshift, Elasticache, RDS instances).
    • Non-AWS services operating in an AWS VPC on EC2 instances.

You can invoke Lambda functions directly using the Lambda console, the Lambda API, an AWS SDK, the AWS CLI, or AWS toolkits. To let your Lambda function access resources inside your private VPC, you must supply VPC-specific configuration, including VPC subnet IDs and security group IDs. AWS Lambda uses this information to create Elastic Network Interfaces (ENIs) that allow your function to reach those resources.
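As an illustration, the AWS CLI can supply this VPC configuration; the function name, subnet ID, and security group ID below are placeholders:

```shell
aws lambda update-function-configuration --function-name MyFunction \
--vpc-config SubnetIds=subnet-0a1b2c3d,SecurityGroupIds=sg-0a1b2c3d
```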

Streamline Your AWS Data Processing with Hevo

Hevo simplifies data integration with AWS services like S3, Redshift, and more. By automating data flows, Hevo allows you to focus on actionable insights while effortlessly managing complex data pipelines. Streamline your integration process and enhance your data-driven decisions with Hevo’s powerful features.

Why Hevo is the Best:

  • Minimal Learning Curve: Hevo’s simple, interactive UI makes it easy for new users to get started and perform operations.
  • Connectors: With over 150 connectors, Hevo allows you to integrate various data sources into your preferred destination seamlessly.
  • Cost-Effective Pricing: Transparent pricing with no hidden fees, helping you budget effectively while scaling your data integration needs.

Try Hevo today and experience seamless data transformation and integration. 


What is Amazon Kinesis?


AWS Kinesis is a fully managed and highly scalable platform for collecting, processing, and analyzing real-time data, allowing you to derive insights and respond to new information quickly. Kinesis offers a number of capabilities for cost-effectively processing streaming data at the necessary scale, along with the flexibility to choose the tools that best match the needs of your application. For analytics, machine learning, and a variety of other applications, Amazon Kinesis helps ingest real-time data such as video, audio, application logs, IoT telemetry data, and website clickstreams.

Key Components of Amazon Kinesis

1) Amazon Kinesis Firehose 

Firehose allows you to load and transform data streams into AWS destinations, where they can be used for further purposes such as analysis or storage. It is fully managed and scales automatically in response to incoming data, so it requires no ongoing administration. Firehose can ingest, transform, and deliver real-time data to destinations including Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, third-party service providers, and generic HTTP endpoints.

2) Amazon Kinesis Data Streams  

Amazon Kinesis Data Streams is a proven solution for highly scalable, durable, real-time data streaming, continuously capturing, processing, and storing data streams. What makes Kinesis Data Streams compelling is the ability to capture gigabytes of data per second from many sources.

3) Amazon Kinesis Video Streams 

Amazon Kinesis Video Streams is a video streaming service that is similar to AWS Kinesis Data Streams. It enables you to securely stream video from a variety of devices and offers the data for playback, machine learning, analytics, and other processing. It can capture data from almost any video device you can think of, including security cameras, smartphone video, drones, RADARs, LIDARs, satellites, and more. It can help you create apps with real-time computer vision capabilities and video analytics utilizing popular open-source machine learning frameworks by integrating with Amazon Rekognition Video.

4) Amazon Kinesis Data Analytics 

Kinesis Data Analytics uses the open-source Apache Flink framework and engine to process and analyze streaming data in real time. It is designed to make building, operating, and connecting Flink applications with other AWS services easier. It integrates with Kinesis Data Streams (KDS), Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Firehose, Amazon Elasticsearch Service, and other AWS services.

Getting Started with using AWS Kinesis Lambda

Before we proceed with using AWS Kinesis Lambda, do the following:

Step 1: Install the Kinesis CDK package.

npm i @aws-cdk/aws-kinesis

Step 2: Open lib/how-to-trigger-lambda-from-kinesis-stack.ts, add a new Kinesis stream and deploy.

import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';

export class HowToTriggerLambdaFromKinesisStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new kinesis.Stream(this, 'MyKinesisStream', {
      streamName: 'MyKinesisStream',
    });
  }
}

Step 3: Install the Lambda CDK package.

npm i @aws-cdk/aws-lambda

Deploying a Lambda function requires bootstrapping your CDK app, which creates an S3 bucket where the CDK can store Lambda source code. This is a one-time procedure.

npm run cdk bootstrap

Step 4: Create src/index.js containing your Lambda handler code, then update lib/how-to-trigger-lambda-from-kinesis-stack.ts to define the function:

import * as cdk from '@aws-cdk/core';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';

export class HowToTriggerLambdaFromKinesisStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new kinesis.Stream(this, 'MyKinesisStream', {
      streamName: 'MyKinesisStream',
    });

    const lambdaFunction = new lambda.Function(this, 'Function', {
      code: lambda.Code.fromAsset('src'),
      handler: 'index.handler',
      functionName: 'KinesisMessageHandler',
      runtime: lambda.Runtime.NODEJS_12_X,
    });
  }
}
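Note that the stack above defines the stream and the function but does not yet connect them, so the function will not fire when records arrive. A hypothetical sketch of that wiring, placed inside the same constructor, using the @aws-cdk/aws-lambda-event-sources package (an additional dependency you would need to install first):

```typescript
import * as eventSources from '@aws-cdk/aws-lambda-event-sources';

// Hypothetical sketch (assumes: npm i @aws-cdk/aws-lambda-event-sources).
// Connects the stream to the function so new records trigger the Lambda.
lambdaFunction.addEventSource(
  new eventSources.KinesisEventSource(stream, {
    // LATEST processes only records added after deployment;
    // TRIM_HORIZON would replay the stream's retained history.
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 100,
  })
);
```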
Step 5: Deploy again.

npm run cdk deploy

You might need to confirm some IAM changes during deployment.

How to use AWS Kinesis Lambda Functions?

You can use an AWS Lambda function to process records in an Amazon Kinesis data stream.

A Kinesis data stream is a collection of shards, where each shard contains a sequence of data records. A consumer is an application that reads data from a Kinesis data stream and processes it. A Lambda function can act either as a shared-throughput consumer or as a dedicated-throughput consumer with enhanced fan-out.

For standard iterators, Lambda polls each shard in your Kinesis stream for records over HTTP. The event source mapping shares read throughput with the shard's other consumers.

Step 1: Create a Trigger

You can create a Kinesis trigger in the Lambda console to configure your function to read from the stream.

  • Open the Functions page of the Lambda console.
  • Choose the name of a function.
  • Under Function overview, choose Add trigger.
  • Choose a trigger type.
  • Configure the required options, and then choose Add.

For Kinesis event sources, Lambda provides the following configuration options.

  • Kinesis Stream: The Kinesis stream from which to read records.
  • Consumer (optional): Read from the stream through a dedicated-throughput stream consumer.
  • Batch Size: The maximum number of records to send to the function in a single batch, up to 10,000.
  • Batch Window: The maximum amount of time, in seconds, to gather records before invoking the function.
  • Starting Position: Process only new records, all existing records, or records created after a certain date.
    • Latest: Process only records that are added to the stream after the trigger is created.
    • Trim horizon: Process all records in the stream.
    • At timestamp: Begin processing records from a specific time.
  • On-failure Destination: An SQS queue or SNS topic for records that can't be processed. When Lambda discards a batch because it is too old or has exhausted all retries, it sends details about the batch to the queue or topic.
  • Retry Attempts: The maximum number of times Lambda retries when the function returns an error.
  • Maximum Record Age: The maximum age of a record that Lambda sends to your function.
  • Split Batch on Error: When the function returns an error, split the batch into two before retrying.
  • Concurrent Batches per Shard: Concurrently process multiple batches from the same shard.
  • Enabled: Set to true to enable the event source mapping; set to false to stop processing records.

Step 2: Create the Execution Role

Next, create the execution role, which grants your function permission to access AWS resources. To create the execution role:

  • In the IAM console, navigate to the roles page.
  • Select the Create role option.
  • Create a role using the attributes listed below.
    • Trusted Entity: AWS Lambda.
    • Permissions: AWSLambdaKinesisExecutionRole.
    • Role Name: lambda-kinesis-role.

The AWSLambdaKinesisExecutionRole policy grants the function the permissions it needs to read records from Kinesis and publish logs to CloudWatch Logs.
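For reference, the AWSLambdaKinesisExecutionRole managed policy is approximately equivalent to the following document (paraphrased, so verify the exact contents in the IAM console before relying on it):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:SubscribeToShard",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
```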

Step 3: Create a Function

The following code receives a Kinesis event as input and processes the messages it contains. To create the function:

  • Copy the sample code into a file called index.js. 
console.log('Loading function');

exports.handler = function(event, context) {
    //console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
    });
};
  • Create a deployment package for AWS Kinesis Lambda.
zip function.zip index.js
  • Use the create-function command to create the Lambda function:
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \
--role arn:aws:iam::123456789012:role/lambda-kinesis-role

Step 4: Test the Lambda Function

In this section, you manually invoke your Lambda function using the invoke AWS CLI command and a sample Kinesis event.

  • Copy the JSON below into a file and save it as input.txt.
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
  • To send the event to the function, use the invoke command:
aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt
  • The output will be saved to out.txt.
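To sanity-check the result locally, you can decode the sample record's data field the same way the handler does; the expected log line follows from plain base64 decoding:

```javascript
// Decode the base64 `data` field from the sample event, exactly as the
// handler does with Buffer.from(..., 'base64').
const data = 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==';
const payload = Buffer.from(data, 'base64').toString('utf-8');
console.log(payload); // Hello, this is a test.
```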

Step 5: Create a Kinesis stream

  • Use the create-stream command to create a stream:
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
  • Run the describe-stream command to get the stream ARN, which identifies the stream when you connect it to your Lambda function:
aws kinesis describe-stream --stream-name lambda-stream
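One step is still needed before records flow into the function: mapping the stream to the Lambda. Assuming the stream ARN returned by describe-stream above (the account ID matches the placeholder used earlier), a create-event-source-mapping call would look like this:

```shell
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```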

Step 6: Delete the Lambda Function

  1. Navigate to the Lambda console’s Functions page.
  2. Choose the function you created.
  3. Choose Delete from the Actions menu.
  4. Select Delete.

Step 7: Delete the Kinesis Stream

  1. Sign in to the AWS Management Console and navigate to https://console.aws.amazon.com/kinesis.
  2. Choose the stream you made.
  3. Select Actions, then Delete.
  4. In the text box, type delete.
  5. Select Delete.

Best Practices for using AWS Kinesis Lambda

1) Enhanced shard-level Metrics

Enabling shard-level metrics on Kinesis Data Streams is a recommended practice for Kinesis-triggered Lambda functions. Kinesis Data Streams sends additional shard-level metrics to CloudWatch every minute, which can help you detect hot shards and discover which consumers are failing for a given record or shard.

2) IteratorAge

Pay particular attention to the IteratorAge (GetRecords.IteratorAgeMilliseconds) metric. IteratorAge is the difference between the current time and when the last record of the GetRecords call was written to the stream. If this number rises, data from the stream is being processed with increasing delay, and once the iterator age exceeds your retention period, the expired records are permanently lost. Set CloudWatch alarms on the Maximum statistic to be notified before this loss is imminent. CloudWatch alarms can also notify you when function metrics such as ConcurrentExecutions or Invocations exceed your thresholds.
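The arithmetic behind the metric is simple; the function name below is my own, not an AWS API. A record's iterator age is the wall-clock time now minus its approximateArrivalTimestamp, which Kinesis reports in seconds:

```javascript
// IteratorAge ≈ how long a record waited in the stream before the
// consumer read it. approximateArrivalTimestamp is in seconds.
function iteratorAgeMs(approximateArrivalTimestamp, nowMs = Date.now()) {
  return nowMs - approximateArrivalTimestamp * 1000;
}

// A record that arrived 10 seconds ago has an iterator age of 10,000 ms.
console.log(iteratorAgeMs(1545084650, 1545084660000)); // 10000
```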

3) Poison Message

A Lambda function is invoked for a batch of records from a shard and checkpoints each batch's progress: either a batch is processed successfully, or the whole batch is retried until processing succeeds or the records age off the stream per the retention period. A poison message causes such a batch to fail, resulting in either duplicate results, or delayed data processing and eventual data loss.

There are two approaches to dealing with failure:

  • The first is to include logic in the Lambda function code to catch errors and log them for offline analysis before proceeding to the next batch.
  • The second (and recommended) approach is to use AWS Kinesis Lambda as the consumer for AWS Kinesis Data Streams and define the following retry and failure behaviour settings: On-failure destination, Retry attempts, Maximum age of the record, and Split batch on error.
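A minimal sketch of the first approach, where processRecord is a stand-in name for your own business logic: catch per-record errors, log them for offline analysis, and let the batch succeed so the shard keeps moving:

```javascript
// Process a batch, isolating per-record failures instead of failing the
// whole batch. `processRecord` is a placeholder for your business logic.
function processBatch(records, processRecord) {
  const failures = [];
  for (const record of records) {
    try {
      processRecord(record);
    } catch (err) {
      // log and continue; a single poison message no longer blocks the shard
      failures.push({ record, error: err.message });
    }
  }
  return failures; // e.g. write these to CloudWatch or a DLQ for analysis
}
```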

4) ReadProvisionedThroughputExceeded

The ReadProvisionedThroughputExceeded metric shows the number of GetRecords calls throttled over a given time period. Use it to determine whether reads are being throttled because you have exceeded your read throughput limits. If the Average statistic is non-zero, some of your consumers are being throttled. To increase throughput, add shards to the stream or use an enhanced fan-out (EFO) consumer to trigger your Lambda function.

Common Pitfalls and How to Fix Them?

1) No Error Handling

By default, a Kinesis-triggered Lambda function receives up to 100 records per invocation. Everything runs smoothly until a faulty message arrives and the function starts failing.

Lambda will keep retrying the failing batch, up to the maximum of 10,000 retry attempts, and will not process any further records from that shard until the issue is fixed. This is due to Kinesis' in-order processing guarantee.

Rather than retrying for so long, you may want to skip the problematic message and move on to the next one, keeping your systems from being bogged down with old data.

To address this, you can use the following parameters:

  • maximumRetryAttempts: Lower this value to cap how many times Lambda retries a failing batch.
  • destinations.onFailure: While Lambda metrics can alert you to problems, it's typically a good idea to send information about unprocessed records to a dead-letter destination. The DLQ can be an SQS queue or SNS topic that you specify in destinations.onFailure. After you locate and fix the flawed Lambda logic causing the problem, you can re-ingest the lost records.
  • bisectBatchOnFunctionError: When set to true, the batch is divided in two and retried separately each time the AWS Kinesis Lambda execution fails. You can finally isolate the single faulty record, depending on batch size and amount of retries. After getting rid of the faulty record, you can effectively process all others.
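Conceptually, bisectBatchOnFunctionError narrows a failing batch down to the poison record. This toy simulation (my own sketch, not Lambda's actual implementation) shows the idea; note that records in the healthy halves may be processed more than once, which is why idempotent processing matters:

```javascript
// Toy simulation of batch bisection: retry halves of a failing batch
// until the single faulty record is isolated.
function isolateFaultyRecords(batch, processRecord) {
  try {
    batch.forEach(processRecord);
    return []; // whole batch succeeded
  } catch (err) {
    if (batch.length === 1) return batch; // found a poison record
    const mid = Math.floor(batch.length / 2);
    return [
      ...isolateFaultyRecords(batch.slice(0, mid), processRecord),
      ...isolateFaultyRecords(batch.slice(mid), processRecord),
    ];
  }
}

console.log(isolateFaultyRecords([1, 2, 3, 4, 5], (r) => {
  if (r === 3) throw new Error('poison');
})); // [ 3 ]
```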

2) Inadequate Parallelization

Kinesis Data Stream is made up of shards, and you pay as per the number of shards you are using. Each shard may receive up to 1 MB of data per second, or 1,000 records per second. The fact that you have adequate throughput to ingest messages into the shard does not imply that you can read and process them at the same rate.

If your AWS Kinesis Lambda function takes longer to handle data, you risk lagging more frequently.

There are two ways to counter this. 

  • The first is to raise the number of shards on the Kinesis side, incurring additional expenditures.
  • The second is the parallelizationFactor setting, which allows you to handle messages from a single shard in up to ten parallel executions. Despite concurrent reads from the shard, the order of records with the same partition key is preserved. Thus, increased parallelization enables the safe processing of a larger data volume without increasing Kinesis costs.
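A sketch of why ordering survives parallelization: grouping records by partition key lets each key's records be handled sequentially while distinct keys proceed concurrently. This is a simplified illustration of the principle, not Lambda's internal mechanism, and the function and field names are my own:

```javascript
// Group records by partition key, then process each key's records in
// order while different keys run concurrently.
async function processShardBatch(records, processRecord) {
  const byKey = new Map();
  for (const record of records) {
    const key = record.partitionKey;
    if (!byKey.has(key)) byKey.set(key, []);
    byKey.get(key).push(record);
  }
  await Promise.all(
    [...byKey.values()].map(async (group) => {
      for (const record of group) {
        await processRecord(record); // in-order within a partition key
      }
    })
  );
}
```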

3) Incorrect Starting Position

This is a common issue when generating a new AWS Kinesis Lambda trigger. However, this might happen during disaster recovery, so it is best to plan ahead of time.

By default, when you deploy a new Lambda function with Kinesis as a trigger, it begins by reading all existing records from the stream. Depending on the stream's retention period, this could mean all messages from up to the previous 365 days.

As a result, if you wish to handle only new messages that arrive after you deploy your function, you must explicitly set the startingPosition. The default value, TRIM_HORIZON, starts reading from the oldest available record. To begin with the most recent record at the time of deployment, change it to LATEST.

If you wish to handle records from a certain moment in time, another option is to specify a timestamp.

Conclusion

In this article, you learned how to perform basic data-stream operations using AWS Kinesis and Lambda. The article also covered best practices and examples you can follow while leveraging the data streaming and analytics capabilities of Kinesis. In addition, it highlighted common pitfalls in using the two services together, so you can get the most out of both.

There are various data sources that organizations leverage to capture a variety of valuable data points. But transferring data from these sources into a Data Warehouse for holistic analysis is a hectic task. It requires you to code and maintain complex functions to achieve a smooth flow of data. An automated data pipeline helps solve this issue, and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline with 150+ pre-built integrations that you can choose from. Sign up for Hevo's 14-day free trial and experience seamless data migration.

FAQs

1. Can Lambda write to Kinesis?

Yes, AWS Lambda can write to Kinesis by using the PutRecord or PutRecords APIs to send data to a Kinesis stream.

2. When to use Kafka vs. Kinesis?

Use Kafka for high-throughput, distributed streaming with complex event processing. Use Kinesis for simpler AWS-native integration, scalability, and ease of use in real-time streaming applications within the AWS ecosystem.

3. What is the use of AWS Kinesis?

AWS Kinesis is used for real-time data streaming, enabling you to collect, process, and analyze large streams of data.

Preetipadma Khandavilli
Technical Content Writer, Hevo Data

Preetipadma is a dedicated technical content writer specializing in the data industry. With a keen eye for detail and strong problem-solving skills, she expertly crafts informative and engaging content on data science. Her ability to simplify complex concepts and her passion for technology makes her an invaluable resource for readers seeking to deepen their understanding of data integration, analysis, and emerging trends in the field.