Library for parallel (concurrent) task processing. Implemented with symfony/process.
Basic table output
command.php
#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';
// First, set up the Parallel class
// If you set a log dir (optional), Parallel automatically creates a /stats subfolder there and logs run statistics into it in JSON format.
$parallel = new \Parallel\Parallel(__DIR__, 'parallel', 5, 0.1);
// Add some tasks (these are only examples, not part of Parallel)
// A task is defined by its name and, optionally, by its dependencies
// Dependencies ($runAfter = []) are tasks that have to finish before the task can start
$parallel->addTask(new \Parallel\AdminsTask('task:admin'), ['task:categories']);
$parallel->addTask(new \Parallel\UsersTask('task:user'));
$parallel->addTask(new \Parallel\ArticlesTask('task:articles'), ['task:admin', 'task:user']);
$parallel->addTask(new \Parallel\CategoriesTask('task:categories'));
// Some tasks are too resource-intensive, so we can limit how many tasks may run alongside them.
// With 0, this task runs alone even though the global maximum of concurrent tasks is set to 5.
$parallel->addTask(new \Parallel\ArticleCategoriesTask('task:articlesCategories'), [], 0);
// Run the Symfony console application under the hood
$parallel->runConsoleApp();
Now simply run
php command.php parallel:run
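To picture how the dependencies registered in command.php constrain the run order, here is a minimal, self-contained sketch. It is not the library's scheduler: `resolveWaves()` is a hypothetical helper, it works on plain task names, and it ignores the concurrency limits described above. It only groups tasks into "waves" in which every dependency has already finished.

```php
<?php
// Hypothetical helper, not part of Parallel: groups task names into waves.
// A task joins a wave once all of its $runAfter dependencies are done.
function resolveWaves(array $tasks): array
{
    $done = [];
    $waves = [];
    while (count($done) < count($tasks)) {
        $wave = [];
        foreach ($tasks as $name => $runAfter) {
            $isPending = !in_array($name, $done, true);
            if ($isPending && array_diff($runAfter, $done) === []) {
                $wave[] = $name;
            }
        }
        if ($wave === []) {
            // Nothing can start: a dependency is circular or was never registered.
            throw new RuntimeException('Circular or missing dependency detected');
        }
        $waves[] = $wave;
        $done = array_merge($done, $wave);
    }
    return $waves;
}

// Same names and dependencies as in command.php (task name => $runAfter).
$waves = resolveWaves([
    'task:admin' => ['task:categories'],
    'task:user' => [],
    'task:articles' => ['task:admin', 'task:user'],
    'task:categories' => [],
]);

foreach ($waves as $index => $wave) {
    echo ($index + 1) . ': ' . implode(', ', $wave) . "\n";
}
```

Under these assumptions, task:user and task:categories can start immediately, task:admin follows, and task:articles comes last.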
Sometimes you want to run only a subset of the registered tasks (e.g. in development). For this purpose, use the --subnet option of the parallel:run command.
Each --subnet value is validated as a regular expression, and the option accepts multiple values. Dependencies that do not match any of the given expressions are removed from the matched tasks.
# This command runs only the task:user and task:categories tasks
php command.php parallel:run --subnet task:user$ --subnet task:categories$
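The filtering rule described above can be sketched in plain PHP. This is only an illustration under stated assumptions, not the library's code: `filterTasksBySubnet()` is a hypothetical helper, tasks are reduced to a name-to-dependencies map, and each pattern is assumed to be wrapped in `/` delimiters before matching.

```php
<?php
// Hypothetical helper, not part of Parallel: keeps only tasks whose name
// matches at least one pattern and drops dependencies on filtered-out tasks.
function filterTasksBySubnet(array $tasks, array $patterns): array
{
    $matches = static function (string $name) use ($patterns): bool {
        foreach ($patterns as $pattern) {
            // Assumption: patterns are delimited with "/" before matching.
            $regexp = '/' . str_replace('/', '\/', $pattern) . '/';
            if (preg_match($regexp, $name) === 1) {
                return true;
            }
        }
        return false;
    };

    $filtered = [];
    foreach ($tasks as $name => $runAfter) {
        if (!$matches($name)) {
            continue;
        }
        // Keep only dependencies that also match one of the patterns.
        $filtered[$name] = array_values(array_filter($runAfter, $matches));
    }
    return $filtered;
}

$tasks = [
    'task:admin' => ['task:categories'],
    'task:user' => [],
    'task:articles' => ['task:admin', 'task:user'],
    'task:categories' => [],
];

$result = filterTasksBySubnet($tasks, ['task:user$', 'task:articles$']);
print_r($result);
```

In this sketch task:articles keeps its dependency on task:user but loses the one on task:admin, because task:admin matches neither pattern.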
A PSR logger is supported, so we can use, for example, monolog/monolog.
<?php
// Setup PSR logger
// ...
$parallel->setLogger($psrLogger);
Parallel can visualize the task dependency graph. All you have to do is set the analyze dir. The output file is generated in HTML format into that directory.
<?php
$parallel->setAnalyzeDir(__DIR__ . '/../log');
# Now you can run
php command.php analyze:graph
Every task has to return some kind of result (SuccessResult, SkipResult, ErrorResult) as the result of task/item processing.
SimpleTask is suitable for tasks that process static input data.
<?php
class ImplementedSimpleTask extends SimpleTask
{
protected function processTask(InputInterface $input, OutputInterface $output): TaskResult
{
// Do some magic
return new SuccessResult();
}
}
ProgressTask is suitable if you need to process each item of a medium-sized dataset separately. All source items are provided at once, which in some cases can be too expensive in terms of memory (see BatchProgressTask).
<?php
class ImplementedProgressTask extends ProgressTask
{
protected function items(): iterable
{
return DB::table('users');
}
protected function itemsCount(): int
{
return DB::table('users')->count();
}
protected function processItem($item): TaskResult
{
// $item here is one record from the users table
// It can be anything that the items() method provides (array, object ...)
if (!$item['is_active']) {
return new SkipResult();
}
file_put_contents('active_users', $item['id'] . "\n", FILE_APPEND | LOCK_EX);
return new SuccessResult();
}
}
BatchProgressTask is the most advanced task. It is suitable if you need to process each item of a large dataset separately. Items can be provided and processed in batches.
<?php
class ImplementedBatchProgressTask extends BatchProgressTask
{
protected function startup(): void
{
// Here you can prepare data
}
protected function shutdown(): void
{
// Here you can run cleanup
}
protected function items(int $processed): iterable
{
// Fetch data (e.g. from a database) in batches of 500, using the number of already processed items as the offset
return DB::table('users')->limit(500)->offset($processed);
}
protected function itemsCount(): int
{
// Count ALL data that will be processed
return DB::table('users')->count();
}
protected function processItem($item): TaskResult
{
// $item here is one record from the users table
if (!$item['is_active']) {
return new SkipResult();
}
return new SuccessResult([
'id' => $item['id'],
'name' => $item['name']
]);
}
protected function batch(array $items): void
{
DB::table('active_users')->insert($items);
}
}
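The way the methods of a BatchProgressTask fit together can be pictured with a small, self-contained simulation. This is a sketch under assumptions, not the library's internals: it assumes items($processed) is called repeatedly until itemsCount() items have been handled, and that batch() receives the data carried by the successful results of one fetched slice. The in-memory `$users` array and the batch size of 3 are stand-ins chosen for illustration.

```php
<?php
// In-memory stand-in for the users table (sample data, for illustration only).
$users = [];
for ($id = 1; $id <= 7; $id++) {
    $users[] = ['id' => $id, 'name' => "user{$id}", 'is_active' => $id % 2 === 1];
}

$batchSize = 3;          // the example above uses 500
$total = count($users);  // plays the role of itemsCount()
$processed = 0;
$activeUsers = [];       // plays the role of the active_users table

while ($processed < $total) {
    // Plays the role of items($processed): fetch the next slice by offset.
    $items = array_slice($users, $processed, $batchSize);
    $toInsert = [];
    foreach ($items as $item) {
        $processed++;
        if (!$item['is_active']) {
            continue; // corresponds to returning a SkipResult
        }
        // Corresponds to the data passed to SuccessResult.
        $toInsert[] = ['id' => $item['id'], 'name' => $item['name']];
    }
    // Plays the role of batch($items): one bulk write per fetched slice.
    $activeUsers = array_merge($activeUsers, $toInsert);
}

echo count($activeUsers) . " active users stored\n"; // prints "4 active users stored"
```

Fetching by offset keeps only one slice in memory at a time, which is the point of choosing this task type over ProgressTask for large datasets.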
Anywhere in ProgressTask and BatchProgressTask, you can send a message to the output with the sendMessage($message) method.
The TaskGenerator interface in the Parallel library allows you to dynamically generate tasks. This is useful when you need to create tasks based on criteria or input data that may not be known beforehand.
To create a new TaskGenerator, implement the TaskGenerator interface. This interface requires the implementation of two methods: getName and generateTasks.
Here's an example of how to create a new TaskGenerator:
namespace Parallel\TaskGenerator;
use Parallel\Task;
class CustomTaskGenerator implements TaskGenerator
{
private $name;
private $task;
private $runAfter = [];
private $maxConcurrentTasksCount;
private $chunkSize;
public function __construct(string $name, Task $task, int $chunksCount, array $runAfter = [], ?int $maxConcurrentTasksCount = null)
{
$this->name = $name;
$this->task = $task;
$this->chunkSize = $chunksCount;
$this->runAfter = $runAfter;
$this->maxConcurrentTasksCount = $maxConcurrentTasksCount;
}
public function getName(): string
{
return $this->name;
}
public function generateTasks(): array
{
$tasks = [];
for ($i = 1; $i <= $this->chunkSize; $i++) {
$tasks[] = new BaseGeneratedTask($this->task, $this->runAfter, $this->maxConcurrentTasksCount);
}
return $tasks;
}
}
Once you have created your TaskGenerator, add it to the Parallel instance. This allows the dynamically generated tasks to be included in the parallel processing.
$taskGenerator = new CustomTaskGenerator('custom', new \Parallel\SomeTask('task:custom'), 10, ['task:dependency'], 2);
$parallel->addTaskGenerator($taskGenerator);
Tasks generated by a TaskGenerator can have dependencies just like regular tasks. Specify these dependencies when creating the generated tasks. The Parallel library ensures these tasks are executed in the correct order.
public function generateTasks(): array
{
$tasks = [];
for ($i = 1; $i <= $this->chunkSize; $i++) {
$tasks[] = new BaseGeneratedTask($this->task, ['task:dependency'], $this->maxConcurrentTasksCount);
}
return $tasks;
}
In this example, each generated task depends on task:dependency.
To set dependencies for tasks that should run after the tasks generated by a TaskGenerator, reference the generator's name. This ensures that the task will wait for all tasks generated by the generator to complete before it starts.
$parallel->addTask(new \Parallel\SomeOtherTask('task:afterGenerated'), ['custom']);
In this example, task:afterGenerated will only start after all tasks generated by the CustomTaskGenerator (named custom) have finished.
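One way to think about a dependency on a generator name is as shorthand for a dependency on every task that generator produced. The sketch below illustrates that idea only; it is not how the library is known to resolve it. `expandGeneratorDependencies()` is a hypothetical helper, and the generated task names (`custom:1`, `custom:2`, ...) are an assumed naming scheme.

```php
<?php
// Hypothetical helper, not part of Parallel: replaces generator names in a
// dependency list with the names of the tasks that generator produced.
function expandGeneratorDependencies(array $runAfter, array $generatedTaskNames): array
{
    $expanded = [];
    foreach ($runAfter as $dependency) {
        if (isset($generatedTaskNames[$dependency])) {
            // The dependency names a generator: wait for every generated task.
            foreach ($generatedTaskNames[$dependency] as $taskName) {
                $expanded[] = $taskName;
            }
        } else {
            // The dependency names a regular task: keep it unchanged.
            $expanded[] = $dependency;
        }
    }
    return array_values(array_unique($expanded));
}

// Assumed naming scheme for the generated tasks.
$generated = ['custom' => ['custom:1', 'custom:2', 'custom:3']];

$dependencies = expandGeneratorDependencies(['custom', 'task:user'], $generated);
print_r($dependencies);
```

With this mapping, a task that lists custom in its dependencies waits for custom:1, custom:2 and custom:3, plus any regular tasks it names.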