How to Create a Magento 2 Queue?| Made Easy

on e-commerce, Magento, Magento 2, Message Queue, RabbitMQ • May 29th, 2022 • Write for Hevo

Magento 2 Queue_FI

Magneto is a leading e-commerce builder software that has millions of users. It provides an improved solution for B2C and B2B communications. This software solution provides the tools necessary to drive the business forward.

A Message Queue is an asynchronous method of communication wherein there is no direct communication between sender and receiver. This article provides a comprehensive guide on Magneto 2, Message Queues, and steps to create a Magento 2 Queue.

Table of Contents

What is Magento 2 (Adobe Commerce)?

Initially released on March 31, 2008, Magento 2 is now known as Adobe Commerce. It’s an eCommerce software built using PHP programming language. Adobe Commerce is open-source software designed to improve multi-channel business experiences for B2B and B2C customers. This eCommerce software offers high-level customization features, advanced commerce functionality, and top-level performance tools, which are all vital to running an online business.

The importance of this tool can be determined by the fact that it secured a leading position among eCommerce platforms across the globe, as shown by Techliance. On top of it, big industrial giants such as Coca-Cola, HP, Samsung, Wilson, Jaguar, and AsusTek use Magento to accelerate their business operations

What is a Message Queue Framework?

While building an eCommerce website through Magento 2, developers often face several challenges in adapting to its features and services, like cross-calling between services. Magento 2 subsidizes Message Queue Framework (MQF), enabling the module to send messages in queues. A Message Queue is a type of communication system in which senders and receivers don’t communicate with each other at the same time.

A sender, for instance, sends a message to a receiver that will be stored in the queue until the receiver retrieves it. Since both sender and receiver don’t contact each other at the same instant, an Asynchronous Mechanism will be established between the two. This article will guide you on how to create an asynchronous module in Magento 2 through easy steps. Before we dive deeper, understand the basic elements of MQF:

Elements of a Message Queue Framework

  • Sender/Publisher: The element of MQF that publishes messages and sends them to an exchange. 
  • Message Queue Broker: A broker is present between a publisher and a consumer to perform communication between these two elements.
  • Queue: The job of a queue is to store messages. 
  • Receiver/Consumer: The primary responsibility of consumers is to receive messages from an exchange and determine which queue is ready to consume data. 

If you’re a newbie and don’t know where to begin, check out a previous blog post about message queues. 

Replicate Magento Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from Magento (via MySQL) and 100+ Data Sources straight into your Data Warehouse or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

GET STARTED WITH HEVO FOR FREE

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

General Example: Using RabbitMQ as a Message Broker for Magento 2 Queue

Let’s consider a case in which MQF exploits RabbitMQ as a multi-protocol messaging broker for Magento 2 Queue. It is considered a reliable and robust platform to send and receive messages. It uses AMQP (Advanced Message Queuing Protocol) and has a built-in mechanism for storing undelivered messages. 

RabbitMQ Structure | Magento 2 Queue | Hevo Data
Image Source: Self

In order to dispatch a message to the Magento 2 Queue, use the following code:

$publisher->publish($topic, $message)

As mentioned before, queue, queue_message and queue_message_status tables are required to publish messages in multiple queues, where the queue_message table is used to represent a single record and queue_message_status is used to represent multiple records. 

A) Instantiating a Consumer in RabbitMQ

If the MQF uses RabbitMQ as a Message Queue System for Magento 2 Queue, then queue_consumer.xml will be used to instantiate a consumer. The customer_created_listener enables the consumer to attend to the queue and receive all incoming messages. 

For every receiving message in the Magento 2 Queue, it entreats the following command:

MagentoSomeClass::processMessage($message)
$this->consumerFactory->get('customer_created_listener')
     ->process();

B) Instantiating a Consumer in MySQL Adapter

If you think RabbitMQ is complicated to use as a message broker for Magento 2 Queue, you can also replace it with a MySQL Adapter. The MySQL adapter will enable the implementation of a simple data queue system along with three database tables. Compared to RabbitMQ, the MySQL adapter utilizes cron jobs to assure that consumers receive messages in Magento 2 Queue. 

In order to instantiate a consumer in MySQL adapter, use the following command:

MagentoFrameworkMessageQueueConsumerInterface::process($maxNumberOfMessages)

Execute the following steps:

Step 1: Define the Queue

In the first step, it is mandatory to define the queue name for Magento 2 Queue depending upon the consumer configuration.

MagentoFrameworkMessageQueueConsumerConfigurationInterface::getQueueName.

Step 2: Improve Load Distribution

In the second step, select the message records via the  $maxNumberOfMessages command and apply filtering on the following queue_name field. Combine all the 3 tables to drive load distribution across multiple consumers. 

Step 3: Decode Messages

Perform the decoding of messages by taking the topic name from the following command:

MagentoFrameworkMessageQueueConsumerConfigurationInterface

Step 4: Invoke Callback

