RxPHP is a great piece of work that brings us reactive programming: asynchronous programming for human beings.
You can play with ReactiveX on RxMarbles.com, find all the available operators on the official ReactiveX.io website, or read an interesting introduction.
RxNet is an effort to bring it with batteries included:
- Dns
- Http
- Httpd
- RabbitMq
- Redis
- ZeroMq
- InfluxDB
- Statsd
- Others, outside this repository:
  - voryx/pg-async: Postgres client
  - RxPHP/RxStream: stream
  - RxPHP/RxWebsocket: websocket client / server
  - RxPHP/RxChildProcess: forking
Thanks to react/react, its marvelous reactor pattern, and all the work done around it, many of these are just simple wrappers.
Install with Composer: composer require domraider/rxnet
Or just clone the lib, run composer install, and try the example scripts.
Why one repository for everything? Because once you start using it, you want every library to be Rx-friendly :)
Asynchronous DNS resolver. Thanks to the daverandom/libdns parser, it was a breeze to code.
No extra extensions are needed
$dns = new Dns();
// Procedural way
echo Rx\awaitOnce($dns->resolve('www.google.fr', '8.8.4.4'));
// All types of queries are allowed
$dns->soa('www.google.fr')
->subscribe(new StdoutObserver());
HTTP client with all the ReactiveX sweetness
No extra extensions are needed
$http = new Http();
$http->get("https://github.com/Domraider/rxnet/commits/master")
// Timeout after 0.3s
->timeout(300)
// will retry 2 times on error
->retry(2)
// Transform response to something else
->map(function(Psr\Http\Message\ResponseInterface $response) {
$body = (string) $response->getBody();
// Domcrawler to extract commits
return $body;
})
->subscribe(new StdoutObserver());
// All the given options
$opts = [
// No buffering, you will receive chunks as they arrive
// Perfect for big files to parse or streaming json
'stream' => true,
// You can use body or json, json will add the header and json_encode
'body' => 'raw body for post',
'json' => ['my'=>'parameters', 'they-will->be'=>'json'],
// Set query string from here not the url
'query'=> [
'param1'=>'one'
],
// Specify user-agent to use
'user-agent' => 'SuperParser/0.1',
// Use a proxy
'proxy' => 'http://user:password@myproxy:8080',
// Append extra headers
'headers' => [
'Authorization' => 'Basic '.base64_encode('user:password')
],
// see http://php.net/manual/en/context.ssl.php
// Add whatever option you want on your https query
'ssl' => [
'verify_peer' => false
]
];
$http->post('https://adwords.google.com/my-10gb.xml', $opts)
->subscribeCallback(function($chunk) {
// let's give it to expat while it arrives
});
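To make the 'json', 'query' and 'user-agent' options above more concrete, here is a minimal plain-PHP sketch of how such options could be folded into a request. buildRequest() is a hypothetical helper written for illustration only, not RxNet's actual implementation. The Content-Type value and the rule that 'json' takes precedence over 'body' are assumptions.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of RxNet: folds a subset of the options
 * shown above ('json', 'body', 'query', 'user-agent', 'headers') into
 * the final URL, headers and body of a request.
 */
function buildRequest(string $url, array $opts): array
{
    $headers = $opts['headers'] ?? [];
    $body = $opts['body'] ?? '';

    // Assumption: 'json' takes precedence over 'body'. The payload is
    // encoded and a JSON Content-Type header is added.
    if (isset($opts['json'])) {
        $body = json_encode($opts['json'], JSON_THROW_ON_ERROR);
        $headers['Content-Type'] = 'application/json';
    }

    // Append the query string, keeping any query already present in the URL.
    if (!empty($opts['query'])) {
        $separator = strpos($url, '?') === false ? '?' : '&';
        $url .= $separator . http_build_query($opts['query']);
    }

    if (isset($opts['user-agent'])) {
        $headers['User-Agent'] = $opts['user-agent'];
    }

    return ['url' => $url, 'headers' => $headers, 'body' => $body];
}

// Placeholder URL, used only for the demonstration.
$request = buildRequest('http://example.com/api', [
    'json' => ['my' => 'parameters'],
    'query' => ['param1' => 'one'],
    'user-agent' => 'SuperParser/0.1',
]);

echo $request['url'], PHP_EOL;
echo $request['body'], PHP_EOL;
```

Keeping the query parameters out of the URL string, as the options above suggest, leaves the encoding to http_build_query instead of hand-written concatenation.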
[ ] Psr7 request/response
[ ] multipart
[ ] gzip/deflate
Standalone HTTP server based on react/http, with nikic/fast-route as the default router
No extra extensions are needed
$httpd = new HttpD();
$httpd->route('GET', '/', function(HttpdRequest $request, HttpdResponse $response) {
$response->text('Hello');
});
$httpd->route('POST', '/{var}', function(HttpdRequest $request, HttpdResponse $response) {
$var = $request->getRouteParam('var');
$response->json(['var'=>$var]);
});
$httpd->listen(8080);
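The {var} placeholder in the route above is resolved by the router. As a rough illustration of the idea (this is not how nikic/fast-route is actually implemented), a placeholder can be compiled into a named regex group. matchRoute() is a hypothetical helper, and it assumes the literal parts of the pattern contain no regex metacharacters.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of RxNet or FastRoute: returns the route
 * parameters when $path matches $pattern, or null when it does not.
 */
function matchRoute(string $pattern, string $path): ?array
{
    // Turn each {name} placeholder into a named group matching one path segment.
    $regex = preg_replace('#\{(\w+)\}#', '(?P<$1>[^/]+)', $pattern);

    if (preg_match('#^' . $regex . '$#', $path, $matches) !== 1) {
        return null;
    }

    // Keep only the named captures, dropping the numeric duplicates.
    return array_filter($matches, 'is_string', ARRAY_FILTER_USE_KEY);
}

$params = matchRoute('/{var}', '/hello');
echo json_encode($params), PHP_EOL;

// A path with an extra segment does not match a single-placeholder route.
var_dump(matchRoute('/{var}', '/a/b'));
```

A real router compiles its patterns once and dispatches on the HTTP method as well; the sketch only shows where a value such as the one returned by getRouteParam('var') could come from.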
Performance on a MacBook Pro is around 500 msg/s for one PHP process.
Remember that it does not need FPM to run, and that the default FPM configuration uses 10 children.
[ ] Psr7 Request / Response
[ ] gzip / deflate
[ ] http2
[ ] multipart ?
[ ] ssl ? :D
Wrapper around jakubkulhan/bunny, which works just fine
No extra extensions are needed
$rabbit = new RabbitMq('rabbit://guest:[email protected]:5672/', new Serialize());
// Wait for rabbit to be connected
\Rxnet\awaitOnce($rabbit->connect());
$queue = $rabbit->queue('test_queue', 'amq.direct', []);
// Will wait for messages
$queue->consume()
->subscribeCallback(function (RabbitMessage $message) use ($rabbit) {
echo '.';
$data = $message->getData();
$name = $message->getName();
$head = $message->getLabels();
// Do what you want, but call one of these to get the next message
$message->ack();
//$message->nack();
//$message->reject();
//$message->rejectToBottom();
});
$queue = $rabbit->queue('test_queue', 'amq.direct', []);
$exchange = $rabbit->exchange('amq.direct');
$rabbit->connect()
->zip([
// Declare all the binding
$queue->create($queue::DURABLE),
$queue->bind('/routing/key', 'amq.direct'),
$exchange->create($exchange::TYPE_DIRECT, [
$exchange::DURABLE,
$exchange::AUTO_DELETE
])
])
// Everything's done let's produce
->subscribeCallback(function () use ($exchange, $loop) {
$done = 0;
// Just a simple array
\Rx\Observable::just(['id' => 2, 'foo' => 'bar'])
// Wait for one produce to be done before starting another
->flatMap(function ($data) use ($exchange) {
// Rabbit will handle serialize and unserialize
return $exchange->produce($data, '/routing/key');
})
// Produce it many times
->repeat(10000)
// Let's get some stats
->subscribeCallback(
function () use (&$done) { $done++;},
function (\Exception $e) { echo "shit happens : ".$e->getMessage();},
function () use (&$done, $loop) { echo number_format($done)." lines produced"; }
);
});
[ ] add all possible options as constants
Wrapper around clue/redis (great work!)
No extra extensions are needed
$redis = new Redis();
// Wait for redis to be ready
$redis = RxNet\awaitOnce($redis->connect('127.0.0.1:6379'));
$redis->get('key')
->subscribe(new StdoutObserver());
// Every Redis command returns an observable
// And they are all implemented
Message exchange over TCP (or IPC or inproc).
Requires the PECL ZMQ extension.
Router/dealer is bidirectional communication. The dealer waits for the router but the router does not wait for the dealer, so the dealer has to start first.
$zmq = new ZeroMq(new MsgPack());
// Connect to the router with my identity
$dealer = $zmq->dealer('tcp://127.0.0.1:3000', 'Roger');
// Display output
$dealer->subscribe(new StdoutObserver());
// And start
$dealer->send(new PingCommand('ping'));
// Bind and wait
$router = $zmq->router('tcp://127.0.0.1:3000');
// Show received message and wait 0.1s to answer
$router->doOnEach(new StdOutObserver())
->delay(100)
->subscribeCallback(function($data) use($router) {
$router->send(new ReceivedEvent('pong'), 'Roger');
});
You can also do push/pull and req/rep. Read ØMQ - The Guide to see which network model fits you.
5k to 10k msg/s in router/dealer, 30k msg/s in push/pull.
[ ] pub/sub
InfluxDB client based on influxdata/influxdb-php. It only supports the UDP protocol for the moment (write only). Our primary goal was to have a non-blocking client to send metrics.
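Writing over UDP means a point is just a line of text sent as a datagram. The plain-PHP sketch below formats a point in the InfluxDB line protocol and shows how it could be sent without blocking. formatPoint() and sendUdp() are hypothetical helpers, not RxNet's API. The host and port in the usage comment are placeholders, and the escaping only covers tag keys and values.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds one line-protocol point, e.g.
 * "measurement,tag=value field=1i".
 */
function formatPoint(string $measurement, array $tags, array $fields, ?int $timestampNs = null): string
{
    // Commas, equals signs and spaces must be backslash-escaped in tags.
    $escape = static function (string $value): string {
        return str_replace([',', '=', ' '], ['\\,', '\\=', '\\ '], $value);
    };

    $line = $measurement;
    foreach ($tags as $key => $value) {
        $line .= ',' . $escape((string) $key) . '=' . $escape((string) $value);
    }

    $pairs = [];
    foreach ($fields as $key => $value) {
        if (is_int($value)) {
            $formatted = $value . 'i'; // integers carry an "i" suffix
        } elseif (is_float($value)) {
            $formatted = (string) $value;
        } elseif (is_bool($value)) {
            $formatted = $value ? 'true' : 'false';
        } else {
            // Anything else is written as a double-quoted string.
            $formatted = '"' . str_replace('"', '\\"', (string) $value) . '"';
        }
        $pairs[] = $key . '=' . $formatted;
    }
    $line .= ' ' . implode(',', $pairs);

    return $timestampNs === null ? $line : $line . ' ' . $timestampNs;
}

/**
 * Hypothetical helper: fire-and-forget UDP write on a non-blocking socket.
 */
function sendUdp(string $host, int $port, string $payload): bool
{
    $socket = @stream_socket_client("udp://{$host}:{$port}", $errno, $errstr);
    if ($socket === false) {
        return false;
    }
    stream_set_blocking($socket, false); // never stall the caller
    $written = fwrite($socket, $payload);
    fclose($socket);

    return $written === strlen($payload);
}

$line = formatPoint('cpu', ['host' => 'web 1'], ['load' => 0.64, 'cores' => 4]);
echo $line, PHP_EOL;
// sendUdp('127.0.0.1', 8089, $line); // placeholder host and port
```

UDP gives no delivery guarantee, which is usually an acceptable trade-off for metrics: a lost point is cheaper than a blocked event loop.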
Statsd client based on this gist and php-datadogstatsd for tagging support.
$statsd->gauge("database.connections", 42)
->subscribe(new StdOutObserver(), new EventLoopScheduler($loop));
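For readers unfamiliar with the wire format, a gauge like the one above ends up as a short text datagram. The sketch below builds it in plain PHP, with tags appended in the DogStatsD style. formatGauge() is a hypothetical helper for illustration, not the method RxNet uses internally, and the tag names are made up.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of RxNet: builds one StatsD gauge datagram.
 * Tags, when given, use the DogStatsD "|#key:value,key:value" suffix.
 */
function formatGauge(string $metric, float $value, array $tags = []): string
{
    // "<metric>:<value>|g" is the StatsD gauge format.
    $datagram = $metric . ':' . $value . '|g';

    $pairs = [];
    foreach ($tags as $key => $tagValue) {
        $pairs[] = $key . ':' . $tagValue;
    }
    if ($pairs !== []) {
        $datagram .= '|#' . implode(',', $pairs);
    }

    return $datagram;
}

echo formatGauge('database.connections', 42), PHP_EOL;
echo formatGauge('database.connections', 42, ['env' => 'prod', 'host' => 'db1']), PHP_EOL;
```

Because the payload is this small, sending it over UDP costs almost nothing, which is why a non-blocking client fits well here.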
Classic procedural code is always possible, but beware of side effects
$observable = $http->get('http://www.google.fr')
->timeout(1000)
->retry(3);
// The loop keeps running during the await
$response = Rxnet\awaitOnce($observable);
// but this echo will wait before running
echo "1";
Using rx/await you can turn your observable into a generator
$source = \Rx\Observable::interval(1000)
->take(5); //Limit items to 5
$generator = \Rx\await($source);
foreach ($generator as $item) {
echo $item, PHP_EOL;
}
echo "DONE";