There are some scenarios in real life where messages must be sent with a delay or at a specific time, such as a smart water heater that needs to be turned on after 30 minutes, an unpaid order, or sending SMS, email, and push notifications for a sale that begins at 2:00 PM.

The RabbitMQ Delayed Message AMQP protocol lacks a native delayed queue feature, if you search for “how to delay/schedule messages in RabbitMQ,” you’ll most likely come across two possible solutions. The first solution is to use the combination of the message TTL function and the dead-lettering function. And, the second option is using the official RabbitMQ delayed messages exchange plugin.

This article goes into great detail about RabbitMQ Delayed Messages. Additionally, it provides a brief overview of RabbitMQ.

What is RabbitMQ?

RabbitMQ is an open-source message broker (also known as message-oriented middleware) that was created to support the Advanced Message Queuing Protocol (AMQP). It has since been expanded with a plug-in architecture to support protocols such as Simple (or Streaming) Text Oriented Message Protocol (STOMP), Message Query Telemetry Transport (MQTT), and others.

How do delayed messages work?

  • As an endpoint starts, the transport requires a solid infrastructure to support delayed messages. It declares a set of topic exchanges, queues, and bindings that work together for this.
  • Gives 28 delay levels by grouping exchanges and queues. Apart from the delay-level exchange, there is one final delivery exchange. 
  • The value of the desired delay gets converted to seconds when you need to delay a message. 
  • When the message is sent to the delay-level exchange, the binary representation of this value is used as part of the routing key. The format of the full routing key looks like:
N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.destination

Representing the delay value in binary, and destination is the name of endpoint of the delayed message when N is 0 or 1. Let’s take an example. The routing key is given below when there is a delay of 10 seconds (1010 in binary) on a message bound for the destination:

0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.0.1.0.destination

Delay levels

  • One bit of the total delay value is represented by each exchange/queue pair that makes up a level. 
  • The infrastructure can delay a message for any value that can be represented by a 28-bit number. This would be by having 28 of these levels, corresponding to 2^27 through 2^0.
  • When you create a delay level, you are also declaring a topic exchange that is bound to,
    •  a queue with a routing key of 1, 
    • to the exchange corresponding to level – 1 with a routing key of 0. 
  • Corresponding to 2^level seconds, the queue for the delay level is declared with an x-message-ttl value.
  • Corresponding to the level – 1 exchange, the queue is also declared with an x-dead-letter-exchange value. This is helpful to route the expired queues to the level – 1 exchange.
  • Within the range of 27 to 0, the delay levels are connected in this manner. Each delay level’s routing key adds wildcards as required. Therefore, each routing key is looking at the portion of the message’s routing key corresponding to its delay level.

You can see the relation between the exchanges and queues in the delay infrastructure in the diagram. Bindings and x-dead-letter-exchange values connect them. The wildcard rules for RabbitMQ binding expressions are given below, where word describes a dot-delimited segment:

  • * substitutes for exactly one word
  • # substitutes for 0 or more words
Data level exchange, Queues, Bindings and TTLs
  • There could be chances of unnecessary traversal through the delay infrastructure. To avoid this, the message is released to the first applicable exchange. This is done by identifying the first level that will route to a queue,i.e the first 1 in the routing key. 
  • In the 10-second example (binary 1010), the message is released to the nsb.v2.delay-level-03 exchange first. And, all of that exchange’s bindings are evaluated. As just 2 bindings exist for the exchange:
If the value is a 1The exchange will route the message to the nsb.v2.delay-level-03 queue, where it will wait until the TTL expires (2^3 seconds) before being forwarded to the nsb.v2.delay-level-02 exchange
If the value is a 0The exchange will route the message to the nsb.v2.delay-level-02 exchange

Delivery

At the final step, the message is routed to the nsb.v2.delay-delivery exchange.Then, the message destination is analyzed to decide where the message should be routed to.

To control the final routing, every endpoint that can receive delayed messages will create bindings like #.EndpointName to this exchange. The routing topology in use influences the correct process. Those bindings match any delay value combinations. But, the matched ones are with the correct destination endpoint and that makes sure that the message is being delivered only to the right endpoint.

