How to Build a Message Queue using Python? | Made Easy

on Message Queue, Python • April 21st, 2022 • Write for Hevo

Python Message Queue - Featured Image

Python Message Queue – What it is all about? And, how can one build a message queue for effective communication using Python programming language?

Continue reading..

Modern-day applications rely on inputs from several other software to perform desired functions. These inputs are received in the form of triggers to start a new action in the application that receives the inputs/messages. However, it is necessary to manage these messages to ensure the end application is receiving the required information for further processing. This is where message queues help maintain seamless communication between two or more applications. This article will talk about how you can build a Python Message Queue for effective communication.

Table of Contents

Prerequisites

  • Basics of Python programming.

What is a Message Queue?

A message queue is an asynchronous system that allows data to move between different services and applications.

Message Queue Process | Hevo Data
Image Source

The service that sends the message is known as the producer worker, while the service that receives and responds to messages is known as the consumer worker. This is how the services communicate with one another. However, workable MQs must be extremely scalable in order to allocate the appropriate resources to the number of messages delivered and received. Depending on their loads, you may scale up or down the number of producer and consumer personnel running. 

Your system will execute synchronously if you don’t use a message queue. Even though synchronous programming is designed to be simple, it is not particularly efficient.

Take, for example, registering for an online service. When you finish filling out your information and click “Sign up,” the system will send you an email and establish a database record for you. Assume you typed in an incorrect, inaccessible email address. For around 10 seconds, the system will try to send you the activation email again. Before being forwarded to the next page, you must wait at least 10 seconds for the system to finish attempting to send an email as a new user. Isn’t that an unsatisfactory user experience? Waiting is a characteristic of an unreliable system.

But not to worry, MQs are asynchronous systems that handle a range of jobs. The user does not have to wait for one job to finish before moving on to the next. Let us discuss a message queue example of placing an order through the Internet. When you click the “purchase now” button, data is transferred to shipping and distribution networks, but you don’t have to wait for it. Your order is immediately confirmed, and you generally receive an email confirmation and an order reference. A message queue will transport these various pieces of data to their destinations. It distributes them among systems and services, resulting in a more pleasant user experience and a more stable system.

It is important that MQs must be persistent. This indicates that the MQ will keep attempting to deliver to the customer even if it isn’t currently available. It’s possible that this is due to a system outage. Persistent MQs, such as IronMQ, will keep attempting until they obtain confirmation from the customer that the data has been received. A persistent message queue (MQ) is a secure technique to distribute duties across a system or network of computers.

There is a middle-man or a message broker required for the MQs, and this is where RabbitMQ comes into the picture.

Hevo Simplifies ETL, Making Data Analysis Hassle-free, Try it Today!

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources straight into your Data Warehouse or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

Get Started with Hevo for Free

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

What is RabbitMQ?

RabbitMQ is a lightweight message broker, which sends and receives messages. It is one of the most popular open-source message brokers, with tens of thousands of users. To address high-scale and high-availability needs, RabbitMQ may also be implemented in a distributed environment. RabbitMQ supports a broad range of OS systems, cloud settings, and a comprehensive set of developer tools for the many common programming languages.

Some of RabbitMQ’s alternatives are Apache Kafka, ActiveMQ, KubeMQ, IBM MQ, ZeroMQ, and much more.

We will discuss some of the major terms used in RabbitMQ:

  • Producing is just the act of sending. A producer is a software that transmits messages.
  • Queue is referred to as a post box. Messages pass between RabbitMQ and your apps, but they can only be kept in queues. A queue is effectively a big message buffer that is often limited by the host’s RAM and disc space.
  • Receiving and consuming have comparable meanings. A consumer is a software that mostly waits for messages to arrive.

Let us understand some of the major features of RabbitMQ.

  1. Developer Experience: BOSH, Chef, Docker, and Puppet can easily be deployed on RabbitMQ. You can also create cross-language communications using popular programming languages like Java,.NET, PHP, JavaScript, Ruby, Go, and Python Message Queues.
  2. Distributed Deployment: For high availability and throughput, you can deploy it as clusters and also federate across many availability zones and regions.
  3. Asynchronous Messaging: RabbitMQ supports numerous messaging protocols, message queuing, delivery acknowledgment, configurable queue routing, etc.
  4. Enterprise & Cloud Ready: RabbitMQ also supports authentication, authorization, TLS, and LDAP. It is lightweight and simple to implement in both public and private clouds.
  5. Tools & Plugins: Continuous integration, operational metrics, and interaction with other corporate systems are all supported by a diverse set of tools and plugins. RabbitMQ capabilities may be extended via a flexible plugin method.
  6. Supports Monitoring: RabbitMQ is managed and monitored via an HTTP-API, a command-line tool, and a UI.

Steps to Build a Python Message Queue

You can follow the steps given below to get started with building a Python Message Queue:

