DynamoDB Streams: How To Sync Data Conveniently

September 23rd, 2021


When data must be continuously available for use and processing, periodically capturing and moving it is not enough: you need real-time data sync between systems. You can build such a data streaming pipeline with DynamoDB Streams.

This article is all about how you can sync the data in real-time using DynamoDB Streams. You will understand the entire infrastructure that is required to set up the sync between source and destination. You will go through the process step-by-step with examples.


Let’s take a look at this in detail.

What are DynamoDB Streams?


Amazon DynamoDB is a non-relational (NoSQL) database service. It grew out of the principles of Dynamo, one of the progenitors of NoSQL, and brings the power of the cloud to the NoSQL database world.

DynamoDB Streams is a DynamoDB feature that emits an event whenever an item in a table is modified. Each event is one of three types (insert, modify, or remove) and can carry the contents of the item being modified.

You can customize what the stream records contain. For example, you can include the item as it looked both before and after the modification, so the change to the data is identifiable. For each modified item, the events appear in the same order as the actual modifications.

Benefits Of DynamoDB Streams

Let’s look at some benefits of DynamoDB Streams:

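  • Well suited to distributed, event-driven architectures: other systems react to changes instead of polling the table.
  • Faster and more efficient processing: only the items that changed are processed, rather than the whole table on a schedule.
  • Better availability and scalability: the stream is fully managed, and AWS creates and removes its shards automatically.
  • Enhanced performance: consumers read changes from the stream instead of querying the table, so they do not compete with the application for the table's read capacity.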

Use Cases of DynamoDB Streams

DynamoDB Streams have a wide range of applications. Here are some examples of how DynamoDB streams may be used.

  • Invalidate a cache immediately.
  • Update the materialized view to reflect the write-side modifications.
  • Update a search index in real-time.
  • Update a dashboard in real-time.
  • Perform on-the-fly aggregations and store the results in a different table.
  • Implement the Transactional Outbox pattern in microservices.

Example Use Case: Aggregating data into another table

Let's imagine you have a lot of data and want to keep a count of the tasks in each list in a todoTaskCount table for caching purposes. When a new task is added to a list, this table must be updated so that the application user sees the updated task count. The following is the table schema:

ListId (partition key) | TaskCount (attribute 1)
1                      | 2

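Every time there is a change in the todoListTask table, a stream event is generated and handled by a Lambda function that updates the aggregate table. Data therefore flows from the todoListTask table, through its DynamoDB stream, to the Lambda function, which writes to the todoTaskCount table.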

To begin, create a new table called todoTaskCount. Then enable DynamoDB Streams on the todoListTask table with the "new image" view type only (NEW_IMAGE), because you don't need to see the old values. Finally, create the Lambda function and assign the appropriate IAM role to it.
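The aggregation logic can be sketched as follows. This is a minimal sketch, not the original implementation: it assumes (hypothetically) that each todoListTask item has a numeric ListId key attribute and a TaskId sort key, counts INSERT events as +1 and REMOVE events as -1, and builds one DynamoDB UpdateItem request per list. The names countDeltas and buildUpdateRequests are hypothetical. Note that REMOVE records still carry the item's Keys even with the NEW_IMAGE view type, which is all this logic needs.

```javascript
// Sketch: turn a batch of todoListTask stream records into per-list count changes.
// Assumption (hypothetical): every todoListTask item has a numeric key attribute "ListId".
function countDeltas(records) {
  const deltas = new Map(); // ListId -> net change in task count
  for (const record of records) {
    const listId = record.dynamodb.Keys.ListId.N; // DynamoDB JSON: numbers arrive as strings
    let change = 0;
    if (record.eventName === "INSERT") change = 1;
    else if (record.eventName === "REMOVE") change = -1;
    // MODIFY events do not change how many tasks a list has.
    if (change !== 0) deltas.set(listId, (deltas.get(listId) || 0) + change);
  }
  return deltas;
}

// Build one UpdateItem request per list. "ADD" increments the number atomically
// (treating a missing TaskCount as 0 and creating the item if needed).
function buildUpdateRequests(deltas) {
  const requests = [];
  for (const [listId, delta] of deltas) {
    if (delta === 0) continue; // nothing to write for this list
    requests.push({
      TableName: "todoTaskCount",
      Key: { ListId: { N: listId } },
      UpdateExpression: "ADD TaskCount :delta",
      ExpressionAttributeValues: { ":delta": { N: String(delta) } },
    });
  }
  return requests;
}

// Example batch: two tasks added to list 1, one task removed from list 2.
const sampleRecords = [
  { eventName: "INSERT", dynamodb: { Keys: { ListId: { N: "1" }, TaskId: { S: "a" } } } },
  { eventName: "INSERT", dynamodb: { Keys: { ListId: { N: "1" }, TaskId: { S: "b" } } } },
  { eventName: "REMOVE", dynamodb: { Keys: { ListId: { N: "2" }, TaskId: { S: "c" } } } },
];
const requests = buildUpdateRequests(countDeltas(sampleRecords));
console.log(JSON.stringify(requests, null, 2));
```

In a deployed function, each request would be sent with the AWS SDK (for example, UpdateItemCommand from @aws-sdk/client-dynamodb). Summing the deltas per list first means a batch produces one write per list rather than one per task.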

What is the Lambda function?


AWS Lambda is an Amazon Web Services compute service that allows you to run applications without having to manage any servers. It’s a serverless, event-driven computing platform that executes code in response to events. The languages Python, Node.js, Java, Go, Ruby, and C# are officially supported by AWS Lambda.

You can use Lambda functions for everything from serving web pages to processing data streams.

What is CloudWatch?


For DevOps engineers, developers, site reliability engineers (SREs), IT managers, and product owners, Amazon CloudWatch provides a monitoring and observability solution. To monitor your apps, respond to system-wide performance changes, and optimize resource use, CloudWatch offers you data and actionable insights.

CloudWatch Logs, Metrics, and Events are used to collect monitoring and operational data. You obtain total visibility of your AWS resources, apps, and services running on AWS and on-premises, as well as a single picture of operational health. To keep your apps running smoothly, you can use CloudWatch to detect aberrant behavior in your environments, trigger alarms, analyze logs and metrics side by side, take automated actions, troubleshoot issues, and find insights.


Replicate Dynamo DB Data in Minutes using Hevo’s Data Pipelines

Hevo can be your go-to tool if you're looking for data replication from 100+ data sources (including 40+ free data sources) into Amazon Redshift, Snowflake, and many other databases and warehouse systems. Hevo supports real-time streaming. To further streamline and prepare your data for analysis, you can process and enrich raw, granular data using Hevo's robust, built-in transformation layer without writing a single line of code!

With Hevo in place, you can cut the time and effort you spend on data extraction, cleaning, preparation, and enrichment many times over! In addition, Hevo's native integration with BI and analytics tools such as Tableau lets you mine your replicated data and apply predictive analytics to get actionable insights. As one of the DynamoDB ETL tools, Hevo makes data replication easier.


Prerequisites

Before you learn how to sync data in real time with DynamoDB Streams, it helps to be familiar with the following:

  1. DynamoDB as a source system.
  2. AWS Lambda functions.
  3. A destination system such as CloudWatch, Elasticsearch, RDS, MySQL, or DynamoDB itself. This article uses CloudWatch as the destination system where the data will be synced.
  4. Node.js.
  5. The AWS CLI, which you will use to run commands.

Set-Up For Real-Time Data Sync

First, here is a quick look at how the data flows between the systems involved.

Whenever an item in the DynamoDB table is created, updated, or deleted, the change is recorded as an event in the table's DynamoDB stream. A Lambda function then reads these events, connects to the destination system, and sends the data there.


Here is the set-up at a glance, without going into technical details:

  • Enable the DynamoDB stream in the DynamoDB console.
  • Read the change events occurring on the table in real time. Every time an item is inserted, updated, or deleted, you get an event.
  • Hook up a Lambda function to the DynamoDB stream, so that the function is invoked whenever events occur.
  • The Lambda function receives the contents of the changes in its event argument.
  • Replicate the content in the destination datastore (CloudWatch, DynamoDB, SNS, Kinesis, etc., depending on the use case).
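The flow above can be sketched as a small, destination-agnostic function. In this minimal sketch, an in-memory Map stands in for the destination datastore (a hypothetical stand-in, not a real AWS service): INSERT and MODIFY records upsert the item's new image, and REMOVE records delete the item. It assumes the stream's view type includes new images; the names keyOf and applyRecords are hypothetical.

```javascript
// Sketch: apply DynamoDB stream records to a destination, modeled here as an in-memory Map.
// A real destination (another table, a search index, etc.) would replace the Map calls.

// Build a stable string key from the record's primary key attributes.
function keyOf(keys) {
  const names = Object.keys(keys).sort(); // sort so attribute order does not matter
  return JSON.stringify(names.map((name) => [name, keys[name]]));
}

function applyRecords(records, replica) {
  for (const record of records) {
    const key = keyOf(record.dynamodb.Keys);
    switch (record.eventName) {
      case "INSERT":
      case "MODIFY":
        // Requires a stream view type that includes the new image.
        replica.set(key, record.dynamodb.NewImage);
        break;
      case "REMOVE":
        replica.delete(key);
        break;
    }
  }
  return replica;
}

const replica = new Map();
applyRecords(
  [
    { eventName: "INSERT", dynamodb: { Keys: { id: { N: "1" } }, NewImage: { id: { N: "1" }, msg: { S: "hello" } } } },
    { eventName: "MODIFY", dynamodb: { Keys: { id: { N: "1" } }, NewImage: { id: { N: "1" }, msg: { S: "hi" } } } },
    { eventName: "INSERT", dynamodb: { Keys: { id: { N: "2" } }, NewImage: { id: { N: "2" }, msg: { S: "bye" } } } },
    { eventName: "REMOVE", dynamodb: { Keys: { id: { N: "2" } } } },
  ],
  replica
);
console.log(replica.size); // 1
```

Applying records in the order they arrive matters: because the stream keeps each item's changes in sequence, replaying them in order leaves the replica with the item's latest state.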

Now let's dig into each of these steps, with code you can follow in detail.

Step 1: Create IAM Role

The Lambda function needs an IAM role to run under. In the AWS Management Console, create an IAM role for the AWS Lambda service with the following details:

Trusted entity: AWS Lambda

Role name: lambda-dynamodb-role

Permissions policy: AWSLambdaDynamoDBExecutionRole
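If you prefer to script this step, the role needs a trust policy that allows the Lambda service to assume it. The sketch below only builds that standard trust policy document and writes it to a file; the file name trust-policy.json is an assumption. You could then pass it to aws iam create-role with --assume-role-policy-document file://trust-policy.json, and attach the AWSLambdaDynamoDBExecutionRole managed policy with aws iam attach-role-policy.

```javascript
// Sketch: write the trust policy for the lambda-dynamodb-role execution role.
const fs = require("fs");

const trustPolicy = {
  Version: "2012-10-17",
  Statement: [
    {
      Effect: "Allow",
      Principal: { Service: "lambda.amazonaws.com" }, // only the Lambda service may assume the role
      Action: "sts:AssumeRole",
    },
  ],
};

fs.writeFileSync("trust-policy.json", JSON.stringify(trustPolicy, null, 2));
console.log("Wrote trust-policy.json");
```

The trust policy only controls who can assume the role. What the function may do comes from the attached AWSLambdaDynamoDBExecutionRole policy, which allows reading the stream (dynamodb:DescribeStream, GetRecords, GetShardIterator, ListStreams) and writing logs to CloudWatch Logs.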


Step 2: Create AWS Lambda Function

Now create the AWS Lambda function. This example uses Node.js, one of the languages AWS Lambda supports, but you can use any other supported language.

Create index.js with the following code, then compress it into a ZIP archive with any compression tool you like.

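console.log('Loading function');

// Entry point: Lambda invokes this with a batch of DynamoDB stream records.
exports.handler = function (event, context, callback) {
  // Log the whole event; console output is sent to CloudWatch Logs.
  console.log(JSON.stringify(event, null, 2));

  event.Records.forEach(function (record) {
    console.log(record.eventID);   // unique ID of this stream record
    console.log(record.eventName); // INSERT, MODIFY, or REMOVE
    console.log('DynamoDB Record: %j', record.dynamodb); // keys and item images
  });

  // Report success to Lambda; "message" becomes the function's response.
  callback(null, "message");
};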

This example uses the jar command that ships with the Java Development Kit (JDK). Run the following command in your terminal:

jar -cfM function.zip index.js

The command names the archive to create (function.zip) and the file to put in it (index.js); the M flag tells jar not to add a manifest file. Any other ZIP tool, such as zip, works just as well.


What makes Hevo’s Data Replication Experience Unique?

Replicating data can be a tiresome task without the right set of tools. Hevo’s automated platform empowers you with everything you need to have a smooth Data Collection, Processing, and Replication experience. Hevo supports Real-time streaming capabilities for DynamoDB as well. Our platform has the following in store for you!

  • Exceptional Security: A Fault-tolerant Architecture that ensures Zero Data Loss.
  • Built to Scale: Exceptional Horizontal Scalability with Minimal Latency for Modern-data Needs.
  • Built-in Connectors: Support for 100+ Data Sources, including Databases, SaaS Platforms, Files & More. Native Webhooks & REST API Connector available for Custom Sources.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Auto Schema Mapping: Hevo takes away the tedious task of schema management & automatically detects the format of incoming data and replicates it to the destination schema. You can also choose between Full & Incremental Mappings to suit your Data Replication requirements.
  • Blazing-fast Setup: Straightforward interface for new customers to work on, with minimal setup time.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Step 3: Run Lambda Create-Function

Now run the AWS CLI lambda create-function command, passing the ARN of the IAM role:

aws lambda create-function --function-name ProcessDynamoDBRecords --zip-file fileb://function.zip --handler index.handler --runtime nodejs20.x --role arn:aws:iam::392821317968:role/lambda-dynamodb-role

Replace the role ARN with the ARN of the role you created in Step 1; you can copy it from the role's summary page in the IAM console.


Once the command completes, the Lambda function is created. Test it by invoking the function manually:

aws lambda invoke --function-name ProcessDynamoDBRecords --payload file://input.txt outputfile.txt

This command passes the sample DynamoDB stream event stored in input.txt to the function and writes the function's response to outputfile.txt. If you use version 2 of the AWS CLI, also add --cli-binary-format raw-in-base64-out; otherwise the CLI expects the payload to be base64-encoded.
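The contents of input.txt are not shown here, so a minimal sketch for generating one follows. All values (event IDs, sequence numbers, item contents) are made-up sample data in the shape of a DynamoDB Streams event; the records are wrapped in a Records array, as Lambda delivers them, and reuse the table and stream ARN from this walkthrough.

```javascript
// Sketch: write a sample DynamoDB Streams event to input.txt for `aws lambda invoke`.
// Every value below is made-up sample data; only the overall shape matters.
const fs = require("fs");

const streamArn =
  "arn:aws:dynamodb:ap-south-1:392821317968:table/lambda-dynamodb-stream/stream/2019-06-29T12:39:47.680";

const sampleEvent = {
  // Lambda delivers stream records wrapped in a "Records" array.
  Records: [
    {
      eventID: "1",
      eventName: "INSERT",
      eventVersion: "1.1",
      eventSource: "aws:dynamodb",
      awsRegion: "ap-south-1",
      dynamodb: {
        Keys: { id: { N: "101" } },
        NewImage: { id: { N: "101" }, message: { S: "New item!" } },
        SequenceNumber: "111",
        SizeBytes: 26,
        StreamViewType: "NEW_AND_OLD_IMAGES",
      },
      eventSourceARN: streamArn,
    },
    {
      eventID: "2",
      eventName: "MODIFY",
      eventVersion: "1.1",
      eventSource: "aws:dynamodb",
      awsRegion: "ap-south-1",
      dynamodb: {
        Keys: { id: { N: "101" } },
        OldImage: { id: { N: "101" }, message: { S: "New item!" } },
        NewImage: { id: { N: "101" }, message: { S: "This item has changed" } },
        SequenceNumber: "222",
        SizeBytes: 59,
        StreamViewType: "NEW_AND_OLD_IMAGES",
      },
      eventSourceARN: streamArn,
    },
  ],
};

fs.writeFileSync("input.txt", JSON.stringify(sampleEvent, null, 2));
console.log(`Wrote ${sampleEvent.Records.length} sample records to input.txt`);
```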

This step only verifies that the function works as expected; it is not part of the real-time data sync pipeline itself.

Step 4: Create DynamoDB Table

Next, create a DynamoDB table with the following details:

Table name: lambda-dynamodb-stream

Primary key: id (number)
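The same table can also be described as input for aws dynamodb create-table --cli-input-json file://table.json. In this sketch, the on-demand billing mode (PAY_PER_REQUEST) and the file name table.json are assumptions, not settings from the original walkthrough.

```javascript
// Sketch: CreateTable input for the lambda-dynamodb-stream table (primary key: id, a number).
const fs = require("fs");

const createTableInput = {
  TableName: "lambda-dynamodb-stream",
  AttributeDefinitions: [{ AttributeName: "id", AttributeType: "N" }], // N = number
  KeySchema: [{ AttributeName: "id", KeyType: "HASH" }], // id is the partition key
  BillingMode: "PAY_PER_REQUEST", // assumption: on-demand capacity
};

fs.writeFileSync("table.json", JSON.stringify(createTableInput, null, 2));
console.log("Wrote table.json");
```

The same object can be passed to CreateTableCommand from @aws-sdk/client-dynamodb if you would rather create the table from code.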


Step 5: Enable DynamoDB Streams

Now enable DynamoDB Streams on the table: click the Manage Stream button in the DynamoDB console and choose what each stream record should contain (the view type).

Once the stream is enabled, copy the Latest Stream ARN from the table's stream details. You will need it in the next step.
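Enabling the stream can also be scripted. This sketch builds input for aws dynamodb update-table --cli-input-json file://enable-stream.json; the NEW_AND_OLD_IMAGES view type, the file name, and the helper name enableStreamInput are assumptions (pick the view type that fits your use case). The command's response includes TableDescription.LatestStreamArn, the ARN needed in the next step.

```javascript
// Sketch: UpdateTable input that turns on a stream for lambda-dynamodb-stream.
const fs = require("fs");

// The four stream view types DynamoDB accepts.
const VIEW_TYPES = ["KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"];

function enableStreamInput(tableName, viewType) {
  if (!VIEW_TYPES.includes(viewType)) {
    throw new Error(`Unknown stream view type: ${viewType}`);
  }
  return {
    TableName: tableName,
    StreamSpecification: { StreamEnabled: true, StreamViewType: viewType },
  };
}

// Assumption: NEW_AND_OLD_IMAGES, so records show the item before and after each change.
const input = enableStreamInput("lambda-dynamodb-stream", "NEW_AND_OLD_IMAGES");
fs.writeFileSync("enable-stream.json", JSON.stringify(input, null, 2));
```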

Step 6: Event Mapping Of Lambda Function

Use the Latest Stream ARN copied in the previous step to create an event source mapping between the stream and the Lambda function:

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords --batch-size 100 --starting-position LATEST --event-source arn:aws:dynamodb:ap-south-1:392821317968:table/lambda-dynamodb-stream/stream/2019-06-29T12:39:47.680

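With this mapping in place, Lambda polls the stream and invokes the function with batches of up to 100 records (--batch-size 100) whenever changes occur, and the function can then sync the data to the destination system. --starting-position LATEST means that only changes made after the mapping is created are processed.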
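Because a stream ARN embeds the region, account, table name, and a stream label, a small helper can confirm that the ARN you copied belongs to the intended table before you create the mapping; the same helper can read record.eventSourceARN inside a handler. The function name parseStreamArn is hypothetical.

```javascript
// Sketch: split a DynamoDB stream ARN into its parts.
// Format: arn:<partition>:dynamodb:<region>:<account>:table/<table>/stream/<label>
// The label is a timestamp that itself contains colons, so a regex is used instead of split(":").
function parseStreamArn(arn) {
  const match = /^arn:([^:]+):dynamodb:([^:]+):([^:]+):table\/([^/]+)\/stream\/(.+)$/.exec(arn);
  if (!match) throw new Error(`Not a DynamoDB stream ARN: ${arn}`);
  const [, partition, region, accountId, tableName, streamLabel] = match;
  return { partition, region, accountId, tableName, streamLabel };
}

const parts = parseStreamArn(
  "arn:aws:dynamodb:ap-south-1:392821317968:table/lambda-dynamodb-stream/stream/2019-06-29T12:39:47.680"
);
console.log(parts.tableName); // lambda-dynamodb-stream
```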

Verify the event source mappings with the following commands; the second one lists only the mappings for this function:

aws lambda list-event-source-mappings
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

Step 7: Set CRUD Operations

Everything is now set up. Try some create, update, and delete operations on the DynamoDB table.

Then check whether the resulting log entries are captured in Amazon CloudWatch Logs, in the function's log group (/aws/lambda/ProcessDynamoDBRecords).
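Lambda sends whatever the function writes to standard output (for example, via console.log) to CloudWatch Logs. Writing one JSON object per line makes those logs easier to search; CloudWatch Logs Insights, for example, discovers fields in JSON log events automatically. A minimal sketch of such a log helper follows; the function name toLogLine and the chosen fields are assumptions.

```javascript
// Sketch: summarize one stream record as a single-line JSON log entry.
function toLogLine(record) {
  const created = record.dynamodb.ApproximateCreationDateTime; // seconds since the Unix epoch
  return JSON.stringify({
    eventID: record.eventID,
    eventName: record.eventName, // INSERT, MODIFY, or REMOVE
    keys: record.dynamodb.Keys,
    approximateCreationTime: created ? new Date(created * 1000).toISOString() : null,
  });
}

const line = toLogLine({
  eventID: "1",
  eventName: "INSERT",
  dynamodb: { ApproximateCreationDateTime: 1576516897, Keys: { id: { N: "101" } } },
});
console.log(line);
```

Inside the handler, calling console.log(toLogLine(record)) for each record would replace the three separate console.log calls with one searchable line per change.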


Step 8: Sync Data In Real-Time

Items inserted into the DynamoDB table now appear in the function's CloudWatch logs moments after each write, which confirms that the data is being synced in real time.

Understanding Anatomy of DynamoDB Streams

Shards

A stream is made up of shards. Each shard contains stream records, and each record corresponds to a single data modification in the stream's table.

AWS creates and deletes shards automatically. A shard can also split into multiple new shards, without any human intervention.
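The Lambda integration manages shards for you. If you read the stream yourself through the low-level DynamoDB Streams API instead, process a parent shard before its child shards so that changes to the same item stay in order. A minimal sketch of that ordering follows, given shard descriptions shaped like the ShardId/ParentShardId pairs returned by DescribeStream; the shard IDs and the function name orderShards are made up for illustration.

```javascript
// Sketch: order shards so that every parent comes before its children.
// Input items look like { ShardId, ParentShardId } (ParentShardId may be missing).
function orderShards(shards) {
  const byId = new Map(shards.map((s) => [s.ShardId, s]));
  const visited = new Set();
  const ordered = [];

  function visit(shard) {
    if (visited.has(shard.ShardId)) return;
    visited.add(shard.ShardId);
    // Visit the parent first, if it is still part of the stream description.
    const parent = byId.get(shard.ParentShardId);
    if (parent) visit(parent);
    ordered.push(shard.ShardId);
  }

  shards.forEach(visit);
  return ordered;
}

const order = orderShards([
  { ShardId: "shard-child-a", ParentShardId: "shard-parent" },
  { ShardId: "shard-parent" },
  { ShardId: "shard-child-b", ParentShardId: "shard-parent" },
]);
console.log(order.join(" -> ")); // shard-parent -> shard-child-a -> shard-child-b
```

This is a depth-first topological sort: a shard is emitted only after its parent, and a parent whose records have already expired from the stream (and so is missing from the description) is simply skipped.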

Contents of a Stream Record

When you enable a stream, you choose what data each stream record contains (the stream view type). The options are:

  • OLD_IMAGE: the entire item as it appeared before it was modified.
  • NEW_IMAGE: the entire item as it appears after it was modified.
  • NEW_AND_OLD_IMAGES: both the new and the old images of the item.
  • KEYS_ONLY: only the key attributes of the modified item.
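The view type interacts with the event type: an inserted item has no "before" image, and a deleted item has no "after" image. This sketch encodes those rules (the function name imagesIncluded is hypothetical). For example, it shows that with NEW_IMAGE, a REMOVE record carries only the keys of the deleted item.

```javascript
// Sketch: which item images a stream record carries, by view type and event type.
function imagesIncluded(viewType, eventName) {
  const wantsNew = viewType === "NEW_IMAGE" || viewType === "NEW_AND_OLD_IMAGES";
  const wantsOld = viewType === "OLD_IMAGE" || viewType === "NEW_AND_OLD_IMAGES";
  return {
    keys: true, // Keys are present for every view type
    newImage: wantsNew && eventName !== "REMOVE", // a deleted item has no "after" state
    oldImage: wantsOld && eventName !== "INSERT", // a new item has no "before" state
  };
}

console.log(JSON.stringify(imagesIncluded("NEW_IMAGE", "REMOVE"))); // {"keys":true,"newImage":false,"oldImage":false}
```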

Understanding DynamoDB Lambda Trigger

Because both are event-driven, DynamoDB Streams pairs naturally with AWS Lambda: the function scales with the amount of data pushed through the stream, and it runs only when there is data to process.

In the Serverless Framework, you can subscribe a Lambda function to DynamoDB streams by referencing the stream ARN directly, through CloudFormation's Fn::GetAtt, or through an exported value with Fn::ImportValue:

functions:
  compute:
    handler: handler.compute
    events:
      - stream: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
      - stream:
          type: dynamodb
          arn:
            Fn::GetAtt: [MyDynamoDbTable, StreamArn]
      - stream:
          type: dynamodb
          arn:
            Fn::ImportValue: MyExportedDynamoDbStreamArnId


The following is an example of the records your Lambda function receives (in an actual invocation, this array arrives as event.Records):

[
  {
    "eventID": "fe981bbed304aaa4e666c0ecdc2f6666",
    "eventName": "MODIFY",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
      "ApproximateCreationDateTime": 1576516897,
      "Keys": {
        "sk": {
          "S": "sk"
        },
        "pk": {
          "S": "pk"
        }
      },
      "NewImage": {
        "sk": {
          "S": "sk"
        },
        "pk": {
          "S": "pk"
        },
        "List": {
          "L": [
            {
              "S": "FirstString"
            },
            {
              "S": "SecondString"
            }
          ]
        },
        "Map": {
          "M": {
            "Name": {
              "S": "Rafal"
            },
            "Age": {
              "N": "30"
            }
          }
        },
        "IntegerNumber": {
          "N": "223344"
        },
        "String": {
          "S": "Lorem Ipsum"
        },
        "StringSet": {
          "SS": ["Test1", "Test2"]
        }
      },
      "SequenceNumber": "125319600000000013310543218",
      "SizeBytes": 200,
      "StreamViewType": "NEW_IMAGE"
    },
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:1234567890:table/my-test-table/stream/2021-12-02T00:00:00.000"
  }
]

There are a few key elements to remember about this event:

  • INSERT, MODIFY, or REMOVE are the three possible values for the eventName property.
  • The primary key of the modified item is stored in the dynamodb.Keys property.
  • The new values of the modified item are stored in the NewImage attribute. Note that this data is in DynamoDB JSON format, where each value is wrapped in a type descriptor such as S, N, L, or M, rather than in regular JSON. To convert it to plain JSON, use a tool like Jeremy Daly's dynamodb-streams-processor.
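To show what that conversion involves, here is a minimal hand-written sketch (not the API of any particular library) that unmarshalls a NewImage like the one in the example event. For production code, the unmarshall function in the AWS SDK's @aws-sdk/util-dynamodb package is the safer choice; this sketch mirrors its habit of returning sets as Set objects, but skips binary types and large-number handling. The names fromAttributeValue and unmarshallImage are hypothetical.

```javascript
// Sketch: convert DynamoDB JSON (typed attribute values) into plain JavaScript values.
// Covers S, N, BOOL, NULL, L, M, SS, NS; binary types (B, BS) are left out for brevity.
function fromAttributeValue(value) {
  const [type, inner] = Object.entries(value)[0];
  switch (type) {
    case "S": return inner;
    case "N": return Number(inner); // caution: very large numbers lose precision
    case "BOOL": return inner;
    case "NULL": return null;
    case "L": return inner.map(fromAttributeValue);
    case "M": return unmarshallImage(inner);
    case "SS": return new Set(inner);
    case "NS": return new Set(inner.map(Number));
    default: throw new Error(`Unsupported attribute type: ${type}`);
  }
}

function unmarshallImage(image) {
  const result = {};
  for (const [name, value] of Object.entries(image)) {
    result[name] = fromAttributeValue(value);
  }
  return result;
}

// Part of the NewImage from the example event above.
const item = unmarshallImage({
  pk: { S: "pk" },
  List: { L: [{ S: "FirstString" }, { S: "SecondString" }] },
  Map: { M: { Name: { S: "Rafal" }, Age: { N: "30" } } },
  IntegerNumber: { N: "223344" },
  StringSet: { SS: ["Test1", "Test2"] },
});
console.log(item.Map.Age + 1); // 31
```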

How are DynamoDB Streams better than Kinesis Streams?

Amazon Kinesis is a family of general-purpose streaming services: Kinesis Data Analytics, for example, runs real-time analytics with Apache Flink, and Kinesis Video Streams is built around video processing and machine learning. DynamoDB Streams, in contrast, does one specific job: it records a log of the changes made to a DynamoDB table, which can then be used to trigger other services.

Best Practices for DynamoDB Streams

  • Be aware of the solution's eventual consistency. Events from DynamoDB Streams are delivered in near-real-time, not real-time: there is a short delay between a change and the delivery of its event.
  • Be careful of the limits: records stay in the stream for only 24 hours, and AWS recommends that no more than two processes read from the same stream shard at the same time, to avoid throttling.
  • Use one Lambda function per DynamoDB stream to keep responsibilities clearly separated. This also helps you keep IAM permissions to a minimum.
  • Deal with failures. Wrap the processing logic in a try/catch block, save failed events in a DLQ (dead-letter queue), and retry them later.
  • Across regions, keep the settings and indexes of global tables consistent (for example, TTL, local and global secondary indexes, and provisioned throughput).
  • Because TTL deletions do not happen in real time (expired items can remain for a while before DynamoDB deletes them), store an expiry attribute such as expires on your items and use it to filter out expired items yourself.
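For the failure-handling practice, Lambda's partial batch response is one option. When the event source mapping is created or updated with --function-response-types ReportBatchItemFailures, the handler can return the sequence number of the first record that failed, and Lambda retries from that record instead of re-running the whole batch. In this sketch, makeHandler and processRecord are hypothetical names; records that keep failing can be routed to an on-failure destination (such as an SQS queue acting as the DLQ) configured on the mapping.

```javascript
// Sketch: process stream records in order and report the first failure to Lambda,
// so only that record and the ones after it are retried.
// Requires ReportBatchItemFailures on the event source mapping.
function makeHandler(processRecord) {
  return async function handler(event) {
    for (const record of event.Records) {
      try {
        await processRecord(record); // hypothetical sync step, e.g. a write to the destination
      } catch (err) {
        console.error("Failed to process record", record.eventID, err.message);
        // Stop here to preserve ordering; for streams, the identifier is the SequenceNumber.
        return { batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }] };
      }
    }
    return { batchItemFailures: [] }; // an empty list means the whole batch succeeded
  };
}
```

In index.js you would export the result, for example exports.handler = makeHandler(yourSyncFunction). Stopping at the first failure keeps later changes to the same item from overtaking an earlier change that has not been applied yet.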

FAQs for DynamoDB Streams

What are the prices of DynamoDB streams?

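DynamoDB Streams is billed in streams read request units: each GetRecords API call counts as one unit. GetRecords calls made by AWS Lambda as part of DynamoDB triggers are free of charge. For estimates, see the DynamoDB pricing page or the AWS Pricing Calculator.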

What is the best way to see DynamoDB Stream’s metrics?

Metrics for DynamoDB Stream can be found in two places:

  • In the Metrics tab of the AWS DynamoDB Console
  • In Amazon CloudWatch

What are the assurances for DynamoDB Stream delivery?

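Each modification to an item produces exactly one stream record, and each record appears in the stream exactly once. Delivery to consumers such as Lambda is at-least-once, however: if a batch is retried after an error, the same records can be processed again, so your processing logic should be idempotent.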
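Since a retried batch can hand the same record to your function again, one common safeguard is to remember which records have already been applied, keyed by a unique identifier such as eventID. This is a minimal sketch with hypothetical names (processOnce, apply). The in-memory Set only survives for the life of one Lambda execution environment, so a real implementation needs a durable store, for example a conditional write to a DynamoDB table.

```javascript
// Sketch: skip stream records that were already applied, so redelivered batches are harmless.
// The in-memory Set is a stand-in for a durable "processed records" store.
const processedIds = new Set();

function processOnce(record, apply) {
  if (processedIds.has(record.eventID)) return false; // duplicate delivery, ignore
  apply(record);
  processedIds.add(record.eventID); // mark as done only after apply succeeds
  return true;
}

let applied = 0;
const rec = { eventID: "fe981bbed304aaa4e666c0ecdc2f6666", eventName: "MODIFY" };
processOnce(rec, () => applied++);
processOnce(rec, () => applied++); // the same record delivered again
console.log(applied); // 1
```

When the write to the destination is naturally idempotent (for example, overwriting an item with its latest NewImage), this bookkeeping is unnecessary; it matters for non-idempotent effects such as incrementing counters or sending notifications.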

Is it possible to filter DynamoDB Stream events that arrive at the Lambda function?

Yes. You can filter the events that reach your Lambda function by adding filter criteria to its event source mapping. Filter patterns use the same syntax as Amazon EventBridge event patterns.
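A minimal sketch of filter criteria that deliver only newly inserted items follows. Each pattern is passed as a JSON string inside Filters[].Pattern, for example via the --filter-criteria option of aws lambda create-event-source-mapping. The small matches function is a hypothetical local helper that implements only exact-value matching, a small subset of EventBridge pattern syntax, so you can preview which records a pattern would let through.

```javascript
// Sketch: build filter criteria for an event source mapping, plus a tiny local matcher
// that implements only exact-value matching (a small subset of EventBridge pattern syntax).
const pattern = { eventName: ["INSERT"] }; // only deliver newly inserted items

// The API expects each pattern as a JSON *string* inside Filters[].Pattern.
const filterCriteria = { Filters: [{ Pattern: JSON.stringify(pattern) }] };

function matches(pat, data) {
  return Object.entries(pat).every(([field, expected]) => {
    const actual = data == null ? undefined : data[field];
    if (Array.isArray(expected)) return expected.includes(actual); // any listed value matches
    return matches(expected, actual); // nested object: match recursively
  });
}

const records = [
  { eventName: "INSERT", dynamodb: { Keys: { id: { N: "1" } } } },
  { eventName: "MODIFY", dynamodb: { Keys: { id: { N: "1" } } } },
];
console.log(JSON.stringify(filterCriteria));
console.log(records.filter((r) => matches(pattern, r)).length); // 1
```

To filter on item attributes, nest the pattern the same way the record is nested, for example under dynamodb.NewImage with the attribute's type descriptor (S, N, and so on).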

Conclusion

With the benefits above, DynamoDB Streams stand out for their efficiency and performance. However, you need to write custom code to handle the data changes and to keep your data warehouse in sync whenever business requirements change. If you don't want the hassle of writing and maintaining that code, try Hevo.


Hevo is a No-code Data Pipeline. It can migrate the data from DynamoDB to your desired data warehouse in minutes. It offers hassle-free data integration from 100+ data sources.

Sign up for a 14-day free trial and see the difference!

Let us know about your experience with DynamoDB Streams in the comment section below.
