forked from swarrot/swarrot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
InstantRetryProcessor.php
74 lines (61 loc) · 2.12 KB
/
InstantRetryProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
namespace Swarrot\Processor\InstantRetry;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Psr\Log\NullLogger;
use Swarrot\Broker\Message;
use Swarrot\Processor\ConfigurableInterface;
use Swarrot\Processor\ProcessorInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
/**
 * Decorates another processor and retries it in-process when it throws.
 *
 * The wrapped processor is called up to `instant_retry_attempts` times. Each
 * failure is logged, and the process pauses for `instant_retry_delay`
 * microseconds before the next attempt. When every attempt has failed, the
 * last caught throwable is rethrown to the caller.
 */
class InstantRetryProcessor implements ConfigurableInterface
{
    /**
     * @var ProcessorInterface Wrapped processor that is invoked on every attempt.
     */
    private $processor;

    /**
     * @var LoggerInterface Receives one entry per failed attempt.
     */
    private $logger;

    /**
     * @param ProcessorInterface   $processor Processor to invoke (and re-invoke on failure).
     * @param LoggerInterface|null $logger    Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(ProcessorInterface $processor, ?LoggerInterface $logger = null)
    {
        $this->processor = $processor;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Runs the wrapped processor, retrying immediately when it throws.
     *
     * Options read: `instant_retry_attempts`, `instant_retry_delay` and
     * `instant_retry_log_levels_map`.
     *
     * @param Message $message Message handed unchanged to the wrapped processor.
     * @param array   $options Resolved options, also forwarded to the wrapped processor.
     *
     * @return bool Whatever the wrapped processor returned on its first successful attempt.
     *
     * @throws \Throwable        The last throwable caught once all attempts have failed.
     * @throws \RuntimeException When no attempt was made at all (e.g. `instant_retry_attempts` < 1).
     */
    public function process(Message $message, array $options): bool
    {
        $maxAttempts = $options['instant_retry_attempts'];
        $lastError = null;

        for ($attempt = 1; $attempt <= $maxAttempts; ++$attempt) {
            try {
                return $this->processor->process($message, $options);
            } catch (\Throwable $e) {
                $lastError = $e;
            }

            // Every failed attempt is logged, including the final one.
            $this->logger->log(
                $this->resolveLogLevel($lastError, $options['instant_retry_log_levels_map']),
                '[InstantRetry] An exception occurred. The message will be processed again.',
                [
                    'swarrot_processor' => 'instant_retry',
                    'exception' => $lastError,
                    'message_id' => $message->getId(),
                    // The delay is expressed in microseconds (it is passed to usleep());
                    // the logged value is that delay divided by 1000.
                    'instant_retry_delay' => $options['instant_retry_delay'] / 1000,
                ]
            );

            // Only pause when another attempt follows: sleeping after the final
            // failure would just postpone the rethrow below.
            if ($attempt < $maxAttempts) {
                usleep($options['instant_retry_delay']);
            }
        }

        if (null !== $lastError) {
            throw $lastError;
        }

        // The loop body never ran, so there is nothing to rethrow.
        throw new \RuntimeException('You probably misconfigured the InstantRetryProcessor.');
    }

    /**
     * Registers the default values of the options read by process().
     *
     * - `instant_retry_delay`: 2000000 (passed to usleep() between attempts).
     * - `instant_retry_attempts`: 3.
     * - `instant_retry_log_levels_map`: empty map of class name => log level.
     *
     * @param OptionsResolver $resolver Resolver that receives the defaults.
     */
    public function setDefaultOptions(OptionsResolver $resolver): void
    {
        $resolver->setDefaults([
            'instant_retry_delay' => 2000000,
            'instant_retry_attempts' => 3,
            'instant_retry_log_levels_map' => [],
        ]);
    }

    /**
     * Picks the log level for a throwable from a class-name => level map.
     *
     * The first entry (in map order) whose key the throwable is an instance of
     * wins; LogLevel::WARNING is used when nothing matches.
     *
     * @param \Throwable $error        Throwable caught during an attempt.
     * @param array      $logLevelsMap Map of class/interface name => log level.
     *
     * @return mixed The matching level from the map, or LogLevel::WARNING.
     */
    private function resolveLogLevel(\Throwable $error, array $logLevelsMap)
    {
        foreach ($logLevelsMap as $className => $level) {
            if ($error instanceof $className) {
                return $level;
            }
        }

        return LogLevel::WARNING;
    }
}