Magento is a leading e-commerce platform that has millions of users. It provides a solution for both B2C and B2B commerce, along with the tools necessary to drive a business forward.

A Message Queue is an asynchronous method of communication in which the sender and receiver do not communicate directly. This article provides a guide to Magento 2, Message Queues, and the steps to create a Magento 2 Queue.

What is Magento 2 (Adobe Commerce)?


Magento was initially released on March 31, 2008. Its second major version, Magento 2, is now known as Adobe Commerce (the open-source edition is distributed as Magento Open Source). It is eCommerce software written in PHP and designed to improve multi-channel business experiences for B2B and B2C customers. It offers high-level customization features, advanced commerce functionality, and performance tools, all of which are vital to running an online business.

The importance of this tool is reflected in its leading position among eCommerce platforms across the globe, as reported by Techliance. In addition, large companies such as Coca-Cola, HP, Samsung, Wilson, Jaguar, and AsusTek use Magento to run their online business operations.

What is a Message Queue Framework?

While building an eCommerce website with Magento 2, developers often face challenges such as cross-calling between services. Magento 2 provides the Message Queue Framework (MQF), which enables a module to publish messages to queues. A Message Queue is a communication system in which senders and receivers do not have to interact at the same time.

A sender, for instance, publishes a message that is stored in the queue until the receiver retrieves it. Because the sender and receiver do not contact each other at the same instant, communication between the two is asynchronous. This article walks through how to create an asynchronous module in Magento 2. Before diving deeper, here are the basic elements of the MQF:

Elements of a Message Queue Framework

  • Sender/Publisher: The element of MQF that publishes messages and sends them to an exchange.
  • Message Queue Broker: The broker sits between a publisher and a consumer; its exchange receives messages from publishers and routes them to queues.
  • Queue: A buffer that stores messages until they are consumed.
  • Receiver/Consumer: Receives messages from a queue; each consumer knows which queue it consumes from.
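The four elements above can be illustrated with a minimal in-memory sketch. This is not Magento's implementation: the InMemoryBroker class, the customer.created topic, and the customer_created_queue name are all hypothetical, chosen only to show that the publisher returns immediately and the consumer reads later.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory broker: routes each topic to the queues bound to it.
final class InMemoryBroker
{
    /** @var array<string, string[]> topic => names of bound queues */
    private array $bindings = [];

    /** @var array<string, string[]> queue name => stored messages */
    private array $queues = [];

    public function bind(string $topic, string $queue): void
    {
        $this->bindings[$topic][] = $queue;
        $this->queues[$queue] ??= [];
    }

    // Publisher side: the message is stored and the call returns immediately.
    public function publish(string $topic, string $message): void
    {
        foreach ($this->bindings[$topic] ?? [] as $queue) {
            $this->queues[$queue][] = $message;
        }
    }

    // Consumer side: take the oldest message from one queue, or null if empty.
    public function consume(string $queue): ?string
    {
        if (empty($this->queues[$queue])) {
            return null;
        }
        return array_shift($this->queues[$queue]);
    }
}

$broker = new InMemoryBroker();
$broker->bind('customer.created', 'customer_created_queue');
$broker->publish('customer.created', '{"id":1}');
$broker->publish('customer.created', '{"id":2}');

// The consumer runs later, independently of the publisher.
$first  = $broker->consume('customer_created_queue');
$second = $broker->consume('customer_created_queue');
$third  = $broker->consume('customer_created_queue');
```

Messages come back in the order they were published, and an empty queue simply yields null rather than blocking the caller.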

If you’re a newbie and don’t know where to begin, check out a previous blog post about message queues. 

Effortless Magento Data Replication with Hevo’s No-Code Platform

Hevo Data provides a fully managed, no-code data pipeline solution that simplifies Magento data replication. With just a few clicks, you can extract and load data from Magento (via MySQL) and various other data sources directly into your data warehouse or databases. Hevo automates and enriches data replication, allowing you to focus on insights instead of infrastructure.

What Makes Hevo’s Magento ETL Process Best-In-Class:

  • Fully Managed Platform: Hevo is entirely automated, removing the need for maintenance and management.
  • Real-Time Data Replication: Hevo offers near real-time replication for faster access to insights.
  • Dynamic Schema Management: Hevo detects and maps source data schema automatically, saving time and reducing errors.

Thousands of customers worldwide trust Hevo for their data integration needs. Join them today to experience seamless, hassle-free data migration and transformation!


General Example: Using RabbitMQ as a Message Broker for Magento 2 Queue

Let’s consider a case in which the MQF uses RabbitMQ as a multi-protocol messaging broker for Magento 2 Queue. RabbitMQ is considered a reliable and robust platform for sending and receiving messages. It uses AMQP (Advanced Message Queuing Protocol) and has a built-in mechanism for storing undelivered messages.

To dispatch a message to the Magento 2 Queue, use the following code:

$publisher->publish($topic, $message);

When the MySQL adapter is used, the queue, queue_message, and queue_message_status tables hold the published messages. To publish a message to multiple queues, a single record is created in queue_message, and multiple records (one per queue) are created in queue_message_status.
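The single-record / multiple-record relationship can be sketched with plain arrays. The arrays below are hypothetical stand-ins for the queue_message and queue_message_status tables; the column names, the string status value, and the publishToQueues helper are simplified assumptions, not Magento's actual schema or API.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-ins for two of the tables.
$queueMessage = [];        // one row per published message
$queueMessageStatus = [];  // one row per (message, queue) pair

// Hypothetical helper: stores the message once and adds one status row per queue.
function publishToQueues(
    array &$messages,
    array &$statuses,
    string $topic,
    string $body,
    array $queueNames
): int {
    $messageId = count($messages) + 1;
    $messages[] = ['id' => $messageId, 'topic_name' => $topic, 'body' => $body];
    foreach ($queueNames as $queueName) {
        $statuses[] = [
            'message_id' => $messageId,
            'queue_name' => $queueName,
            'status'     => 'new',
        ];
    }
    return $messageId;
}

$id = publishToQueues(
    $queueMessage,
    $queueMessageStatus,
    'customer.created',
    '{"id":1}',
    ['email_queue', 'crm_queue']
);
```

Keeping the body in one place and tracking delivery per queue means each consumer can mark its own copy as processed without affecting the others.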

A) Instantiating a Consumer in RabbitMQ

If the MQF uses RabbitMQ as the message queue system for Magento 2 Queue, queue_consumer.xml describes how to instantiate a consumer. The consumer, named customer_created_listener here, listens to the queue and receives all incoming messages. It is instantiated as follows:

$this->consumerFactory->get('customer_created_listener')
     ->process();

For every message received from the Magento 2 Queue, it invokes the configured handler:

\Magento\Some\Class::processMessage($message)

B) Instantiating a Consumer in MySQL Adapter

If you find RabbitMQ too complicated to use as a message broker for Magento 2 Queue, you can replace it with the MySQL adapter. The MySQL adapter implements a simple queue system on top of three database tables (queue, queue_message, and queue_message_status). Unlike RabbitMQ, the MySQL adapter relies on cron jobs to ensure that consumers receive messages in Magento 2 Queue.

To instantiate a consumer with the MySQL adapter, call the following method:

\Magento\Framework\MessageQueue\ConsumerInterface::process($maxNumberOfMessages)

This call performs the following steps:

Step 1: Define the Queue

First, the queue name associated with the current consumer is determined from the consumer configuration:

\Magento\Framework\MessageQueue\ConsumerConfigurationInterface::getQueueName

Step 2: Improve Load Distribution

Next, up to $maxNumberOfMessages message records are selected, filtered on the queue_name field. All three tables are joined, and only records that have not yet been processed are picked up, which distributes the load across multiple consumers.

Step 3: Decode Messages

Each message is decoded using the topic name taken from the following interface:

\Magento\Framework\MessageQueue\ConsumerConfigurationInterface

Step 4: Invoke Callback

Finally, the callback returned by \Magento\Framework\MessageQueue\ConsumerConfigurationInterface::getCallback is invoked, with the decoded message passed as an argument.
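The four steps above can be sketched as a small, self-contained loop. This is only an illustration under stated assumptions: the $rows array stands in for the joined tables, the 'new' and 'complete' status strings are simplified placeholders, and processQueue is a hypothetical function rather than Magento's consumer.

```php
<?php
declare(strict_types=1);

// Hypothetical rows, as if the three tables had already been joined.
$rows = [
    ['id' => 1, 'queue_name' => 'orders', 'status' => 'new',      'body' => '{"order":10}'],
    ['id' => 2, 'queue_name' => 'emails', 'status' => 'new',      'body' => '{"email":"a"}'],
    ['id' => 3, 'queue_name' => 'orders', 'status' => 'complete', 'body' => '{"order":11}'],
    ['id' => 4, 'queue_name' => 'orders', 'status' => 'new',      'body' => '{"order":12}'],
    ['id' => 5, 'queue_name' => 'orders', 'status' => 'new',      'body' => '{"order":13}'],
];

// Step 1 is represented by the $queueName argument supplied by the caller.
function processQueue(array &$rows, string $queueName, int $maxNumberOfMessages, callable $callback): int
{
    $processed = 0;
    foreach ($rows as &$row) {
        if ($processed >= $maxNumberOfMessages) {
            break;
        }
        // Step 2: filter on queue_name and pick only unprocessed records.
        if ($row['queue_name'] !== $queueName || $row['status'] !== 'new') {
            continue;
        }
        // Step 3: decode the stored message body.
        $decoded = json_decode($row['body'], true, 512, JSON_THROW_ON_ERROR);
        // Step 4: invoke the callback with the decoded data.
        $callback($decoded);
        $row['status'] = 'complete';
        $processed++;
    }
    unset($row); // drop the reference left behind by the foreach
    return $processed;
}

$seen = [];
$count = processQueue($rows, 'orders', 2, function (array $data) use (&$seen): void {
    $seen[] = $data['order'];
});
```

With a limit of two, the remaining 'orders' record stays in the 'new' state and would be picked up by the next run or by another consumer.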


Switching a Magento 2 Queue from MySQL to AMQP

If you want to switch a Magento 2 Queue from the MySQL adapter to AMQP, add configuration like the following to app/etc/env.php (shown here for the product_action_attribute.update topic):

'queue' => [
    'topics' => [
        'product_action_attribute.update' => [
            'publisher' => 'amqp-magento'
        ]
    ],
    'config' => [
        'publishers' => [
            'product_action_attribute.update' => [
                'connections' => [
                    'amqp' => [
                        'name' => 'amqp',
                        'exchange' => 'magento',
                        'disabled' => false
                    ],
                    'db' => [
                        'name' => 'db',
                        'disabled' => true
                    ]
                ]
            ]
        ]
    ],
    'consumers' => [
        'product_action_attribute.update' => [
            'connection' => 'amqp',
        ],
    ],
],

Understanding the Magento 2 Queue Topology

To understand the Magento 2 Queue, consider a case in which a client has bought a Salesforce integration to synchronize data between their website and Salesforce. Suppose the website already has 30k orders. If the client tries to add all of them to the ‘hevodata_salesfore_queue’ table in a single request, the request is likely to time out because too many records are processed at once. A Magento 2 Queue solves this problem.

First, configure the message queue topology, which requires four XML files in the <vendor>/<module>/etc folder:

A) communication.xml

The purpose of this file is to define aspects of the asynchronous message systems that are common to all types of communication.

Let’s say our topic is ‘salesforce.queue.order’.

The request attribute of the topic defines the data type of the message, which is a string in this case because the order IDs are sent through Magento as a serialized string.

The second step is to declare the handler class ‘HevoData\Salesforce\Model\Queue\Consumer’, whose process method handles input from the queue.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="salesforce.queue.order" request="string">
        <handler name="processAddOrderToQueue" type="HevoData\Salesforce\Model\Queue\Consumer" method="process" />
    </topic>
</config>

Now define the handler class HevoData\Salesforce\Model\Queue\Consumer as follows:

class Consumer
{
    /** @var \Psr\Log\LoggerInterface */
    protected $_logger;

    public function process($orders)
    {
        try {
            // execute() handles saving the order object to the table
            $this->execute($orders);
        } catch (\Exception $e) {
            // catch and log errors
            $this->_logger->critical($e->getMessage());
        }
    }
}

B) queue_consumer.xml

This file defines the relationship between an existing queue and its consumer.

Define the consumer's parameters in this file as shown below; the name and queue attributes are required:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="salesforce.queue.order"
              queue="salesforce.queue.order"
              connection="db"
              maxMessages="5000"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="HevoData\Salesforce\Model\Queue\Consumer::process"/>
</config>

C) queue_topology.xml

This file defines the message routing rules and declares queues and exchanges.

In this file, define the exchange name, type, and connection attributes. Because we are using MySQL for the queue system, the connection must be ‘db’. Clients can also use RabbitMQ, depending on feasibility and usage.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="processAddOrderToQueueBinding"
                topic="salesforce.queue.order"
                destinationType="queue"
                destination="salesforce.queue.order"/>
    </exchange>
</config>
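To make the routing rule concrete, here is a small sketch of how a topic-type exchange could decide whether a published topic matches a binding. It assumes AMQP-style conventions, where topics are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words; whether a given Magento connection supports wildcards in the same way is an assumption here, and topicMatches is a hypothetical helper, not part of Magento.

```php
<?php
declare(strict_types=1);

// Hypothetical matcher for AMQP-style topic patterns.
// $pattern and $words are the dot-separated parts of the binding and the topic.
function topicMatches(array $pattern, array $words): bool
{
    if ($pattern === []) {
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may absorb zero or more words.
        for ($i = 0; $i <= count($words); $i++) {
            if (topicMatches($pattern, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    // '*' matches exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && topicMatches($pattern, $words);
}

function bindingMatches(string $binding, string $topic): bool
{
    return topicMatches(explode('.', $binding), explode('.', $topic));
}

$exact    = bindingMatches('salesforce.queue.order', 'salesforce.queue.order');
$oneWord  = bindingMatches('salesforce.queue.*', 'salesforce.queue.order');
$tooShort = bindingMatches('salesforce.*', 'salesforce.queue.order');
$anyDepth = bindingMatches('salesforce.#', 'salesforce.queue.order');
```

The binding in the file above uses the exact topic name, so only messages published to salesforce.queue.order are routed to the queue of the same name.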

D) queue_publisher.xml

This file defines the exchange to which a topic is published. Note that the connection name and exchange must match the ones defined in the topology file.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="salesforce.queue.order">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>

How to Send a Message from Publisher to a Queue?

To send a message from the publisher to a Magento 2 Queue, we need to create a controller. Because the ‘db’ connection is used, all the messages are stored in Magento’s database.

  • The queue table manages queues.
  • The queue_message table stores messages.
  • The queue_message_status table manages the message queue workload by tracking the status of each message in each queue.

To add orders to the Magento 2 Queue, send an AJAX request to salesforce/queue/order. The following code shows the controller in detail:

<?php
namespace HevoData\Salesforce\Controller\Adminhtml\Queue;

/**
* Class Order
* @package HevoData\Salesforce\Controller\Adminhtml\Queue
*/
class Order extends \Magento\Backend\App\Action
{
    /**
    * Authorization level of a basic admin session
    */
    const ADMIN_RESOURCE = 'HevoData_Salesforce::config_salesforce';

    const TOPIC_NAME = 'salesforce.queue.order';

    const SIZE = 5000;

    /** @var \Magento\Sales\Model\ResourceModel\Order\CollectionFactory */
    protected $_orderColFactory;

    /** @var \Magento\Framework\Serialize\Serializer\Json */
    protected $_json;

    /** @var \Magento\Framework\MessageQueue\PublisherInterface */
    protected $_publisher;

    /** @var string key under which IDs are published (assumed value; not declared in the original) */
    protected $type = 'order';

    /**
    * Order constructor.
    *
    * @param \Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory
    * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
    * @param \Magento\Framework\Serialize\Serializer\Json $json
    * @param \Magento\Backend\App\Action\Context $context
    */
    public function __construct(
        \Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory,
        \Magento\Framework\MessageQueue\PublisherInterface $publisher,
        \Magento\Framework\Serialize\Serializer\Json $json,
        \Magento\Backend\App\Action\Context $context
    ){
        // the parent constructor sets up the request, response and redirect helpers
        parent::__construct($context);
        $this->_orderColFactory = $orderColFactory;
        $this->_json = $json;
        $this->_publisher = $publisher;
    }

    /**
    * @return \Magento\Framework\App\ResponseInterface|\Magento\Framework\Controller\ResultInterface|void
    */
    public function execute()
    {
        if ($this->getRequest()->isAjax()) {
            try {
                //get list of order IDs
                $orderCollection = $this->_orderColFactory->create()->addFieldToSelect('entity_id')->getAllIds();
                //send data to publish function
                $this->publishData($orderCollection, $this->type);
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 0,
                    'message' => __('Orders are being added to queue')
                ]));
                return;
            } catch (\Exception $e) {
                //report the failure to the caller
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 1,
                    'message' => __('Something went wrong while adding record(s) to queue. Error: %1', $e->getMessage())
                ]));
                return;
            }
        }
        return $this->_redirect('*/*/index');
    }

    /**
    * @param array $data
    * @param string $type
    */
    public function publishData($data,$type)
    {
        if(is_array($data)){
            //split list of IDs into arrays of 5000 IDs each
            $chunks = array_chunk($data,self::SIZE);
            foreach ($chunks as $chunk){
                //publish IDs to queue
                $rawData = [$type => $chunk];
                $this->_publisher->publish(self::TOPIC_NAME, $this->_json->serialize($rawData));
            }
        }
    }
}

In the above case, we’ve divided the order IDs into chunks of 5,000, and each chunk is then added to the message queue as one message. Chunking keeps the Magento backend responsive and avoids PHP timeouts when there are too many orders to handle in a single request.
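The chunking step can be tried in isolation with plain PHP. In this sketch, range() is a hypothetical stand-in for the IDs returned by the order collection, and json_encode() stands in for Magento's JSON serializer; the 'order' key is an assumed type label.

```php
<?php
declare(strict_types=1);

const CHUNK_SIZE = 5000; // same value as the SIZE constant in the controller

// Hypothetical stand-in for the 30k order IDs returned by getAllIds().
$orderIds = range(1, 30000);

$messages = [];
foreach (array_chunk($orderIds, CHUNK_SIZE) as $chunk) {
    // Each chunk becomes one JSON string, that is, one queue message.
    $messages[] = json_encode(['order' => $chunk], JSON_THROW_ON_ERROR);
}

$firstMessage = json_decode($messages[0], true, 512, JSON_THROW_ON_ERROR);
$lastMessage  = json_decode($messages[count($messages) - 1], true, 512, JSON_THROW_ON_ERROR);
```

For 30,000 orders this yields six messages, so the consumer handles six moderate payloads instead of one very large one.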

How to Process a Message from Queue? 

To process messages from the Magento 2 Queue, expand the handler class that was declared in the first step:

<?php
namespace HevoData\Salesforce\Model\Queue;

/**
* Class Consumer
* @package HevoData\Salesforce\Model\Queue
*/
class Consumer
{
    // ....
    /** @var \Magento\Framework\Serialize\Serializer\Json */
    protected $_json;

    /**
    * @param string $orders
    */
    public function process($orders)
    {
        try{
            $this->execute($orders);

        }catch (\Exception $e){
            $errorCode = $e->getCode();
            $message = __('Something went wrong while adding orders to queue');
            $this->_notifier->addCritical(
                $errorCode,
                $message
            );
            $this->_logger->critical($errorCode .": ". $message);
        }
    }

    /**
    * @param string $orderItems
    *
    * @throws LocalizedException
    */
    private function execute($orderItems)
    {
        $orderCollectionArr = [];
        /** @var \HevoData\Salesforce\Model\Queue $queue */
        $queue = $this->_queueFactory->create();
        $orderItems = $this->_json->unserialize($orderItems);
        if(is_array($orderItems)){
            //each message has the shape [type => [order IDs]]
            foreach ($orderItems as $type => $orderIds) {
                foreach ((array) $orderIds as $orderId) {
                    $orderCollectionArr[] = [
                        'type' => 'order',
                        'entity_id' => $orderId,
                        'priority' => 1,
                    ];
                }
            }
            //handle insertMulti orders into Salesforce queue
            $queue->add($orderCollectionArr);
        }
    }
}
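The decoding step can also be checked without a Magento installation. The sketch below assumes each message has the shape {"type": [order IDs]}, as produced by the publisher; buildRows is a hypothetical helper that only builds the rows and does not write to any table.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: turns one serialized message into rows for the queue table.
function buildRows(string $payload): array
{
    $decoded = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    $rows = [];
    foreach ($decoded as $type => $orderIds) {
        // One row per order ID contained in the chunk.
        foreach ((array) $orderIds as $orderId) {
            $rows[] = [
                'type'      => $type,
                'entity_id' => (int) $orderId,
                'priority'  => 1,
            ];
        }
    }
    return $rows;
}

$rows = buildRows('{"order":[101,102,103]}');
```

Building all rows first and inserting them in one call keeps the number of database round trips per message low.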

Conclusion

Magento 2, or Adobe Commerce, gives brand owners an opportunity to connect multiple shopping experiences across channels, introduce new brands and websites, and cover different geographies. With this eCommerce platform, owners can deliver personalized end-to-end shopping experiences, resulting in higher conversion rates. However, developers must solve cross-calling between services to get the full benefit of this software. This article has discussed a procedure to create a Magento 2 Queue. The method is divided into three steps: configuring the queue topology, sending a message from the publisher to the Magento 2 Queue, and processing messages from the Magento 2 Queue.

Organizations leverage various data sources to capture a variety of valuable data points. However, transferring data from these sources into a Data Warehouse for a holistic analysis is a demanding task. It requires you to code and maintain complex functions to achieve a smooth flow of data. An Automated Data Pipeline helps solve this issue, and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline that offers 150+ pre-built integrations to choose from. Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.

FAQs

1. How to create a queue in Magento 2?

In Magento 2, create a queue by defining it in the queue_topology.xml file within your module. Specify the exchange and binding details there, declare the topic in communication.xml, and then configure consumers and publishers in queue_consumer.xml and queue_publisher.xml.

2. What is RabbitMQ in Magento 2?

RabbitMQ is a message broker in Magento 2 used to handle asynchronous communication, ensuring better scalability and performance for processes like order management and email notifications.

3. Is RabbitMQ a messaging queue?

Yes, RabbitMQ is a messaging queue. It is an open-source message broker that facilitates communication between applications or components using asynchronous messaging.

Syeda Famita Amber
Technical Content Writer, Hevo Data

Syeda is a technical content writer with a profound passion for data. She specializes in crafting insightful content on a broad spectrum of subjects, including data analytics, machine learning, artificial intelligence, big data, and business intelligence. Through her work, Syeda aims to simplify complex concepts and trends for data practitioners, making them accessible and engaging for data professionals.