Skip to content

Commit

Permalink
Remove swoole tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Meldiron committed Feb 28, 2024
1 parent 41e0cd6 commit 477c137
Showing 1 changed file with 52 additions and 61 deletions.
113 changes: 52 additions & 61 deletions app/http.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use OpenRuntimes\Executor\Usage;
use Swoole\Process;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Logger\Log;
Expand Down Expand Up @@ -87,39 +86,22 @@
* Create a Swoole table to store runtime information
*/
$register->set('activeRuntimes', function () {
$table = new Table(1024);

$table->column('id', Table::TYPE_STRING, 256);
$table->column('created', Table::TYPE_FLOAT);
$table->column('updated', Table::TYPE_FLOAT);
$table->column('name', Table::TYPE_STRING, 256);
$table->column('hostname', Table::TYPE_STRING, 256);
$table->column('status', Table::TYPE_STRING, 128);
$table->column('key', Table::TYPE_STRING, 256);
$table->create();

return $table;
$state = [];

return $state;
});

/**
* Create a Swoole table of usage stats (separate for host and containers)
*/
$register->set('statsContainers', function () {
$table = new Table(1024);

$table->column('usage', Table::TYPE_FLOAT, 8);
$table->create();

return $table;
$state = []; // has usage float key
return $state;
});

$register->set('statsHost', function () {
$table = new Table(1024);

$table->column('usage', Table::TYPE_FLOAT, 8);
$table->create();

return $table;
$state = []; // has usage float key
return $state;
});

/** Set Resources */
Expand Down Expand Up @@ -256,7 +238,10 @@ function getStorageDevice(string $root): Device
}
}

function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration): void
/**
* @param array<string, mixed> $activeRuntimes
*/
function removeAllRuntimes(array $activeRuntimes, Orchestration $orchestration): void
{
Console::log('Cleaning up containers...');

Expand All @@ -275,8 +260,8 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):

$activeRuntimeId = $container->getLabels()['openruntimes-runtime-id'];

if (!$activeRuntimes->exists($activeRuntimeId)) {
$activeRuntimes->del($activeRuntimeId);
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
unset($activeRuntimes[$activeRuntimeId]);
}

Console::success('Removed container ' . $container->getName());
Expand Down Expand Up @@ -387,16 +372,16 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, string $image, string $entrypoint, string $source, string $destination, array $variables, string $runtimeEntrypoint, string $command, int $timeout, bool $remove, int $cpus, int $memory, string $version, Orchestration $orchestration, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, string $image, string $entrypoint, string $source, string $destination, array $variables, string $runtimeEntrypoint, string $command, int $timeout, bool $remove, int $cpus, int $memory, string $version, Orchestration $orchestration, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)
$runtimeId = System::getHostname() . '-' . $runtimeId; // Used in Docker (name)

$runtimeHostname = \uniqid();

$log->addTag('runtimeId', $activeRuntimeId);