How to Delay & Schedule RabbitMQ Delayed Messages?

Delay/Scheduling RabbitMQ Delayed Message
Image Source: Self

People have been looking for ways to implement delayed messaging with RabbitMQ for quite some time. To date, the accepted solution has been to use a combination of the message  — TTL and Dead Letter Exchanges  — as proposed by James Carr.

The RabbitMQ Delay Message Plugin adds a new exchange type to RabbitMQ that allows messages routed through that exchange to be delayed if the user so desires.

Let us see how the deployment and scheduling go with these 2 methods.

1) Using TTL and DLX to Delay Message Delivery

By combining these functions, we can publish a message to a queue that will expire its message after the TTL and then reroute it to the exchange with the dead-letter routing key so that it ends up in a queue.

  • Step-by-step instructions:
    • Declare the pending queue
    • Set the x-dead-letter-exchange argument property to the default value “”.
    • Set the argument property x-dead-letter-routing-key to the name of the destination queue.
    • Set the x-message-ttl argument property to the number of milliseconds you want the message to be delayed.
  • Sign up for the destination queue.

The following Ruby snippet, which relies on the excellent Bunny library, shows how to implement delayed messages.

#RabbitMQ Delayed Message
require 'bunny'

B = Bunny.new ENV['CLOUDAMQP_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name for RabbitMQ Delayed Message
  ch.queue(DELAYED_QUEUE, arguments: {
    # set the dead-letter exchange to the default queue
    'x-dead-letter-exchange' => '',
    # when the message expires, set change the routing key into the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 3000
  })
  # publish to the default exchange with the the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
  puts "#{Time.now}: Published the message"
  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue for RabbitMQ Delayed Message
  q = ch.queue DESTINATION_QUEUE, durable: true
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message"
  end
end

subscribe()
publish()

sleep
#RabbitMQ Delayed Message

2) RabbitMQ Delayed Message Plugin

So, let’s start with the second solution by installing the plugin, but first, let’s take a look at the following prerequisites:

  • Versions 3.5.8 and later of RabbitMQ.
  • Versions 18.0 and later of Erlang/OTP

Here are the steps to get started:

  • Plugin Installation
  • Making Use of the Exchange
  • Message Delays
  • Routing Flexibility
  • Checking Delayed Messages

2.1) Plugin Installation

To install the plugin, visit our Community Plugins page and download the .ez files for your RabbitMQ installation. Copy the plugin to RabbitMQ’s plugin folder, then run the following command to enable it:

#RabbitMQ Delayed Message
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

We can begin using the plugin once it has been enabled.

2.2) Making Use of the Exchange

To use the Delayed Message Exchange, simply declare an exchange with the “x-delayed-message” exchange type, as shown below:

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

Later on, we’ll explain what the special argument “x-delayed-type” in our exchange declaration means.

2.3) Message Delays

To delay a message, the user must publish it with the x-delay header, which accepts an integer representing the number of milliseconds the message should be delayed by RabbitMQ. It’s worth noting that delay in this context means delaying message routing to queues or other exchanges. There is no concept of consumers in the exchange.

As a result, once the delay has passed, the plugin will attempt to route the message to the queues that match the routing rules of the exchange and the once assigned to the message. If the message cannot be routed to any queue, it will be discarded, as specified by AMQP for unroutable messages.

#RabbitMQ Delayed Message
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

The message will be delayed for five seconds in the preceding example before being routed by the plugin. That example assumes you’ve connected to RabbitMQ and obtained a channel.

2.4) Routing Flexibility

When we declared the exchange above, we used an x-delayed-type argument that was set to direct. That tells the exchange what kind of behavior we want it to have when routing messages, creating bindings, and so on.

2.5) Checking Delayed Messages

How can we tell if a message was delayed or not once we receive it on the consumer side? The x-delay message header is retained by the plugin, but the passed value is negated. So, if you send a message with a 5000-millisecond delay, the consumer will find the x-delay header set to -5000.

How to disable the Plugin?

