A Kafka consumer for PHP
- Install the librdkafka C library
  $ cd /tmp
  $ mkdir librdkafka
  $ cd librdkafka
  $ curl -L https://github.com/edenhill/librdkafka/archive/v1.0.0.tar.gz | tar xz
  $ cd librdkafka-1.0.0
  $ ./configure
  $ make
  $ make install
- Install the php-rdkafka PECL extension
  $ pecl install rdkafka
- Add the following to your php.ini file to enable the php-rdkafka extension
  extension=rdkafka.so
- Install this package via Composer
  $ composer require arquivei/php-kafka-consumer
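Once the steps above are done, it can help to confirm that PHP actually loaded the extension before wiring up a consumer. The following is a minimal sketch using PHP's built-in `extension_loaded()` and `phpversion()`; the helper name `rdkafkaStatus` is hypothetical and not part of this package.

```php
<?php
// Hypothetical helper (not part of this package): reports whether the
// rdkafka extension is loaded in the current PHP runtime.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded; check the extension= line in php.ini';
    }

    // phpversion() with an extension name returns that extension's version,
    // or false if it does not report one.
    $version = phpversion('rdkafka');

    return 'rdkafka extension loaded' . ($version !== false ? " (version {$version})" : '');
}

echo rdkafkaStatus() . PHP_EOL;
```

The CLI and the web server can read different php.ini files, so run the check in the same environment the consumer will use. `php --ini` shows which files the CLI loads.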
<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\Entities\Config;
use Kafka\Consumer\Contracts\Consumer;
use Kafka\Consumer\Entities\Config\Sasl;
use Kafka\Consumer\Entities\Config\MaxAttempt;

class DefaultConsumer extends Consumer
{
    public function handle(string $message): void
    {
        // Simulate two seconds of work per message.
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

// Replace the placeholder strings with the values for your cluster.
$config = new Config(
    new Sasl('username', 'password', 'mechanisms'),
    'topic',
    'broker:port',
    1,
    'php-kafka-consumer-group-id',
    new DefaultConsumer(),
    new MaxAttempt(1),
    'security-protocol'
);

(new \Kafka\Consumer\Consumer($config))->consume();
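The `handle()` method receives each message as a raw string. As a rough sketch of what a handler body might do, the example below assumes the topic carries JSON payloads (an assumption; nothing here states the message format). The class name `JsonMessageHandler` and the `orderId` field are hypothetical. In real use the method would live in a class extending `Kafka\Consumer\Contracts\Consumer`; this version stands alone so it runs without Kafka. It needs PHP 7.3 or later for `JSON_THROW_ON_ERROR`.

```php
<?php
// Hypothetical, standalone handler sketch. It does not extend the package's
// Consumer class so that it can run without Kafka; only the handle()
// signature mirrors the one used in this README.
class JsonMessageHandler
{
    /** @var array Decoded messages, kept here only for inspection. */
    public $handled = [];

    public function handle(string $message): void
    {
        // Assumption: messages are JSON. JSON_THROW_ON_ERROR (PHP 7.3+)
        // raises JsonException on malformed input instead of returning null.
        $payload = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

        // Reject valid JSON that is a bare scalar (e.g. "5" or "true").
        if (!is_array($payload)) {
            throw new InvalidArgumentException('Expected a JSON object or array');
        }

        $this->handled[] = $payload;
    }
}

$handler = new JsonMessageHandler();
$handler->handle('{"orderId": 42}');
print count($handler->handled) . ' message(s) handled' . PHP_EOL;
```

Throwing from `handle()` leaves the decision about a bad message to the caller. How this package reacts to an exception (for example, how `MaxAttempt` is applied) is not described here, so check its source before relying on that behaviour.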
You need to add the php-kafka-config.php file to the config path:
<?php

return [
    'topic' => 'topic',
    'broker' => 'broker',
    'groupId' => 'group-id',
    'securityProtocol' => 'security-protocol',
    'sasl' => [
        'mechanisms' => 'mechanisms',
        'username' => 'username',
        'password' => 'password',
    ],
];
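Hard-coded credentials are awkward to manage across environments. One option, sketched here with PHP's built-in `getenv()`, is to read each value from an environment variable and fall back to a default. The `KAFKA_*` variable names are hypothetical, and in a Laravel application the framework's `env()` helper would be the more usual choice.

```php
<?php
// Sketch only: the KAFKA_* variable names are assumptions, not something
// this package defines. In the real config file the array would be
// returned (return [...];) rather than assigned to a variable.
$config = [
    // getenv() returns false when a variable is unset, so ?: picks the default.
    'topic' => getenv('KAFKA_TOPIC') ?: 'topic',
    'broker' => getenv('KAFKA_BROKER') ?: 'broker',
    'groupId' => getenv('KAFKA_GROUP_ID') ?: 'group-id',
    'securityProtocol' => getenv('KAFKA_SECURITY_PROTOCOL') ?: 'security-protocol',
    'sasl' => [
        'mechanisms' => getenv('KAFKA_SASL_MECHANISMS') ?: 'mechanisms',
        'username' => getenv('KAFKA_SASL_USERNAME') ?: 'username',
        'password' => getenv('KAFKA_SASL_PASSWORD') ?: 'password',
    ],
];

print $config['topic'] . PHP_EOL;
```

Note that `?:` also falls back when a variable is set to an empty string, which is usually the desired behaviour for connection settings.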
Run the consumer with the following command:
$ php artisan arquivei:php-kafka-consumer --consumer="App\Consumers\YourConsumer" --commit=1
- Add unit tests