Integrate multi-queue publish structure
Edit on GitHub
To improve debugging of failures and slow events in Spryker, we introduced a new publish queue structure. In the new structure, the single event queue is replaced by multiple publish queues. You can find a detailed comparison of the structures in the table below.
PROPERTY | SINGLE PUBLISH QUEUE STRUCTURE | MULTIPLE PUBLISH QUEUE STRUCTURE |
---|---|---|
Event processing | Infrastructure and application events are processed in the same event queue. | Infrastructure events are spread across multiple publish queues. Application events are processed in the event queue. |
Monitoring of event consumption speed | Due to mixed listeners, event consumption speed is unpredictable and not linear. | You can monitor the speed of event consumption in each queue separately. |
Per event configuration of chunk size | ✓ | |
Per event separation of workers | ✓ |
Diagram of the single publish queue structure
Diagram of the multiple publish queue structure
Set up multiple publish queue structure
To enhance your project with the new structure, follow the steps below.
1) Install the required modules using Composer
composer require spryker/publisher:"1.1.0"
Make sure the following modules have been installed:
MODULE | EXPECTED DIRECTORY |
---|---|
Publisher | spryker/publisher |
2) Set up Behavior
Set up behavior as follows:
- Set up the `publish` queue as the default one:
<?php
namespace Pyz\Zed\Publisher;
use Spryker\Shared\Publisher\PublisherConfig as SharedPublisherConfig;
use Spryker\Zed\Publisher\PublisherConfig as SprykerPublisherConfig;
class PublisherConfig extends SprykerPublisherConfig
{
    /**
     * Selects the queue that publish events are sent to by default.
     *
     * Returns the shared `publish` queue name so that it is used as the default publish queue.
     *
     * @return string|null
     */
    public function getPublishQueueName(): ?string
    {
        $defaultPublishQueueName = SharedPublisherConfig::PUBLISH_QUEUE; // 'publish'

        return $defaultPublishQueueName;
    }
}
- Set `\Pyz\Client\RabbitMq\RabbitMqConfig::getPublishQueueConfiguration()` to use the `publish` queue.
Pyz\Client\RabbitMq
<?php
namespace Pyz\Client\RabbitMq;
...
/**
* Project-level RabbitMQ client configuration that declares the event, log, and publish queues.
*/
class RabbitMqConfig extends SprykerRabbitMqConfig
{
/**
* Builds the queue configuration by merging the event queue (with its retry and error
* routing keys), the log queue name, and the publish queue configuration.
*
* Format of the configuration entries:
*
* QueueNameFoo, // Queue => QueueNameFoo, (Queue and error queue will be created: QueueNameFoo and QueueNameFoo.error)
* QueueNameBar => [
* RoutingKeyFoo => QueueNameBaz, // (Additional queues can be defined by several routing keys)
* ],
*
* @see https://www.rabbitmq.com/tutorials/amqp-concepts.html
*
* @return array
*/
protected function getQueueConfiguration(): array
{
return array_merge(
[
// Event queue with the queues its retry and error routing keys point to.
EventConstants::EVENT_QUEUE => [
EventConfig::EVENT_ROUTING_KEY_RETRY => EventConstants::EVENT_QUEUE_RETRY,
EventConfig::EVENT_ROUTING_KEY_ERROR => EventConstants::EVENT_QUEUE_ERROR,
],
$this->get(LogConstants::LOG_QUEUE_NAME),
],
// Publish queues are declared in getPublishQueueConfiguration() below.
$this->getPublishQueueConfiguration(),
....
);
}
/**
* Declares the default publish queue together with the queues its retry and error
* routing keys point to.
*
* @return array
*/
protected function getPublishQueueConfiguration(): array
{
return [
PublisherConfig::PUBLISH_QUEUE => [
PublisherConfig::PUBLISH_ROUTING_KEY_RETRY => PublisherConfig::PUBLISH_RETRY_QUEUE,
PublisherConfig::PUBLISH_ROUTING_KEY_ERROR => PublisherConfig::PUBLISH_ERROR_QUEUE,
],
...
];
}
...
}
- Set `Pyz\Zed\Queue\QueueDependencyProvider::getProcessorMessagePlugins()` to use the `publish` queue.
<?php
namespace Pyz\Zed\Queue;
...
use Spryker\Shared\Publisher\PublisherConfig;
...
/**
* Project-level queue dependency provider that maps queue names to message processor plugins.
*/
class QueueDependencyProvider extends SprykerDependencyProvider
{
/**
* Returns the message processor plugins keyed by the name of the queue they are registered for.
*
* @param \Spryker\Zed\Kernel\Container $container
*
* @return \Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface[]
*/
protected function getProcessorMessagePlugins(Container $container): array
{
return [
...
// The default publish queue and its retry queue are registered with the event message processor plugins.
PublisherConfig::PUBLISH_QUEUE => new EventQueueMessageProcessorPlugin(),
PublisherConfig::PUBLISH_RETRY_QUEUE => new EventRetryQueueMessageProcessorPlugin(),
...
];
}
}
Introduce a new publish queue
We introduce the GlossaryStorage publish queue as an example. Adjust the publish queue name according to your requirements.
To introduce the GlossaryStorage publish queue:
- Update the `GlossaryStorage` module:
composer update spryker/glossary-storage:"1.8.0" --update-with-dependencies
- In `\Pyz\Client\RabbitMq\RabbitMqConfig::getPublishQueueConfiguration()`, register the `publish.translation` queue for glossary events.
<?php
namespace Pyz\Client\RabbitMq;
...
/**
* Project-level RabbitMQ client configuration, extended with the glossary publish queue.
*/
class RabbitMqConfig extends SprykerRabbitMqConfig
{
...
/**
* Declares the publish queues, including the glossary publish queue.
*
* @return array
*/
protected function getPublishQueueConfiguration(): array
{
return [
...,
// Listed by name only (no routing keys), unlike the default publish queue entry.
GlossaryStorageConfig::PUBLISH_TRANSLATION, // 'publish.translation'
];
}
...
}
- Adjust `Pyz\Zed\Queue\QueueDependencyProvider::getProcessorMessagePlugins()` by adding the `publish.translation` glossary publish queue with a specific message processor plugin.
<?php
namespace Pyz\Zed\Queue;
...
use Spryker\Shared\GlossaryStorage\GlossaryStorageConfig;
...
/**
* Project-level queue dependency provider, extended with the glossary publish queue.
*/
class QueueDependencyProvider extends SprykerDependencyProvider
{
/**
* Returns the message processor plugins keyed by the name of the queue they are registered for.
*
* @param \Spryker\Zed\Kernel\Container $container
*
* @return \Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface[]
*/
protected function getProcessorMessagePlugins(Container $container): array
{
return [
...
// The glossary publish queue ('publish.translation') is registered with the event queue message processor plugin.
GlossaryStorageConfig::PUBLISH_TRANSLATION => new EventQueueMessageProcessorPlugin(),
...
];
}
}
- Set `\Pyz\Zed\Publisher\PublisherDependencyProvider::getPublisherPlugins()` to use the `publish.translation` queue.
Pyz\Zed\Publisher
<?php
namespace Pyz\Zed\Publisher;
...
use Spryker\Shared\GlossaryStorage\GlossaryStorageConfig;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryDeletePublisherPlugin as GlossaryKeyDeletePublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryWritePublisherPlugin as GlossaryKeyWriterPublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryTranslation\GlossaryWritePublisherPlugin as GlossaryTranslationWritePublisherPlugin;
...
/**
* Project-level publisher dependency provider that assigns the glossary publisher plugins
* to the 'publish.translation' queue.
*/
class PublisherDependencyProvider extends SprykerPublisherDependencyProvider
{
/**
* Collects the publisher plugins; in this example only the glossary storage plugins are merged in.
*
* @return array
*/
protected function getPublisherPlugins(): array
{
return array_merge(
$this->getGlossaryStoragePlugins(),
);
}
/**
* Returns the glossary publisher plugins grouped under the name of the publish queue they use.
*
* @return array
*/
protected function getGlossaryStoragePlugins(): array
{
return [
// The array key is the publish queue name; the value lists the plugins assigned to that queue.
GlossaryStorageConfig::PUBLISH_TRANSLATION => [ // 'publish.translation'
new GlossaryKeyDeletePublisherPlugin(),
new GlossaryKeyWriterPublisherPlugin(),
new GlossaryTranslationWritePublisherPlugin(),
],
];
}
...
}
Publish events using the default publish queue
To publish events using the default `publish` queue, register the publisher plugins that process glossary events:
<?php
namespace Pyz\Zed\Publisher;
...
use Spryker\Shared\GlossaryStorage\GlossaryStorageConfig;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryDeletePublisherPlugin as GlossaryKeyDeletePublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryWritePublisherPlugin as GlossaryKeyWriterPublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryTranslation\GlossaryWritePublisherPlugin as GlossaryTranslationWritePublisherPlugin;
...
/**
* Project-level publisher dependency provider that registers the glossary publisher plugins
* without assigning them to a specific publish queue.
*/
class PublisherDependencyProvider extends SprykerPublisherDependencyProvider
{
/**
* Returns the glossary publisher plugins as a plain list.
*
* No queue name is used as an array key here; per the accompanying text, such plugins
* publish through the default publish queue.
*
* @return array
*/
protected function getGlossaryStoragePlugins(): array
{
return [
new GlossaryKeyDeletePublisherPlugin(),
new GlossaryKeyWriterPublisherPlugin(),
new GlossaryTranslationWritePublisherPlugin(),
];
}
...
}
Set up a publish queue for a publisher plugin
You can set up an individual publish queue for a publisher plugin or a set of publisher plugins as follows:
Pyz\Zed\Publisher
<?php
namespace Pyz\Zed\Publisher;
...
use Spryker\Shared\GlossaryStorage\GlossaryStorageConfig;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryDeletePublisherPlugin as GlossaryKeyDeletePublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryKey\GlossaryWritePublisherPlugin as GlossaryKeyWriterPublisherPlugin;
use Spryker\Zed\GlossaryStorage\Communication\Plugin\Publisher\GlossaryTranslation\GlossaryWritePublisherPlugin as GlossaryTranslationWritePublisherPlugin;
...
/**
 * Project-level publisher dependency provider that splits the glossary publisher plugins
 * across two individual publish queues.
 */
class PublisherDependencyProvider extends SprykerPublisherDependencyProvider
{
    /**
     * Returns the glossary publisher plugins grouped by the publish queue they use.
     *
     * Each array key is a publish queue name and each value is the list of publisher
     * plugins assigned to that queue.
     *
     * @return array
     */
    protected function getGlossaryStoragePlugins(): array
    {
        return [
            // NOTE(review): 'publish.translation_key' is an example queue name; presumably it also has to be
            // registered in RabbitMqConfig and QueueDependencyProvider like 'publish.translation' - confirm.
            'publish.translation_key' => [
                new GlossaryKeyDeletePublisherPlugin(),
                new GlossaryKeyWriterPublisherPlugin(),
            ],
            // Uses the module constant instead of repeating the queue name as a string literal.
            GlossaryStorageConfig::PUBLISH_TRANSLATION => [ // 'publish.translation'
                new GlossaryTranslationWritePublisherPlugin(),
            ],
        ];
    }

    ...
}
Thank you!
For submitting the form