There are some scenarios in real life where messages must be sent with a delay or at a specific time, such as a smart water heater that needs to be turned on after 30 minutes, an unpaid order, or sending SMS, email, and push notifications for a sale that begins at 2:00 PM.
The RabbitMQ Delayed Message AMQP protocol lacks a native delayed queue feature, if you search for “how to delay/schedule messages in RabbitMQ,” you’ll most likely come across two possible solutions. The first solution is to use the combination of the message TTL function and the dead-lettering function. And, the second option is using the official RabbitMQ delayed messages exchange plugin.
This article goes into great detail about RabbitMQ Delayed Messages. Additionally, it provides a brief overview of RabbitMQ.
What is RabbitMQ?
RabbitMQ is an open-source message broker (also known as message-oriented middleware) that was created to support the Advanced Message Queuing Protocol (AMQP). It has since been expanded with a plug-in architecture to support protocols such as Simple (or Streaming) Text Oriented Message Protocol (STOMP), Message Query Telemetry Transport (MQTT), and others.
How do delayed messages work?
- As an endpoint starts, the transport requires a solid infrastructure to support delayed messages. It declares a set of topic exchanges, queues, and bindings that work together for this.
- Gives 28 delay levels by grouping exchanges and queues. Apart from the delay-level exchange, there is one final delivery exchange.
- The value of the desired delay gets converted to seconds when you need to delay a message.
- When the message is sent to the delay-level exchange, the binary representation of this value is used as part of the routing key. The format of the full routing key looks like:
N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.N.destination
Representing the delay value in binary, and destination is the name of endpoint of the delayed message when N is 0 or 1. Let’s take an example. The routing key is given below when there is a delay of 10 seconds (1010 in binary) on a message bound for the destination:
0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.0.1.0.destination
Delay levels
- One bit of the total delay value is represented by each exchange/queue pair that makes up a level.
- The infrastructure can delay a message for any value that can be represented by a 28-bit number. This would be by having 28 of these levels, corresponding to 2^27 through 2^0.
- When you create a delay level, you are also declaring a topic exchange that is bound to,
- a queue with a routing key of 1,
- to the exchange corresponding to level – 1 with a routing key of 0.
- Corresponding to 2^level seconds, the queue for the delay level is declared with an x-message-ttl value.
- Corresponding to the level – 1 exchange, the queue is also declared with an x-dead-letter-exchange value. This is helpful to route the expired queues to the level – 1 exchange.
- Within the range of 27 to 0, the delay levels are connected in this manner. Each delay level’s routing key adds wildcards as required. Therefore, each routing key is looking at the portion of the message’s routing key corresponding to its delay level.
You can see the relation between the exchanges and queues in the delay infrastructure in the diagram. Bindings and x-dead-letter-exchange values connect them. The wildcard rules for RabbitMQ binding expressions are given below, where word describes a dot-delimited segment:
- * substitutes for exactly one word
- # substitutes for 0 or more words
- There could be chances of unnecessary traversal through the delay infrastructure. To avoid this, the message is released to the first applicable exchange. This is done by identifying the first level that will route to a queue,i.e the first 1 in the routing key.
- In the 10-second example (binary 1010), the message is released to the nsb.v2.delay-level-03 exchange first. And, all of that exchange’s bindings are evaluated. As just 2 bindings exist for the exchange:
If the value is a 1 | The exchange will route the message to the nsb.v2.delay-level-03 queue, where it will wait until the TTL expires (2^3 seconds) before being forwarded to the nsb.v2.delay-level-02 exchange |
If the value is a 0 | The exchange will route the message to the nsb.v2.delay-level-02 exchange |
Hevo Data, a No-Code Data Pipeline Platform, empowers you to ETL your data from a multitude of sources to Databases, Data Warehouses, or any other destination of your choice in a completely hassle-free & automated manner.
Check out what makes Hevo amazing:
- It has a highly interactive UI that is easy to use.
- It streamlines your data integration task and allows you to scale horizontally.
- The Hevo team is available around the clock to provide exceptional support to you.
Hevo has been rated 4.7/5 on Capterra. Know more about our 2000+ customers and give us a try.
GET STARTED WITH HEVO FOR FREE
Delivery
At the final step, the message is routed to the nsb.v2.delay-delivery exchange.Then, the message destination is analyzed to decide where the message should be routed to.
To control the final routing, every endpoint that can receive delayed messages will create bindings like #.EndpointName to this exchange. The routing topology in use influences the correct process. Those bindings match any delay value combinations. But, the matched ones are with the correct destination endpoint and that makes sure that the message is being delivered only to the right endpoint.
How to Delay & Schedule RabbitMQ Delayed Messages?
People have been looking for ways to implement delayed messaging with RabbitMQ for quite some time. To date, the accepted solution has been to use a combination of the message — TTL and Dead Letter Exchanges — as proposed by James Carr.
The RabbitMQ Delay Message Plugin adds a new exchange type to RabbitMQ that allows messages routed through that exchange to be delayed if the user so desires.
Let us see how the deployment and scheduling go with these 2 methods.
1) Using TTL and DLX to Delay Message Delivery
By combining these functions, we can publish a message to a queue that will expire its message after the TTL and then reroute it to the exchange with the dead-letter routing key so that it ends up in a queue.
- Step-by-step instructions:
- Declare the pending queue
- Set the x-dead-letter-exchange argument property to the default value “”.
- Set the argument property x-dead-letter-routing-key to the name of the destination queue.
- Set the x-message-ttl argument property to the number of milliseconds you want the message to be delayed.
- Sign up for the destination queue.
The following Ruby snippet, which relies on the excellent Bunny library, shows how to implement delayed messages.
#RabbitMQ Delayed Message
require 'bunny'
B = Bunny.new ENV['CLOUDAMQP_URL']
B.start
DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'
def publish
ch = B.create_channel
# declare a queue with the DELAYED_QUEUE name for RabbitMQ Delayed Message
ch.queue(DELAYED_QUEUE, arguments: {
# set the dead-letter exchange to the default queue
'x-dead-letter-exchange' => '',
# when the message expires, set change the routing key into the destination queue name
'x-dead-letter-routing-key' => DESTINATION_QUEUE,
# the time in milliseconds to keep the message in the queue
'x-message-ttl' => 3000
})
# publish to the default exchange with the the delayed queue name as routing key,
# so that the message ends up in the newly declared delayed queue
ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
puts "#{Time.now}: Published the message"
ch.close
end
def subscribe
ch = B.create_channel
# declare the destination queue for RabbitMQ Delayed Message
q = ch.queue DESTINATION_QUEUE, durable: true
q.subscribe do |delivery, headers, body|
puts "#{Time.now}: Got the message"
end
end
subscribe()
publish()
sleep
#RabbitMQ Delayed Message
- Setup:
- Connects to RabbitMQ using
CLOUDAMQP_URL
.
- Defines
DELAYED_QUEUE
and DESTINATION_QUEUE
.
- Publish Method:
- Creates a channel and declares the delayed queue with TTL (3000 ms).
- Publishes a message to the delayed queue.
- Subscribe Method:
- Declares the destination queue and listens for messages.
- Prints a timestamp when a message is received.
- Execution:
- Calls
subscribe()
to start listening and publish()
to send a message.
- Uses
sleep
to keep the script running.
2) RabbitMQ Delayed Message Plugin
So, let’s start with the second solution by installing the plugin, but first, let’s take a look at the following prerequisites:
- Versions 3.5.8 and later of RabbitMQ.
- Versions 18.0 and later of Erlang/OTP
Here are the steps to get started:
- Plugin Installation
- Making Use of the Exchange
- Message Delays
- Routing Flexibility
- Checking Delayed Messages
2.1) Plugin Installation
To install the plugin, visit our Community Plugins page and download the .ez files for your RabbitMQ installation. Copy the plugin to RabbitMQ’s plugin folder, then run the following command to enable it:
#RabbitMQ Delayed Message
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
We can begin using the plugin once it has been enabled.
2.2) Making Use of the Exchange
To use the Delayed Message Exchange, simply declare an exchange with the “x-delayed-message” exchange type, as shown below:
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
Later on, we’ll explain what the special argument “x-delayed-type” in our exchange declaration means.
2.3) Message Delays
To delay a message, the user must publish it with the x-delay header, which accepts an integer representing the number of milliseconds the message should be delayed by RabbitMQ. It’s worth noting that delay in this context means delaying message routing to queues or other exchanges. There is no concept of consumers in the exchange.
As a result, once the delay has passed, the plugin will attempt to route the message to the queues that match the routing rules of the exchange and the once assigned to the message. If the message cannot be routed to any queue, it will be discarded, as specified by AMQP for unroutable messages.
#RabbitMQ Delayed Message
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
- Message Creation:
byte[] messageBodyBytes = "delayed payload".getBytes();
- Converts the string
"delayed payload"
into a byte array for sending as a message.
- Properties Builder:
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
- Initializes a builder for setting message properties.
- Setting Headers:
headers = new HashMap<String, Object>();
- Creates a new map for custom headers.
headers.put("x-delay", 5000);
- Adds a header specifying the delay (5000 milliseconds or 5 seconds) before the message is delivered.
- Publish Message:
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
- Publishes the message to the specified exchange (
"my-exchange"
) with the given properties and payload.
The message will be delayed for five seconds in the preceding example before being routed by the plugin. That example assumes you’ve connected to RabbitMQ and obtained a channel.
2.4) Routing Flexibility
When we declared the exchange above, we used an x-delayed-type argument that was set to direct. That tells the exchange what kind of behavior we want it to have when routing messages, creating bindings, and so on.
2.5) Checking Delayed Messages
How can we tell if a message was delayed or not once we receive it on the consumer side? The x-delay message header is retained by the plugin, but the passed value is negated. So, if you send a message with a 5000-millisecond delay, the consumer will find the x-delay header set to -5000.
How to disable the Plugin?
Call rabbitmq-plugins disable rabbitmq_delayed_message_exchange to disable the plugin.But, you won’t be able to retrieve undelivered messages that are delayed messages.
Benefits of Delaying & Scheduling RabbitMQ Delayed Messages
Here are some benefits of delaying & scheduling RabbitMQ delayed messages:
- High Availability and Performance: RabbitMQ Delayed Message Queue supports hierarchical delayed messages. It has high performance and can accumulate messages indefinitely for high availability. It also allows for scheduling within seconds in order to achieve highly precise mass storage.
- Product Maturity is High: RabbitMQ Delayed & Scheduled messages have been widely used in various businesses and promotional scenarios throughout the economy, withstanding numerous traffic peaks and stability tests. RabbitMQ has evolved into the industry’s most reliable scheduled message system.
- High-Tech Community Activity: RabbitMQ Delayed Message has created a complete technical ecosystem as a top-level project. Its scheduled messages naturally inherit the advantages of top-level projects, with multi-language and multi-protocol support, and feature a complete technical ecology.
- Professional Technical Support: Cloud tech focuses on cloud computing, and RabbitMQ evolves in tandem with cloud storage’s technical architecture. While providing technical facilities, their strong technical teams also provide professional technical support to RabbitMQ customers.
Limitations of Delayed Messages
- The delayed messages have a single disk replica on the present node when they are stored in a Mnesia table. The timers will be re-initialized during plugin activation on node start though those which triggered scheduled delivery are not persisted. When you have just one copy of a schedule message in a cluster, losing that node or disabling the plugin on it will discard the messages on that node.
- This plugin doesn’t support RAM nodes because it was created for disk nodes.
- It attempts only once to publish each message. But, the only challenge in preventing the delivery is the lack of queues to route to, as the publishing is local.
- We are unsure that there would be minimum one queue we can route to in the future publishing point in time and the original connection would be intact to send a basic.return to.
- When there are huge number of delayed messages, current plugin design can’t help much.
Key Features of RabbitMQ
Here are some features of RabbitMQ:
- Clustering: The clustering in RabbitMQ was designed with two aims in mind. If one node fails, the event’s consumers and producers can continue to function while adding other nodes to scale messaging throughput linearly.
- Easy Routing: Messages are often routed through exchanges before arriving at queues using flexible routing. For more complicated routing, users can connect exchanges together or develop their exchange type as a plugin.
- Reliability: Persistence, delivery feedback, publisher confirmation, and high availability are among RabbitMQ’s key features that have a direct impact on performance.
- Security: Client Certificate Checking and SSL-only communication can assist secure client connections. The virtual host can regulate user access, ensuring high-level message isolation.
Conclusion
This article taught you about RabbitMQ Delayed Messages and how to delay and schedule messages using two different methods, as well as a brief introduction to RabbitMQ and its features. Organizations leverage various data sources to capture a variety of valuable data points. However, transferring data from these sources into a Data Warehouse for a holistic analysis is a hectic task.
It requires you to code and maintain complex functions that can help achieve a smooth flow of data. An Automated Data Pipeline helps solve this issue, and this is where Hevo comes into the picture. Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.
FAQs
1. How to delay messages in RabbitMQ?
To delay messages, use RabbitMQ’s plugin called rabbitmq_delayed_message_exchange. This allows messages to be delayed by setting the x-delayed-type header in your exchange configuration.
2. Why am I receiving delayed messages?
Delayed messages may occur due to network latency, message queue congestion, or the intentional use of delayed message settings to control message delivery timing.
3. What is the delay limit for RabbitMQ?
RabbitMQ doesn’t impose a strict delay limit, but the delay length is influenced by server memory and configuration.
Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.