Step 1: Install RabbitMQ via their official website. You’ll be using Pika 1.0.0 in this tutorial series, which is the RabbitMQ team’s preferred Python client. You may use the pip package management tool to install it:

python -m pip install pika --upgrade

Now you have Pika installed, we can start writing the code.

A single message “Hello World” is sent by a producer (sender), and messages are received and printed by a consumer (receiver).

As you can see in the figure below, the producer is “P” and the consumer is “C”. The queue in the centre is a message buffer maintained by RabbitMQ on behalf of the consumer.

Messages are sent to the “hello” queue by the producer. Messages from that queue are delivered to the customer.

Step 2: Sending, send.py is your first program, and it will send one message to your Python Message Queue. The first step is to create a connection with the RabbitMQ server.

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

You’ve established a connection with a broker running on the local system, which is the localhost. If you wanted to connect to a broker on a different computer, you’d just type its name or IP address.

Before transmitting, you must first verify that the recipient Python Message Queue exists. If you send a message to an address that doesn’t exist, RabbitMQ will just ignore it. Just start by making a hello queue to which the message will be sent:

channel.queue_declare(queue='hello')

You’re ready to send a message at this moment. The first message you will send is the phrase “Hello World!”.

A message may never be sent directly to the queue in RabbitMQ; it must always go through an exchange. But don’t get confused with the technicalities; all you need to know now is how to utilize an empty string to identify a default exchange. This exchange is unique in that it allows us to designate which queue the message should be sent to. In the routing key argument, the queue name must be specified:

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

You must ensure that the network buffers have been flushed and that your message has been sent to RabbitMQ before quitting the application. By carefully shutting the link, you may accomplish this.

connection.close()

Step 3: Receiving; the second program, receive.py, will read messages from the Python Message Queue and display them on the screen. Again, you must first establish a connection to the RabbitMQ server. The code that connects to Rabbit is the same as it was before.

The next step is to double-check that the queue exists. You can simply use queue_declare to perform this action.

channel.queue_declare(queue='hello')

You might wonder why you need to declare the queue again since you previously did so in your code. If you were certain that the queue already exists, you could avoid performing this action. For example, if the send.py application has been previously executed but you’re still unsure on which software to run it on, you can simply declare the queue again in both applications.

It’s more difficult to get messages from the queue. It works by having a callback function subscribed to a queue. The Pika library calls this callback method whenever you get a message. In our situation, this function will print the contents of the message on the screen.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

After that, you must inform RabbitMQ that this callback function should receive messages from your hello queue:

channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)

To make that command work, you must first verify that the queue you wish to subscribe to exists. Fortunately, you will be confident since you’ve used queue declare to build the Python Message Queue.

Finally, you can begin a never-ending loop that awaits data and executes callbacks as needed, as well as catching KeyboardInterrupt after the program shutdown.

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

	if __name__ == '__main__':
    try:
        main()
 	    except KeyboardInterrupt:
 	       print('Interrupted')
                   try:
 	           sys.exit(0)
                   except SystemExit:
            	os._exit(0)

Here’s What Makes Hevo’s ETL Solution Unique

Manually connecting data sources to Databases and then creating Data Pipelines is a mammoth task. Experience Hevo’s automated No Code Data Pipeline solution that not only helps you replicate data from a vast array of sources but also automates the ETL process and you don’t have to write a single line of code.

Check out why Hevo is the Best:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the schema of your Relational Database.
  • Auto Schema Mapping: Hevo takes away the tedious task of schema management & automatically detects the format of incoming data from a source or Relational Database and replicates it to the destination schema. 
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, E-Mail, and support calls.

Want to take Hevo for a spin? Sign Up For a 14 Day Free Trial and experience the feature-rich Hevo.

Step 4: Putting it Together; now you can use a terminal to test your applications. Let’s start with a consumer who will be waiting for delivery indefinitely:

python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Hello World!'

Now it’s time to start the producer. After each run, the producer software will come to a halt:

python send.py
# => [x] Sent 'Hello World!'

This is how you can design a Python Message Queue based on RabbitMQ.

Conclusion

Through RabbitMQ as a message broker and Python as your programming language, you will be able to send your first message via a queue. The main purpose of a message broker is to take incoming messages from applications and perform some action on them. Although this was a simple example to build a Python Message Queue and send a message, you can use this article as a base to further build more complex workflows with Python Message Queues. 

Visit our Website to Explore Hevo

Hevo Data with its strong integration with 100+ Data Sources (including 40+ Free Sources) allows you to not only export data from your desired data sources & load it to the destination of your choice but also transform & enrich your data to make it analysis-ready. Hevo also allows integrating data from non-native sources using Hevo’s in-built REST API & Webhooks Connector. You can then focus on your key business needs and perform insightful analysis. 

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Share your experience of building your Python Message Queue! Let us know in the comments below!

No-code Data Pipeline For your Data Warehouse