Configure event queues

Edit on GitHub

This document describes how event queues are configured.

To configure the event queue, follow these steps:

  1. Install spryker/queue (version 1.* or later) and spryker/rabbit-mq.
  2. Create the configuration for RabbitMQ in \Pyz\Client\RabbitMq\RabbitMqDependencyProvider as follows:
<?php
namespace Pyz\Client\RabbitMq;

use ArrayObject;
use Generated\Shared\Transfer\RabbitMqOptionTransfer;
use Spryker\Client\RabbitMq\Model\Connection\Connection;
use Spryker\Client\RabbitMq\RabbitMqDependencyProvider as RabbitMqRabbitMqDependencyProvider;
use Spryker\Shared\Event\EventConstants;

class RabbitMqDependencyProvider extends RabbitMqRabbitMqDependencyProvider
{
    /**
     * Collects the RabbitMQ declarations for the event queue: the event exchange
     * bound to the event queue first, then the same exchange bound to the error queue.
     *
     * @return \ArrayObject
     */
    protected function getQueueOptions()
    {
        return new ArrayObject([
            $this->createEventExchangeQueueOption(),
            $this->createEventErrorExchangeQueueOption(),
        ]);
    }

    /**
     * Declares the event exchange with the regular event queue as its binding.
     *
     * @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
     */
    protected function createEventExchangeQueueOption()
    {
        return $this->buildEventExchangeOption($this->createEventQueueBinding());
    }

    /**
     * Declares the event exchange with the error queue as its binding.
     *
     * @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
     */
    protected function createEventErrorExchangeQueueOption()
    {
        return $this->buildEventExchangeOption($this->createEventErrorQueueBinding());
    }

    /**
     * Describes the error queue: the event queue name suffixed with ".error",
     * bound with the routing key "error".
     *
     * @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
     */
    protected function createEventErrorQueueBinding()
    {
        $errorQueueBinding = new RabbitMqOptionTransfer();
        $errorQueueBinding->setQueueName(EventConstants::EVENT_QUEUE . '.error');
        $errorQueueBinding->setRoutingKey('error');
        $errorQueueBinding->setDurable(true);
        $errorQueueBinding->setAutoDelete(false);
        $errorQueueBinding->setExclusive(false);
        $errorQueueBinding->setPassive(false);

        return $errorQueueBinding;
    }

    /**
     * Describes the regular event queue; no routing key is set on this binding.
     *
     * @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
     */
    protected function createEventQueueBinding()
    {
        $eventQueueBinding = new RabbitMqOptionTransfer();
        $eventQueueBinding->setQueueName(EventConstants::EVENT_QUEUE);
        $eventQueueBinding->setDurable(true);
        $eventQueueBinding->setAutoDelete(false);
        $eventQueueBinding->setExclusive(false);
        $eventQueueBinding->setPassive(false);

        return $eventQueueBinding;
    }

    /**
     * Builds the shared exchange declaration (direct, durable, named after the
     * event queue) and attaches the given queue binding to it.
     *
     * @param \Generated\Shared\Transfer\RabbitMqOptionTransfer $bindingQueueOption
     *
     * @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
     */
    private function buildEventExchangeOption(RabbitMqOptionTransfer $bindingQueueOption)
    {
        $exchangeOption = new RabbitMqOptionTransfer();
        $exchangeOption->setQueueName(EventConstants::EVENT_QUEUE);
        $exchangeOption->setDeclarationType(Connection::RABBIT_MQ_EXCHANGE);
        $exchangeOption->setType('direct');
        $exchangeOption->setDurable(true);
        $exchangeOption->setAutoDelete(false);
        $exchangeOption->setPassive(false);
        $exchangeOption->setBindingQueue($bindingQueueOption);

        return $exchangeOption;
    }
}
  3. In \Pyz\Client\Queue\QueueDependencyProvider, add the RabbitMQ adapter:
<?php
namespace Pyz\Client\Queue;