Request callback MagentoFrameworkMessageQueueConsumerConfigurationInterface::getCallback , and depart the decoded message as an argument. 

Conversion of Magento 2 Queue from MySQL to AMQP

If you want to convert or migrate data of the Magento 2 Queue from MySQL to AMQP, execute the following operation:

'queue' => [
    'topics' => [
        'product_action_attribute.update' => [
            'publisher' => 'amqp-magento'
        ]
    ],
    'config' => [
        'publishers' => [
            'product_action_attribute.update' => [
                'connections' => [
                    'amqp' => [
                        'name' => 'amqp',
                        'exchange' => 'magento',
                        'disabled' => false
                    ],
                    'db' => [
                        'name' => 'db',
                        'disabled' => true
                    ]
                ]
            ]
        ]
    ],
    'consumers' => [
        'product_action_attribute.update' => [
            'connection' => 'amqp',
        ],
    ],
],

Understanding the Magento 2 Queue Topology

To understand the Magento 2 Queue, consider a case in which a client bought Salesforce integration to perform synchronization between his website and Salesforce company. Let’s consider the website has already 30k orders. Now, if the client wants to add all of them into the ‘hevodata_salesfore_queue’ table, he could face timeouts due to a shortage of storage. A Magento 2 Queue, therefore, would be needed to solve this challenge.

First off, configure the message queue topology which requires 4 XML files in <vendor>/<module>/etc folder:

A) Communication.xml

The purpose of this file is to define aspects of the asynchronous message systems that are common to all types of communication.

Let’s consider our topic is ‘salesforce.queue.order’.

Since this is a character, we need to define its data type, which is a string in this case. The string data type is used to message messages through Magento. 

The second step is to declare the handler class ‘HevoDataSalesforceModelQueueConsumer’ with the process method to control input from the sequence.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="salesforce.queue.order" request="string">
        <handler name="processAddOrderToQueue"  type="HevoDataSalesforceModelQueueConsumer" method="process" />
    </topic>
</config>

Now define the handler class HevoDataSalesforceModelQueueConsumer.php as follows:

Class Consumer
{
    /** @var PsrLogLoggerInterface  */
    protected $_logger;

    public function process($orders)
    {
        try{
        //function execute handles saving order object to table
            $this->execute($orders);

        }catch (Exception $e){
            //logic to catch and log errors
            $this->_logger->critical($e->getMessage());
        }
    }
}

B) Queue_consumer.xml 

It explains the link between a present queue and its consumer.

Define the parameters (name and queue attributes are important to define in this file) in this class as shown below:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="salesforce.queue.order"
              queue="salesforce.queue.order"
              connection="db"
              maxMessages="5000"
              consumerInstance="MagentoFrameworkMessageQueueConsumer"
              handler="HevoDataSalesforceModelQueueConsumer::process"/>
</config>

C) Queue_topology.xml

The purpose of this topology is to explain the message routing rules and declare queues and exchanges. 

In this file, define the exchange name, type, and connection attributes. As we are using MySQL for the queue system, connection ‘db’ is necessary. Clients can also RabbitMQ depending upon their feasibility and usage.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="processAddOrderToQueueBinding"
                topic="salesforce.queue.order"
                destinationType="queue"
                destination="salesforce.queue.order"/>
    </exchange>
</config>

D) Queue_publisher.xml

It includes an exchange where a topic is considered. Note that the connection name must be kept similar to the one defined in the communication file. 

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <publisher topic="salesforce.queue.order">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>

How to Send a Message from Publisher to a Queue?

To send a message from the publisher to a Magento 2 Queue, we need to create a controller. All the messages will be stored in Magento’s database.

  • The purpose of the queue table is to manage queues.
  • The purpose of the queue_message table is to store messages.
  • The purpose of the queue_message_status table is to handle the overload of a message queue.

For adding orders in the form of a Magento 2 Queue, send an ajax request to salesforce/queue/order. The following code will explain more in detail:

<?php
namespace HevoDataSalesforceControllerAdminhtmlQueue;

/**
* Class Order
* @package HevoDataSalesforceControllerAdminhtmlQueue
*/
class Order extends MagentoBackendAppAction
{
    /**
    * Authorization level of a basic admin session
    */
    const ADMIN_RESOURCE = 'HevoData_Salesforce::config_salesforce';

    const TOPIC_NAME = 'salesforce.queue.order';

    const SIZE = 5000;

    /* @var MagentoSalesModelResourceModelOrderCollectionFactory  /
    protected $_orderColFactory;

    /* @var MagentoFrameworkSerializeSerializerJson  /
    protected $_json;

    /* @var MagentoFrameworkMessageQueuePublisherInterface  /
    protected $_publisher;

    /**
    * Order constructor.
    *
    * @param MagentoSalesModelResourceModelOrderCollectionFactory $orderColFactory
    * @param MagentoFrameworkMessageQueuePublisherInterface $publisher
    * @param MagentoFrameworkSerializeSerializerJson $json
    * @param MagentoBackendAppActionContext $context
    */
    public function __construct(
        MagentoSalesModelResourceModelOrderCollectionFactory $orderColFactory,
        MagentoFrameworkMessageQueuePublisherInterface $publisher,
        MagentoFrameworkSerializeSerializerJson $json,
        MagentoBackendAppActionContext $context
    ){
        $this->_orderColFactory = $orderColFactory;
        $this->_json = $json;
        $this->_publisher = $publisher;
    }

    /**
    * @return MagentoFrameworkAppResponseInterface|MagentoFrameworkControllerResultInterface|void
    */
    public function execute()
    {
        if ($this->getRequest()->isAjax()) {
            try {
        //get list of order IDs
$orderCollection = $this->_orderColFactory->create()->addFieldToSelect('entity_id')->getAllIds();
        //send data to publish function
                $this->publishData($orderCollection, $this->type);
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 0,
                    'message' => __('Orders are being added to queue')
                ]));
                return;
            } catch (Exception $e) {
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 0,
'message' => __('Something went wrong while adding record(s) to queue. Error: '.$e->getMessage())
                ]));
                return;
            }
        }
        return $this->_redirect('*/*/index');
    }

    /**
    * @param $data
    * @param $type
    */
    public function publishData($data,$type)
    {
        if(is_array($data)){
        //split list of IDs into arrays of 5000 IDs each
            $chunks = array_chunk($data,self::SIZE);
            foreach ($chunks as $chunk){
            //publish IDs to queue
                    $rawData = [$type => $chunk];
$this->_publisher->publish(self::TOPIC_NAME, $this->_json->serialize($rawData));
            }
        }
    }
}

In the above case, we’ve divided order IDs into 5000, and after division, each chunk is then added to the message queue. The purpose of using chunks is to avoid unresponsive Magento backend and eliminate PHP timeout as there are too many orders. 

What Makes Hevo’s Magento ETL Process Best-In-Class

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s automated, No-code platform empowers you with everything you need to have for a smooth Magento (via MySQL) data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!

How to Process a Message from Queue? 

To process messages from the Magento 2 Queue, expand the handler class that was declared in 1st step:

<?php
namespace HevoDataSalesforceModelQueue;

/**
* Class Consumer
* @package HevoDataSalesforceModelQueue
*/
class Consumer
{
    ....
    /* @var MagentoFrameworkSerializeSerializerJson  /    
protected $_json;

    /**
    * @param string $orders
    */
    public function process($orders)
    {
        try{
            $this->execute($orders);
           
        }catch (Exception $e){
            $errorCode = $e->getCode();
            $message = __(Something went wrong while adding orders to queue');
            $this->_notifier->addCritical(
                $errorCode,
                $message
            );
            $this->_logger->critical($errorCode .": ". $message);
        }
    }

    /**
    * @param $orderItems
    *
    * @throws LocalizedException
    */
    private function execute($orderItems)
    {
        $orderCollectionArr = [];
        /* @var HevoDataSalesforceModelQueue $queue /
        $queue = $this->_queueFactory->create();
        $orderItems = $this->_json->unserialize($orderItems);
        if(is_array($orderItems)){
            foreach ($orderItems as $type => $orderId) {
            $orderCollectionArr[] = [
                    'type' => 'order',
                    'entity_id' => $orderId,
                    'priority' => 1,
                ];
            }
            //handle insertMulti orders into Salesforce queue
            $queue->add($orderCollectionArr);
        }
    }
}

Conclusion

Magento 2 or Adobe Commerce provides brand owners with an opportunity to connect multiple shopping experiences across mediums, introduce new brands, and websites and cover different geographies. With this eCommerce platform, owners can embed personalize end-to-end shopping ventures, resulting in higher conversion rates. However, developers must solve cross-calling between services to extract all the benefits of this software. The preceding article has discussed a procedure to create a Magento 2 Queue. The method is divided into three steps, i.e., sending a message from the publisher to Magento 2 Queue, processing a Magento 2 Queue, and executing of Magento 2 Queue. 

There are various Data Sources that organizations leverage to capture a variety of valuable data points. But, transferring data from these sources into a Data Warehouse for a holistic analysis is a hectic task. It requires you to code and maintains complex functions that can help achieve a smooth flow of data. An Automated Data Pipeline helps in solving this issue and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline and has awesome 100+ pre-built Integrations that you can choose from.

visit our website to explore hevo

Hevo can help you integrate data from Magento (via MySQL) and 100+ data sources and load them into a destination to analyze real-time data at an affordable price. It will make your life easier and Data Migration hassle-free. It is user-friendly, reliable, and secure.

SIGN UP for a 14-day free trial and see the difference!

Share your experience of learning about Magento 2 Queue in the comments section below.

No-code Data Pipeline For your Data Warehouse