Skip to content

Commit

Permalink
Print consumer configs to stdout (#57)
Browse files Browse the repository at this point in the history
Print consumer configs to stdout before starting the consumer, making it easier to debug during development

The output will look like this:

 ++++++++++++++++++ CONSUMER CONFIGS ++++++++++++++++++

                    CONFIG | VALUE
                    topics | topic_name
                       dlq | topic_name_dlq
                    commit | 1
          maxCommitRetries | 6
               maxMessages | -1
         auto.offset.reset | smallest
queued.max.messages.kbytes | 10000
        enable.auto.commit | false
         compression.codec | gzip
      max.poll.interval.ms | 86400000
                  group.id | group_id
         bootstrap.servers | localhost:9093
         security.protocol | SASL_PLAINTEXT
           sasl.mechanisms | PLAIN
  • Loading branch information
thiagobrauer authored Jul 19, 2022
1 parent 24a25b6 commit 51b0730
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
28 changes: 28 additions & 0 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public function __construct(Config $config)

public function consume(): void
{
if($this->config->getPrintConfigs()) {
$this->printConsumerConfigs();
}

$this->consumer = new KafkaConsumer($this->setConf($this->config->getConsumerOptions()));
$this->producer = new Producer($this->setConf($this->config->getProducerOptions()));

Expand All @@ -66,6 +70,30 @@ public function consume(): void
} while (!$this->isMaxMessage());
}

private function printConsumerConfigs() {
echo PHP_EOL;
echo "\e[0;30;42m ++++++++++++++++++ CONSUMER CONFIGS ++++++++++++++++++\e[0m\n";
echo PHP_EOL;

$mask = "\e[0;32m%26s | %s \e[0m\n";
printf($mask, 'CONFIG', 'VALUE');

$mask = "%26s | %s \n";
printf($mask, 'topics', implode(', ', $this->config->getTopics()));
printf($mask, 'dlq', $this->config->getDlq());
printf($mask, 'commit', $this->config->getCommit());
printf($mask, 'maxCommitRetries', $this->config->getMaxCommitRetries());
printf($mask, 'maxMessages', $this->config->getMaxMessages());

foreach ($this->config->getConsumerOptions() as $key => $value) {
if($key !== 'sasl.username' && $key !== 'sasl.password') {
printf($mask, $key, $value);
}
}

echo PHP_EOL;
}

private function doConsume()
{
$message = $this->consumer->consume(120000);
Expand Down
11 changes: 10 additions & 1 deletion src/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ConsumerBuilder
private $securityProtocol;
private $autoCommit;
private $options;
private $printConfigs;

private function __construct(string $brokers, string $groupId, array $topics)
{
Expand All @@ -47,6 +48,7 @@ private function __construct(string $brokers, string $groupId, array $topics)
$this->securityProtocol = 'PLAINTEXT';
$this->autoCommit = false;
$this->options = [];
$this->printConfigs = false;
}

public static function create(string $brokers, $groupId, array $topics): self
Expand Down Expand Up @@ -141,6 +143,12 @@ public function withOption(string $name, string $value): self
return $this;
}

public function withPrintConfigs(bool $printConfigs): self
{
$this->printConfigs = $printConfigs;
return $this;
}

public function build(): Consumer
{
$config = new Config(
Expand All @@ -155,7 +163,8 @@ public function build(): Consumer
$this->maxMessages,
$this->maxCommitRetries,
$this->autoCommit,
$this->options
$this->options,
$this->printConfigs
);

return new Consumer(
Expand Down
10 changes: 9 additions & 1 deletion src/Entities/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Config
private $maxCommitRetries;
private $autoCommit;
private $customOptions;
private $printConfigs;

public function __construct(
?Sasl $sasl,
Expand All @@ -32,7 +33,8 @@ public function __construct(
int $maxMessages = -1,
int $maxCommitRetries = 6,
bool $autoCommit = false,
array $customOptions = []
array $customOptions = [],
bool $printConfigs = false
) {
$this->dlq = $dlq;
$this->sasl = $sasl;
Expand All @@ -46,6 +48,7 @@ public function __construct(
$this->maxCommitRetries = $maxCommitRetries;
$this->autoCommit = $autoCommit;
$this->customOptions = $customOptions;
$this->printConfigs = $printConfigs;
}

public function getCommit(): int
Expand Down Expand Up @@ -114,6 +117,11 @@ public function getProducerOptions(): array
return array_merge($config, $this->getSaslOptions());
}

public function getPrintConfigs(): bool
{
return $this->printConfigs;
}

private function getSaslOptions(): array
{
if ($this->isPlainText() && $this->sasl !== null) {
Expand Down

0 comments on commit 51b0730

Please sign in to comment.