use Spryker\Client\Kernel\Container;
use Spryker\Client\Queue\QueueDependencyProvider as BaseQueueDependencyProvider;

class QueueDependencyProvider extends BaseQueueDependencyProvider
{
    /**
     * Returns the queue adapters for the Queue client; only the adapter created
     * by the RabbitMq client is registered here.
     *
     * @param \Spryker\Client\Kernel\Container $container
     *
     * @return \Spryker\Client\Queue\Model\Adapter\AdapterInterface[]
     */
    protected function createQueueAdapters(Container $container)
    {
        $rabbitMqClient = $container->getLocator()->rabbitMq()->client();

        return [$rabbitMqClient->createQueueAdapter()];
    }
}
  4. In \Pyz\Zed\Queue\QueueConfig, add receiver options for the event queue:
<?php
namespace Pyz\Zed\Queue;

use Generated\Shared\Transfer\RabbitMqConsumerOptionTransfer;
use Spryker\Shared\Event\EventConstants;
use Spryker\Zed\Queue\QueueConfig as SprykerQueueConfig;

class QueueConfig extends SprykerQueueConfig
{
    /**
     * Maps the event queue name to its receiver options, keyed by the
     * adapter identifier "rabbitmq".
     *
     * @return array
     */
    protected function getQueueReceiverOptions()
    {
        $eventQueueReceiverOptions = [
            'rabbitmq' => $this->getRabbitMqQueueConsumerOptions(),
        ];

        return [EventConstants::EVENT_QUEUE => $eventQueueReceiverOptions];
    }

    /**
     * Builds the consumer options for the event queue: empty consumer tag and
     * every flag (no-local, no-ack, exclusive, no-wait) switched off.
     *
     * @return \Generated\Shared\Transfer\RabbitMqConsumerOptionTransfer
     */
    protected function getRabbitMqQueueConsumerOptions()
    {
        $consumerOptions = new RabbitMqConsumerOptionTransfer();
        $consumerOptions->setQueueName(EventConstants::EVENT_QUEUE);
        $consumerOptions->setConsumerTag('');
        $consumerOptions->setConsumerExclusive(false);
        $consumerOptions->setNoAck(false);
        $consumerOptions->setNoLocal(false);
        $consumerOptions->setNoWait(false);

        return $consumerOptions;
    }
}
  5. In \Pyz\Zed\Queue\QueueDependencyProvider, add a plugin (consumer) to process messages or events:
<?php
namespace Pyz\Zed\Queue;

use Spryker\Shared\Event\EventConstants;
use Spryker\Zed\Event\Communication\Plugin\Queue\EventQueueMessageProcessorPlugin;
use Spryker\Zed\Kernel\Container;
use Spryker\Zed\Queue\QueueDependencyProvider as SprykerDependencyProvider;
use Spryker\Zed\RabbitMq\Communication\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin;

class QueueDependencyProvider extends SprykerDependencyProvider
{
    /**
     * Registers the message processor for the event queue, keyed by queue name.
     *
     * @param \Spryker\Zed\Kernel\Container $container
     *
     * @return \Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface[]
     */
    protected function getProcessorMessagePlugins(Container $container)
    {
        $processorPlugins = [];
        $processorPlugins[EventConstants::EVENT_QUEUE] = new EventQueueMessageProcessorPlugin();

        return $processorPlugins;
    }

    /**
     * Registers the plugin that provides RabbitMQ-specific metrics for the
     * resource-aware queue worker. Applicable only for RabbitMqAdapter.
     *
     * NOTE(review): the "RabbitMqQueueMetricsReaderPlugin" snippet later on this page
     * registers the plugin from the Client namespace via getQueueMetricsReaderPlugins();
     * confirm which hook and namespace your installed module versions expose.
     *
     * @return array<\Spryker\Zed\RabbitMq\Communication\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin>
     */
    protected function getQueueMetricsExpanderPlugins(): array
    {
        $metricsPlugins = [];
        $metricsPlugins[] = new RabbitMqQueueMetricsReaderPlugin();

        return $metricsPlugins;
    }
}
  6. In your application’s configuration file, ./config/Shared/config_default.php or an environment-specific one, make sure you have this configuration:
<?php

// Server identifier for the queue worker: the host name, or the uname node name
// when gethostname() fails (it returns false in that case).
$config[QueueConstants::QUEUE_SERVER_ID] = (gethostname()) ?: php_uname('n');
$config[QueueConstants::QUEUE_WORKER_INTERVAL_MILLISECONDS] = 1000; // default 1000
$config[QueueConstants::QUEUE_WORKER_MAX_THRESHOLD_SECONDS] = 60; // 1min

// Per-queue adapter configuration: the event queue is served by the RabbitMQ adapter.
$config[QueueConstants::QUEUE_ADAPTER_CONFIGURATION] = [
    EventConstants::EVENT_QUEUE => [
        QueueConfig::CONFIG_QUEUE_ADAPTER => \Spryker\Client\RabbitMq\Model\RabbitMqAdapter::class,
        QueueConfig::CONFIG_MAX_WORKER_NUMBER => 1, // Increase number of workers if higher concurrency needed.
    ],
];

// Enables processing of queues with resource aware queue worker.
// filter_var() is used instead of a plain (bool) cast: the cast binds tighter than `??`
// (making a `?? false` fallback unreachable) and treats any non-empty string other than
// "0" -- including "false" -- as true. FILTER_VALIDATE_BOOLEAN accepts "1"/"true"/"on"/"yes"
// and yields false for everything else, including an unset variable.
$config[QueueConstants::RESOURCE_AWARE_QUEUE_WORKER_ENABLED] = filter_var(
    getenv('RESOURCE_AWARE_QUEUE_WORKER_ENABLED'),
    FILTER_VALIDATE_BOOLEAN
);
// An unset or zero environment value falls back to the literal default on the right of `?:`.
$config[QueueConstants::QUEUE_WORKER_FREE_MEMORY_BUFFER] = (int)getenv('QUEUE_WORKER_FREE_MEMORY_BUFFER') ?: 750;
$config[QueueConstants::QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT] = (int)getenv('QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT') ?: 5;
$config[QueueConstants::QUEUE_WORKER_MAX_PROCESSES] = 10; // concurrent, for all queues/stores, default 5
$config[QueueConstants::QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT] = 600; // 10 min, default 5 min

Metrics and resource-aware worker configuration

Starting with 202512.0, Spryker ships a resource-aware queue worker that uses queue metrics and system resource monitoring to make intelligent decisions about process spawning. The configuration keys referenced above (RESOURCE_AWARE_QUEUE_WORKER_ENABLED, QUEUE_WORKER_FREE_MEMORY_BUFFER, QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT, QUEUE_WORKER_MAX_PROCESSES, QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT) control this worker’s behavior.

RabbitMqQueueMetricsReaderPlugin

Register the RabbitMqQueueMetricsReaderPlugin in your QueueDependencyProvider to supply RabbitMQ queue metrics (message counts, batch sizes) to the resource-aware worker:

// src/Pyz/Zed/Queue/QueueDependencyProvider.php

use Spryker\Client\RabbitMq\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin;

class QueueDependencyProvider extends SprykerQueueDependencyProvider
{
    /**
     * Registers the RabbitMQ metrics reader as the only queue metrics reader plugin.
     *
     * @return array<\Spryker\Client\QueueExtension\Dependency\Plugin\QueueMetricsReaderPluginInterface>
     */
    protected function getQueueMetricsReaderPlugins(): array
    {
        $metricsReaderPlugins = [];
        $metricsReaderPlugins[] = new RabbitMqQueueMetricsReaderPlugin();

        return $metricsReaderPlugins;
    }
}

For the full configuration reference, architecture details, tuning recommendations, and backporting guidance, see Optimizing Jenkins execution with the resource-aware queue worker.