Call rabbitmq-plugins disable rabbitmq_delayed_message_exchange to disable the plugin.But, you won’t be able to retrieve undelivered messages that are delayed messages.

Replicate Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 150+ Data Sources straight into your Data Warehouses such as Redshift, BigQuery, Snowflake, etc.

Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

GET STARTED WITH HEVO FOR FREE

Benefits of Delaying & Scheduling RabbitMQ Delayed Messages

RabbitMQ Delayed Message - Benefits of RabbitMQ Delayed Message
Image Source

Here are some benefits of delaying & scheduling RabbitMQ delayed messages:

  • High Availability and Performance: RabbitMQ Delayed Message Queue supports hierarchical delayed messages. It has high performance and can accumulate messages indefinitely for high availability. It also allows for scheduling within seconds in order to achieve highly precise mass storage.
  • Product Maturity is High: RabbitMQ Delayed & Scheduled messages have been widely used in various businesses and promotional scenarios throughout the economy, withstanding numerous traffic peaks and stability tests. RabbitMQ has evolved into the industry’s most reliable scheduled message system.
  • High-Tech Community Activity: RabbitMQ Delayed Message has created a complete technical ecosystem as a top-level project. Its scheduled messages naturally inherit the advantages of top-level projects, with multi-language and multi-protocol support, and feature a complete technical ecology.
  • Professional Technical Support: Cloud tech focuses on cloud computing, and RabbitMQ evolves in tandem with cloud storage’s technical architecture. While providing technical facilities, their strong technical teams also provide professional technical support to RabbitMQ customers.

Limitations of Delayed Messages

  • The delayed messages have a single disk replica on the present node when they are stored in a Mnesia table. The timers will be re-initialized during plugin activation on node start though those which triggered scheduled delivery are not persisted. When you have just one copy of a schedule message in a cluster, losing that node or disabling the plugin on it will discard the messages on that node. 
  • This plugin doesn’t support RAM nodes because it was created for disk nodes.
  • It attempts only once to publish each message. But, the only challenge in preventing the delivery is the lack of queues to route to, as the publishing is local.
  • We are unsure that there would be minimum one queue we can route to in the future publishing point in time and the original connection would be intact to send a basic.return to.
  • When there are huge number of delayed messages, current plugin design can’t help much.

Key Features of RabbitMQ

Here are some features of RabbitMQ:

  • Clustering: The clustering in RabbitMQ was designed with two aims in mind. If one node fails, the event’s consumers and producers can continue to function while adding other nodes to scale messaging throughput linearly.
  • Easy Routing: Messages are often routed through exchanges before arriving at queues using flexible routing. For more complicated routing, users can connect exchanges together or develop their exchange type as a plugin.
  • Reliability: Persistence, delivery feedback, publisher confirmation, and high availability are among RabbitMQ’s key features that have a direct impact on performance.
  • Security: Client Certificate Checking and SSL-only communication can assist secure client connections. The virtual host can regulate user access, ensuring high-level message isolation.

Conclusion

This article taught you about RabbitMQ Delayed Messages and how to delay and schedule messages using two different methods, as well as a brief introduction to RabbitMQ and its features. There are various Data Sources that organizations leverage to capture a variety of valuable data points. But, transferring data from these sources into a Data Warehouse for a holistic analysis is a hectic task.

It requires you to code and maintains complex functions that can help achieve a smooth flow of data. An Automated Data Pipeline helps in solving this issue and this is where Hevo comes into the picture.

Hevo is the only real-time ELT No-code Data Pipeline platform that cost-effectively automates data pipelines that are flexible to your needs. With integration with 150+ Data Sources (40+ free sources), we help you not only export data from sources & load data to the destinations but also transform & enrich your data, & make it analysis-ready.

visit our website to explore hevo

SIGN UP for a 14-day free trial and see the difference!

Share your experience of learning about RabbitMQ Delayed Messages in the comments section below.

mm
Former Research Analyst, Hevo Data

Davor is a data analyst at heart with a passion for data, software architecture, and writing technical content. He has experience writing more than 100 articles on data integration and infrastructure.

No-code Data Pipeline for your Data Warehouse