if ($activeRuntimes->exists($activeRuntimeId)) {
if ($activeRuntimes->get($activeRuntimeId)['status'] == 'pending') {
if (\array_key_exists($activeRuntimeId, $activeRuntimes)) {
if ($activeRuntimes['activeRuntimeId']['status'] == 'pending') {
throw new \Exception('A runtime with the same ID is already being created. Attempt a execution soon.', 500);
}

Expand All @@ -410,15 +395,15 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):

$secret = \bin2hex(\random_bytes(16));

$activeRuntimes->set($activeRuntimeId, [
$activeRuntimes[$activeRuntimeId] = [
'id' => $containerId,
'name' => $activeRuntimeId,
'hostname' => $runtimeHostname,
'created' => $startTime,
'updated' => $startTime,
'status' => 'pending',
'key' => $secret,
]);
];

/**
* Temporary file paths in the executor
Expand Down Expand Up @@ -563,15 +548,15 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
'duration' => $duration,
]);

$activeRuntimes->set($activeRuntimeId, [
$activeRuntimes[$activeRuntimeId] = [
'id' => $containerId,
'name' => $activeRuntimeId,
'hostname' => $runtimeHostname,
'created' => $startTime,
'updated' => \microtime(true),
'status' => 'Up ' . \round($duration, 2) . 's',
'key' => $secret,
]);
];
} catch (Throwable $th) {
$error = $th->getMessage() . $output;

Expand Down Expand Up @@ -604,7 +589,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
} catch (Throwable $th) {
}

$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);

throw new Exception($error, 500);
}
Expand All @@ -621,7 +606,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
} catch (Throwable $th) {
}

$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);
}

$response
Expand All @@ -633,7 +618,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->desc("List currently active runtimes")
->inject('activeRuntimes')
->inject('response')
->action(function (Table $activeRuntimes, Response $response) {
->action(function (array $activeRuntimes, Response $response) {
$runtimes = [];

foreach ($activeRuntimes as $runtime) {
Expand All @@ -651,16 +636,16 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)

$log->addTag('runtimeId', $activeRuntimeId);

if (!$activeRuntimes->exists($activeRuntimeId)) {
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
throw new Exception('Runtime not found', 404);
}

$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];

$response
->setStatusCode(Response::STATUS_CODE_OK)
Expand All @@ -674,18 +659,19 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('activeRuntimes')
->inject('response')
->inject('log')
->action(function (string $runtimeId, Orchestration $orchestration, Table $activeRuntimes, Response $response, Log $log) {
->action(function (string $runtimeId, Orchestration $orchestration, array $activeRuntimes, Response $response, Log $log) {
$activeRuntimeId = $runtimeId; // Used with Swoole table (key)
$runtimeId = System::getHostname() . '-' . $runtimeId; // Used in Docker (name)

$log->addTag('runtimeId', $activeRuntimeId);

if (!$activeRuntimes->exists($activeRuntimeId)) {

if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
throw new Exception('Runtime not found', 404);
}

$orchestration->remove($runtimeId, true);
$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);

$response
->setStatusCode(Response::STATUS_CODE_OK)
Expand Down Expand Up @@ -714,7 +700,7 @@ function removeAllRuntimes(Table $activeRuntimes, Orchestration $orchestration):
->inject('response')
->inject('log')
->action(
function (string $runtimeId, ?string $payload, string $path, string $method, array $headers, int $timeout, string $image, string $source, string $entrypoint, array $variables, int $cpus, int $memory, string $version, string $runtimeEntrypoint, Table $activeRuntimes, Response $response, Log $log) {
function (string $runtimeId, ?string $payload, string $path, string $method, array $headers, int $timeout, string $image, string $source, string $entrypoint, array $variables, int $cpus, int $memory, string $version, string $runtimeEntrypoint, array $activeRuntimes, Response $response, Log $log) {
if (empty($payload)) {
$payload = '';
}
Expand All @@ -731,7 +717,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
$coldStartDuration = 0;

// Prepare runtime
if (!$activeRuntimes->exists($activeRuntimeId)) {
if (!\array_key_exists($activeRuntimeId, $activeRuntimes)) {
if (empty($image) || empty($source) || empty($entrypoint)) {
throw new Exception('Runtime not found. Please start it first or provide runtime-related parameters.', 401);
}
Expand Down Expand Up @@ -816,13 +802,13 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Update swoole table
$runtime = $activeRuntimes->get($activeRuntimeId) ?? [];
$runtime = $activeRuntimes[$activeRuntimeId] ?? [];
$runtime['updated'] = \time();
$activeRuntimes->set($activeRuntimeId, $runtime);
$activeRuntimes[$activeRuntimeId] = $runtime;

// Ensure runtime started
for ($i = 0; $i < 10; $i++) {
if ($activeRuntimes->get($activeRuntimeId)['status'] !== 'pending') {
if ($activeRuntimes[$activeRuntimeId]['status'] !== 'pending') {
break;
}

Expand All @@ -834,7 +820,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Ensure we have secret
$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];
$hostname = $runtime['hostname'];
$secret = $runtime['key'];
if (empty($secret)) {
Expand Down Expand Up @@ -1056,9 +1042,9 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
}

// Update swoole table
$runtime = $activeRuntimes->get($activeRuntimeId);
$runtime = $activeRuntimes[$activeRuntimeId];
$runtime['updated'] = \microtime(true);
$activeRuntimes->set($activeRuntimeId, $runtime);
$activeRuntimes[$activeRuntimeId] = $runtime;

// Finish request
$response
Expand All @@ -1073,13 +1059,13 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
->inject('statsHost')
->inject('statsContainers')
->inject('response')
->action(function (Table $statsHost, Table $statsContainers, Response $response) {
->action(function (array $statsHost, array $statsContainers, Response $response) {
$output = [
'status' => 'pass',
'runtimes' => []
];

$hostUsage = $statsHost->get('host', 'usage') ?? null;
$hostUsage = ($statsHost['host'] ?? [])['usage'] ?? null;
$output['usage'] = $hostUsage;

foreach ($statsContainers as $hostname => $stat) {
Expand Down Expand Up @@ -1213,7 +1199,7 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
} catch (\Throwable $th) {
Console::error('Inactive Runtime deletion failed: ' . $th->getMessage());
} finally {
$activeRuntimes->del($activeRuntimeId);
unset($activeRuntimes[$activeRuntimeId]);
}
});
}
Expand Down Expand Up @@ -1249,43 +1235,48 @@ function (string $runtimeId, ?string $payload, string $path, string $method, arr
* Get usage stats every X seconds to update swoole table
*/
Console::info('Starting stats interval...');
function getStats(Table $statsHost, Table $statsContainers, Orchestration $orchestration, bool $recursive = false): void

/**
* @param array<string, mixed> $statsHost
* @param array<string, mixed> $statsContainers
*/
function getStats(array $statsHost, array $statsContainers, Orchestration $orchestration, bool $recursive = false): void
{
// Get usage stats
$usage = new Usage($orchestration);
$usage->run();

// Update host usage stats
if ($usage->getHostUsage() !== null) {
$oldStat = $statsHost->get('host', 'usage') ?? null;
$oldStat = ($statsHost['host'] ?? [])['usage'] ?? null;

if ($oldStat === null) {
$stat = $usage->getHostUsage();
} else {
$stat = ($oldStat + $usage->getHostUsage()) / 2;
}

$statsHost->set('host', ['usage' => $stat]);
$statsHost['host'] = ['usage' => $stat];
}

// Update runtime usage stats
foreach ($usage->getRuntimesUsage() as $runtime => $usageStat) {
$oldStat = $statsContainers->get($runtime, 'usage') ?? null;
$oldStat = ($statsContainers[$runtime] ?? [])['usage'] ?? null;

if ($oldStat === null) {
$stat = $usageStat;
} else {
$stat = ($oldStat + $usageStat) / 2;
}

$statsContainers->set($runtime, ['usage' => $stat]);
$statsContainers[$runtime] = ['usage' => $stat];
}

// Delete gone runtimes
$runtimes = \array_keys($usage->getRuntimesUsage());
foreach ($statsContainers as $hostname => $stat) {
if (!(\in_array($hostname, $runtimes))) {
$statsContainers->delete($hostname);
unset($statsContainers[$hostname]);
}
}

Expand Down

0 comments on commit 477c137

Please sign in to comment.