Replacing a default data importer with a queue data importer
Edit on GitHubThis tutorial is a quick step-by-step guide on how to replace a default data importer with a queue data importer.
Specifically, the ProductAbstract
data importer is replaced here.
Prerequisites
You should have data in the PRODUCT_ABSTRACT_QUEUE
. For details about how to import data into the queue, see Importing data with the queue data importer
1. Configuration
Configure DataImportConfig
and add a constant to it. You can add a new constant and a public method to Pyz\Zed\DataImport\DataImportConfig
.
The method must call Spryker\Zed\DataImport::buildQueueDataImporterConfiguration()
, passing three arguments:
- Import type
- Queue consumer options
Pyz\Zed\DataImport\DataImportConfig.php
<?php
use Generated\Shared\Transfer\QueueDataImporterConfigurationTransfer;
use Generated\Shared\Transfer\RabbitMqConsumerOptionTransfer;
use Spryker\Zed\DataImport\DataImportConfig as SprykerDataImportConfig;
.....
class DataImportConfig extends SprykerDataImportConfig
{
public const IMPORT_TYPE_PRODUCT_ABSTRACT_QUEUE = 'product-abstract-queue'; // We assumed you already provided the data in this queue
.....
public function getProductAbstractQueueImporterConfiguration(): QueueDataImporterConfigurationTransfer
{
return $this->buildQueueDataImporterConfiguration(
'import.product_abstract',
static::IMPORT_TYPE_PRODUCT_ABSTRACT_QUEUE,
$this->getQueueConsumerOptions()
);
}
protected function getQueueConsumerOptions(): array
{
return [
'rabbitmq' => (new RabbitMqConsumerOptionTransfer())->setConsumerExclusive(false)->setNoWait(false),
];
}
.....
}
2. Register a console command
Register a new import console command in Pyz\Zed\Console\ConsoleDependencyProvider::getConsoleCommands()
:
Pyz\Zed\Console\ConsoleDependencyProvider.php
<?php
namespace Pyz\Zed\Console;
use Pyz\Zed\DataImport\DataImportConfig;
use Spryker\Zed\Console\ConsoleDependencyProvider as SprykerConsoleDependencyProvider;
use Spryker\Zed\DataImport\Communication\Console\DataImportConsole;
.....
class ConsoleDependencyProvider extends SprykerConsoleDependencyProvider
{
protected function getConsoleCommands(Container $container)
{
$commands = [
.....
new DataImportConsole(DataImportConsole::DEFAULT_NAME . ':' . DataImportConfig::IMPORT_TYPE_PRODUCT_ABSTRACT_QUEUE),
.....
];
.....
}
.....
}
3. Adjust the business factory
Go to Pyz\Zed\DataImport\Business\DataImportBusinessFactory
and add a new method for creating the queue data importer that uses the queue reader and imports data to the database.
Also, add a call to this method to DataImportBusinessFactory::getDataImporterByType()
.
Pyz\Zed\DataImport\Business\DataImportBusinessFactory.php
<?php
use Spryker\Zed\DataImport\Business\DataImporter\Queue\QueueDataImporterInterface;
.....
class DataImportBusinessFactory extends SprykerDataImportBusinessFactory
{
.....
public function getDataImporterByType()
{
.....
case DataImportConfig::IMPORT_TYPE_PRODUCT_ABSTRACT_QUEUE:
return $this->createProductAbstractQueueImporter($dataImportConfigurationActionTransfer);
.....
}
protected function createProductAbstractQueueImporter(): QueueDataImporterInterface
{
$dataImporter = $this->createQueueDataImporter($this->getConfig()->getProductAbstractQueueImporterConfiguration());
$dataSetStepBroker = $this->createTransactionAwareDataSetStepBroker(ProductAbstractHydratorStep::BULK_SIZE);
$dataSetStepBroker
->addStep($this->createAddLocalesStep())
->addStep($this->createAddCategoryKeysStep())
->addStep($this->createTaxSetNameToIdTaxSetStep(ProductAbstractHydratorStep::KEY_TAX_SET_NAME))
->addStep($this->createAttributesExtractorStep())
->addStep($this->createProductLocalizedAttributesExtractorStep([
ProductAbstractHydratorStep::KEY_NAME,
ProductAbstractHydratorStep::KEY_URL,
ProductAbstractHydratorStep::KEY_DESCRIPTION,
ProductAbstractHydratorStep::KEY_META_TITLE,
ProductAbstractHydratorStep::KEY_META_DESCRIPTION,
ProductAbstractHydratorStep::KEY_META_KEYWORDS,
]))
->addStep(new ProductAbstractHydratorStep());
$dataImporter->addDataSetStepBroker($dataSetStepBroker);
$dataImporter->setDataSetWriter($this->createProductAbstractDataImportWriters());
return $dataImporter;
}
.....
}
4. Adjust writers
Make changes to all bulk data set writers, which store data internally. Then, flush it to the database in bulk once a configured threshold is reached.
First, use Spryker\Zed\DataImport\Business\DataImporter\Queue\DataSetWriterPersistenceStateAwareTrait
inside these writers. Then, do the following:
- Switch the persistence state to
false
at the very beginning of the::write()
method by callingSpryker\Zed\DataImport\Business\DataImporter\Queue\DataSetWriterPersistenceStateAwareTrait::setDataSetWriterPersistenceState(false)
. - Once the buffered data has been saved to the database, switch the persistence state to
true
by callingSpryker\Zed\DataImport\Business\DataImporter\Queue\DataSetWriterPersistenceStateAwareTrait::setDataSetWriterPersistenceState(true)
.
You need this for proper acknowledgment of the corresponding messages in the import queue.
Pyz\Zed\DataImport\Business\Model\ProductAbstract\Writer\ProductAbstractBulkPdoDataSetWriter.php
<?php
use Spryker\Zed\DataImport\Business\DataImporter\Queue\DataSetWriterPersistenceStateAwareTrait;
use Spryker\Zed\DataImport\Business\Model\DataSet\DataSetInterface;
use Spryker\Zed\DataImport\Business\Model\DataSet\DataSetWriterInterface;
.....
class ProductAbstractBulkPdoDataSetWriter implements DataSetWriterInterface
{
use DataSetWriterPersistenceStateAwareTrait;
/**
* Static buffer.
*/
protected static $productAbstractCollection = [];
.....
public function write(DataSetInterface $dataSet): void
{
$this->setDataSetWriterPersistenceState(false);
// storing data sets in the internal buffer
}
.....
/**
* Called once the data in the internal buffer is ready to be saved.
*/
protected function flush(): void
{
// writing data from the buffer to database
$this->setDataSetWriterPersistenceState(true);
}
}
5. Execution
To perform data importing from the queue into the persistent storage, run vendor/bin/console data:import
with the --group
option set to QUEUE_READERS
.
You can also run data:import
for the specific queue importer like vendor/bin/console data:import:product-abstract-queue
.
Thank you!
For submitting the form