diff --git a/src/Consumer.php b/src/Consumer.php index bde64e6..4a995ac 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -52,6 +52,10 @@ public function __construct(Config $config) public function consume(): void { + if($this->config->getPrintConfigs()) { + $this->printConsumerConfigs(); + } + $this->consumer = new KafkaConsumer($this->setConf($this->config->getConsumerOptions())); $this->producer = new Producer($this->setConf($this->config->getProducerOptions())); @@ -66,6 +70,30 @@ public function consume(): void } while (!$this->isMaxMessage()); } + private function printConsumerConfigs() { + echo PHP_EOL; + echo "\e[0;30;42m ++++++++++++++++++ CONSUMER CONFIGS ++++++++++++++++++\e[0m\n"; + echo PHP_EOL; + + $mask = "\e[0;32m%26s | %s \e[0m\n"; + printf($mask, 'CONFIG', 'VALUE'); + + $mask = "%26s | %s \n"; + printf($mask, 'topics', implode(', ', $this->config->getTopics())); + printf($mask, 'dlq', $this->config->getDlq()); + printf($mask, 'commit', $this->config->getCommit()); + printf($mask, 'maxCommitRetries', $this->config->getMaxCommitRetries()); + printf($mask, 'maxMessages', $this->config->getMaxMessages()); + + foreach ($this->config->getConsumerOptions() as $key => $value) { + if($key !== 'sasl.username' && $key !== 'sasl.password') { + printf($mask, $key, $value); + } + } + + echo PHP_EOL; + } + private function doConsume() { $message = $this->consumer->consume(120000); diff --git a/src/ConsumerBuilder.php b/src/ConsumerBuilder.php index 81f8a0e..1a070d3 100644 --- a/src/ConsumerBuilder.php +++ b/src/ConsumerBuilder.php @@ -25,6 +25,7 @@ class ConsumerBuilder private $securityProtocol; private $autoCommit; private $options; + private $printConfigs; private function __construct(string $brokers, string $groupId, array $topics) { @@ -47,6 +48,7 @@ private function __construct(string $brokers, string $groupId, array $topics) $this->securityProtocol = 'PLAINTEXT'; $this->autoCommit = false; $this->options = []; + $this->printConfigs = false; } public static function create(string $brokers, $groupId, array $topics): self @@ -141,6 +143,12 @@ public function withOption(string $name, string $value): self return $this; } + public function withPrintConfigs(bool $printConfigs): self + { + $this->printConfigs = $printConfigs; + return $this; + } + public function build(): Consumer { $config = new Config( @@ -155,7 +163,8 @@ public function build(): Consumer $this->maxMessages, $this->maxCommitRetries, $this->autoCommit, - $this->options + $this->options, + $this->printConfigs ); return new Consumer( diff --git a/src/Entities/Config.php b/src/Entities/Config.php index 0d533b1..23258dd 100644 --- a/src/Entities/Config.php +++ b/src/Entities/Config.php @@ -19,6 +19,7 @@ class Config private $maxCommitRetries; private $autoCommit; private $customOptions; + private $printConfigs; public function __construct( ?Sasl $sasl, @@ -32,7 +33,8 @@ public function __construct( int $maxMessages = -1, int $maxCommitRetries = 6, bool $autoCommit = false, - array $customOptions = [] + array $customOptions = [], + bool $printConfigs = false ) { $this->dlq = $dlq; $this->sasl = $sasl; @@ -46,6 +48,7 @@ public function __construct( $this->maxCommitRetries = $maxCommitRetries; $this->autoCommit = $autoCommit; $this->customOptions = $customOptions; + $this->printConfigs = $printConfigs; } public function getCommit(): int @@ -114,6 +117,11 @@ public function getProducerOptions(): array return array_merge($config, $this->getSaslOptions()); } + public function getPrintConfigs(): bool + { + return $this->printConfigs; + } + private function getSaslOptions(): array { if ($this->isPlainText() && $this->sasl !== null) {