Leveraging the queue in Magento 2 in a (semi) simple way

Magento 2, Queue

Working with queues is quite a neat feature. It used to be available in Magento Commerce only, but since release 2.3 it is also available in the Open Source version of Magento 2.

Now I hear you thinking: why would I need a queue? Well, to execute actions in the background. This has a few advantages:

  • Expensive operations are executed in a separate process, so the current request can finish quickly.

  • When an action fails, for example an API call, you can retry it.

  • When the action fails, the end user won’t notice, as it executes in the background.

  • You can move the processing to an external server if needed.

The downside is that timing is hard to control: a job runs at some later moment. If you need the result right now, you can’t use a queue.

Big frameworks like Laravel and Symfony come with support for queues out of the box. If you want to get familiar with queues, I recommend playing with them there first, as it is easier than with Magento.

Prerequisites

I have tested this dispatching using RabbitMQ. For this I installed RabbitMQ locally and added this to my env.php:

'queue' => [
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672',
        'user' => 'rabbitmquser',
        'password' => 'rabbitmqpassword',
        'virtualhost' => 'rabbitmqvirtualhost',
        'ssl' => false
    ]
],

There is also an option to use MySQL to run the queue, but I have no experience with that.

Dispatching tasks

When I needed to implement a queue-driven task, I started to Google around and found a couple of blog posts on the subject. They basically boil down to a set of 5 XML files to configure everything. After some digging around in the Magento code, I stumbled upon this method:

\Magento\AsynchronousOperations\Model\MassSchedule::publishMass

You need to feed it two things:

  • A topic name.

  • An entities array.

Let's see how you should use this:

use Magento\AsynchronousOperations\Model\ConfigInterface;
use Magento\AsynchronousOperations\Model\MassSchedule;
use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Sales\Api\Data\OrderInterface;

class PublishExpensiveOperationTask implements ObserverInterface
{
    /**
     * @var MassSchedule
     */
    private $massSchedule;

    /**
     * @var ConfigInterface
     */
    private $asyncConfig;

    public function __construct(
        MassSchedule $massSchedule,
        ConfigInterface $asyncConfig
    ) {
        $this->massSchedule = $massSchedule;
        $this->asyncConfig = $asyncConfig;
    }

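    public function execute(Observer $observer)
    {
        /** @var OrderInterface $order */
        $order = $observer->getData('order');

        // Resolve the queue topic that belongs to the web API route and HTTP method.
        $topicName = $this->asyncConfig->getTopicName('/V1/expensive-operation/calculate', 'POST');

        // One inner array per job. Guest orders have no customer ID, so the cast yields 0.
        $this->massSchedule->publishMass($topicName, [[(int)$order->getCustomerId()]]);
    }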
}

The topic name is derived from an API endpoint: you pass in the endpoint URL plus the HTTP method of the route (GET, POST, etc.), and the consumer later calls the service behind that endpoint. The entities array is an array of arrays, each of which contains an ID here. That's why the ID is wrapped in two pairs of brackets: [[ $id ]].

Your webapi.xml would look something like this:

<?xml version="1.0"?>
<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Webapi:etc/webapi.xsd">
    <route url="/V1/expensive-operation/calculate" method="POST">
        <service class="<VENDOR>\<MODULE>\Api\ExpensiveOperationInterface" method="calculate"/>
        <resources>
            <resource ref="Admin"/>
        </resources>
    </route>
</routes>
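
The service class referenced in that route could look roughly like the sketch below. Everything in it is an assumption for illustration: Vendor\Module stands in for your own vendor and module name, the ExpensiveOperation class name and the bool return value are made up, and the method body is a placeholder. The one thing worth copying is the complete docblock on the interface method, because Magento's web API reads the @param and @return annotations.

```php
<?php
declare(strict_types=1);

// In a real module this would live in Api/ExpensiveOperationInterface.php.
namespace Vendor\Module\Api;

interface ExpensiveOperationInterface
{
    /**
     * Run the expensive calculation for one customer.
     *
     * @param int $customerId
     * @return bool
     */
    public function calculate(int $customerId): bool;
}

// In a real module this would live in Model/ExpensiveOperation.php.
namespace Vendor\Module\Model;

use Vendor\Module\Api\ExpensiveOperationInterface;

class ExpensiveOperation implements ExpensiveOperationInterface
{
    /**
     * @inheritdoc
     */
    public function calculate(int $customerId): bool
    {
        // Placeholder: the slow work (API call, export, report) goes here.
        return true;
    }
}
```

The single int argument matches the single ID that the observer puts in each inner array.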

Of course you would need to declare the implementation of the ExpensiveOperationInterface in your di.xml.
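
A minimal di.xml for that could look like this. It is a sketch: Vendor\Module is a stand-in for your own vendor and module name, and Vendor\Module\Model\ExpensiveOperation is a hypothetical implementation class, not something from the code above.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <!-- Tell the object manager which class to use for the interface. -->
    <preference for="Vendor\Module\Api\ExpensiveOperationInterface" type="Vendor\Module\Model\ExpensiveOperation"/>
</config>
```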

Running the consumer

Now jobs are placed in the queue, but how do you make sure they are executed? This is where the consumer comes in. This will pick up the jobs and execute them one by one. Just run this simple command:

bin/magento queue:consumers:start async.operations.all

Only want to run a limited number of jobs to debug your script? Call the command with --max-messages=1 and it will stop after it has processed exactly 1 job.

How to make sure this process keeps running?

Well, that is the tricky part. You can start this command, but if it fails for some reason your queue grinds to a halt and things will go bad quickly. There is a simple solution for that: use Supervisor. Supervisor is a widely used tool that watches specific programs and makes sure they keep running. If a process fails for whatever reason, it will start the process again for you. There is also a bonus: Supervisor can start the same process multiple times, to increase the queue throughput. A typical Supervisor configuration would look something like this:

[program:consumers-start]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php /home/magento/current/bin/magento queue:consumers:start async.operations.all
autostart=true
autorestart=true
user=YOUR_USER
numprocs=8
redirect_stderr=true
stdout_logfile=/home/magento/current/var/log/worker.log

After this, load the Supervisor configuration and start the process:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start consumers-start:*

Other ways to leverage the queue

I've used this approach because it makes the queue relatively easy to use. But Magento offers other ways too. There is a pretty extensive tutorial on this subject from Inviqa. I'm pretty sure that method is better if you have a lot of different background jobs, but it requires you to create a few XML files and a few classes before you can use a queue.

That's why I like the method described above: it's a relatively simple way to get a queue up and running quickly.
