Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a middle-man to run tasks wired to specific tubes #12

Open
igorsantos07 opened this issue Jul 28, 2016 · 1 comment
Open

Add a middle-man to run tasks wired to specific tubes #12

igorsantos07 opened this issue Jul 28, 2016 · 1 comment

Comments

@igorsantos07
Copy link
Contributor

igorsantos07 commented Jul 28, 2016

This was already made in the main project, but I found no way to supply this feature in a good way from this queue package. Maybe we could add those classes and the developer simply extends/implements what's needed, since there's no way to reference a task that's outside of the main project files?

tasks/WorkTask.php

<?php
use Phalcon\CLI\Task;
use Phalcon\Queue\Db as DbQueue;
use Phalcon\Text;
use PhalconRest\tasks\traits\Options;
use PhalconRest\tasks\workers\Worker;

class WorkTask extends Task
{
    use Options;

    protected static $options     = [];
    protected static $optionsSpec = [
        'limit' => '\d+',
        'delay' => '[\d\.]+',
    ];

    public function __call($action, array $arguments = [])
    {
        $name  = substr($action, 0, -strlen('action'));
        $class = '\PhalconRest\tasks\workers\\'.Text::camelize($name);
        if (class_exists($class)) {
            $options = self::processOptions(isset($arguments[0])? $arguments[0] : []);
            $worker  = new $class($options);

            if (!($worker instanceof Worker)) {
                echo "$class must be an instance of tasks\\workers\\Worker\n";
                die(254);
            }

            $result = (new DbQueue)
                ->watch($name, true)
                ->process([$worker,'handleJob'], static::$options['delay']?: 1, static::$options['limit']);

            echo "\nStats: ".print_r($result, true);
        } else {
            echo "There's no such worker class: $name ($class)\n";
            die(254);
        }
    }

    public function mainAction()
    {
        echo <<<HELP
  -= DbQueue Task runner =-
This scripts walks through available jobs and processes them given a specific class handler.

Usage: app/cli.php work «tube»
Implementation: tasks/workers/«Tube»::handleJob(\$body, Job \$job):bool|void
Options:
    --delay=F   Delay between asking for new jobs when the queue is over
    --limit=I   How much jobs to process before exiting
    --???=???   Other options are passed directly to the Worker constructor

HELP;
    }

}

tasks/worker/Worker.php

<?php namespace PhalconRest\tasks\workers;

use Phalcon\Queue\Db\Job;

interface Worker
{
    public function handleJob($body, Job $job);
}

tasks/traits/Options.php

<?php namespace PhalconRest\tasks\traits;

/**
 * Trait Options
 * Adds ability to process options.
 * @static array $optionsSpec Associative array of option name => regex
 *
 * <code>
 *     protected static $optionsSpec = [
 *         'limit'    => '\d*',        //allows --limit=123
 *         'category' => '.*',         //allows --category=u8nreijgndfjn
 *         'fast'     => true/false,   //allows --fast
 *
 * </code>
 */
trait Options
{

    protected static function processOptions(array $args)
    {
        if (!isset(static::$optionsSpec)) {
            return false;
        }

        $spec    = static::$optionsSpec;
        $options = array_combine(array_keys($spec), array_fill(0, sizeof($spec), null));
        foreach (static::$optionsSpec as $name => $regex) {
            $opt   = "--$name";
            $found = false;

            foreach ($args as $k => $arg) {
                if ($regex === true && $arg == $opt) {
                    $options[$name] = true;
                    $found = true;
                } elseif ($regex == '.*' && strpos($arg, $opt.'=')) {
                    $options[$name] = substr($arg, strlen($opt) + 1);
                    $found[] = $arg;
                    $found = true;
                } elseif (preg_match("/$opt=($regex)/", $arg, $matches)) {
                    $options[$name] = $matches[1];
                    $found[] = $arg;
                    $found = true;
                }

                if ($found) {
                    unset($args[$k]); //removes what's found
                    $found = false;
                    break;
                }
            }
        }
        static::$options = $options;

        $rest = [];
        foreach ($args as $arg) {
            $arg    = ltrim($arg, '-');
            $pieces = explode('=', $arg);
            $rest[$pieces[0]] = isset($pieces[1])? $pieces[1] : true;
        }

        return $rest; //returns what's left
    }

}
@igorsantos07 igorsantos07 modified the milestone: 1.0 Jul 28, 2016
@igorsantos07
Copy link
Contributor Author

igorsantos07 commented Jul 29, 2016

Another thing to be noticed here is: it might be useful as well to have a way to run one command that would work through the entire queue, handing out jobs to their specific workers as needed.

Cons

  • lack of control over options, delays and etc
  • long running jobs might hang on the line, blocking small jobs
  • having multiple workers to tackle the above point might end up eating too much resources from the machine

Pros

  • Less stuff to configure on supervisor
  • Might be useful for smaller projects?
  • only one query to the database to get all jobs and distribute instead of one query per tube (things may get noisy if there's a lot of tubes to be worked)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant