Skip to content

Commit

Permalink
Merge pull request #21 from kysrpex/postgres
Browse files Browse the repository at this point in the history
Add support for PostgreSQL
  • Loading branch information
natefoo authored Oct 28, 2024
2 parents 9139ce8 + 80183d2 commit 9560df4
Show file tree
Hide file tree
Showing 6 changed files with 3,515 additions and 20 deletions.
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ installed and invoked through its [published package](https://www.npmjs.com/pack

```console
$ npx gx-it-proxy --version
$ npx gx-it-proxy --sessions test.sqlite --port 8001
$ npx gx-it-proxy --sessions test.sqlite --port 8001 # use SQLite
$ npx gx-it-proxy --sessions test.json --port 8001 # use a JSON file
$ npx gx-it-proxy --sessions postgresql:///galaxy?host=/var/run/postgresql --port 8001 # use PostgreSQL
```

## Double Proxy
Expand All @@ -18,3 +20,34 @@ To double-proxy (e.g. from Galaxy to a remote proxy that proxies to the applicat
host1$ ./lib/main.js --sessions test.sqlite --forwardIP host2 --forwardPort 8001
host2$ ./lib/main.js --port 8001
```

## Sessions

The proxy loads sessions from the file or database passed with the `--sessions`
option. SQLite and JSON files are watched and reloaded on every change.
PostgreSQL databases are polled every five seconds by default. The polling
interval can be configured via the `--pollingInterval` option (in ms).

Faster updates for PostgreSQL databases are possible via
[PostgreSQL asynchronous notifications](https://www.postgresql.org/docs/16/libpq-notify.html).
To enable them, create a PostgreSQL trigger that sends a
NOTIFY message to the channel `gxitproxy` every time the table `gxitproxy`
changes.

```SQL
CREATE OR REPLACE FUNCTION notify_gxitproxy()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('gxitproxy', 'Table "gxitproxy" changed');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER gxitproxy_notify
AFTER INSERT OR UPDATE OR DELETE ON gxitproxy
FOR EACH ROW EXECUTE FUNCTION notify_gxitproxy();
```

Although it is possible to disable polling using `--pollingInterval 0`, it is
strongly discouraged, as the delivery of asynchronous notifications is not
guaranteed.
33 changes: 29 additions & 4 deletions lib/createdb.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#!/usr/bin/env node
const args = require("commander");
const packageInfo = require("../package");
var postgresClient = require("pg-native");
var sqlite3 = require("sqlite3");

args
.version(packageInfo.version)
.option("--sessions <file>", "Routes file to monitor")
.option("--verbose");

const main = function (argv_) {
const argv = argv_ || process.argv;
args.parse(argv);
let db = new sqlite3.Database(args.sessions, (err) => {
const createDbSqlite = function (sessions) {
let db = new sqlite3.Database(sessions, (err) => {
if (err) {
return console.error(err.message);
}
Expand All @@ -36,6 +35,32 @@ CREATE TABLE gxitproxy
db.close();
};

const createDbPostgres = function (sessions) {
const db = postgresClient();
db.connectSync(sessions);
db.querySync(`
CREATE TABLE IF NOT EXISTS gxitproxy (
key text,
key_type text,
token text,
host text,
port integer,
info text,
PRIMARY KEY (key, key_type)
);`
)
};

const main = function (argv_) {
const argv = argv_ || process.argv;
args.parse(argv);
if (args.sessions.startsWith("postgresql://")) {
createDbPostgres(args.sessions);
} else {
createDbSqlite(args.sessions);
}
}

exports.main = main;

if (require.main === module) {
Expand Down
9 changes: 8 additions & 1 deletion lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ args
.option("--ip <n>", "Public-facing IP of the proxy", "localhost")
.option("--port <n>", "Public-facing port of the proxy", parseInt)
.option("--sessions <file>", "Routes file to monitor")
.option(
"--pollingInterval <n>",
"Polling interval for PostgreSQL databases (in ms, defaults to 5000, " +
"use 0 to disable polling)",
parseInt,
5000
)
.option("--proxyPathPrefix <path>", "Path-based proxy prefix")
.option("--forwardIP <n>", "Forward all requests to IP")
.option("--forwardPort <n>", "Forward all requests to port", parseInt)
Expand All @@ -28,7 +35,7 @@ const main = function (argv_) {

let sessions = null;
if (args.sessions) {
sessions = mapFor(args.sessions);
sessions = mapFor(args.sessions, args.pollingInterval);
}

const dynamicProxyOptions = {
Expand Down
79 changes: 69 additions & 10 deletions lib/mapper.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
var fs = require("fs");
var sqlite3 = require("sqlite3");
var watch = require("node-watch");

var endsWith = function (subjectString, searchString) {
var position = subjectString.length;
position -= searchString.length;
var lastIndex = subjectString.indexOf(searchString, position);
return lastIndex !== -1 && lastIndex === position;
};
var postgresClient = require("pg-native");
var watchFile = require("node-watch");

var updateFromJson = function (path, map) {
var content = fs.readFileSync(path, "utf8");
Expand Down Expand Up @@ -89,17 +83,82 @@ var updateFromSqlite = function (path, map) {
var db = new sqlite3.Database(path, loadSessions);
};

var mapFor = function (path) {
var updateFromPostgres = function(path, map) {
var db = postgresClient();
var loadedSessions = {};
db.connectSync(path);

var queryResult = db.querySync(
"SELECT key, key_type, token, host, port, info FROM gxitproxy"
);
for (var row of queryResult) {
var info = row.info;
if (info) {
info = JSON.parse(info);
}
loadedSessions[row.key] = {
target: { host: row.host, port: parseInt(row.port) },
key_type: row.key_type,
token: row.token,
requires_path_in_url: info?.requires_path_in_url,
requires_path_in_header_named: info?.requires_path_in_header_named,
};
}

for (var oldSession in map) {
if (!(oldSession in loadedSessions)) {
delete map[oldSession];
}
}
for (var loadedSession in loadedSessions) {
map[loadedSession] = loadedSessions[loadedSession];
}
// console.log("Updated map:", map)
db.end();
};

var watchPostgres = function(path, loadMap, pollingInterval) {
// poll the database every `pollingInterval` seconds
if (pollingInterval > 0) {
setInterval(loadMap, pollingInterval)
}

// watch changes using PostgresSQL asynchronous notifications
// (https://www.postgresql.org/docs/16/libpq-notify.html)
var db = postgresClient();
db.connect(path, function(err) {
if(err) throw err;

db.on("notification", function(msg) {loadMap(path, loadMap)});

Check failure on line 132 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (14.x)

'msg' is defined but never used

Check failure on line 132 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (16.x)

'msg' is defined but never used

Check failure on line 132 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (18.x)

'msg' is defined but never used
db.query("LISTEN gxitproxy", function(err, res) {if(err) throw err});

Check failure on line 133 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (14.x)

'res' is defined but never used

Check failure on line 133 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (16.x)

'res' is defined but never used

Check failure on line 133 in lib/mapper.js

View workflow job for this annotation

GitHub Actions / build (18.x)

'res' is defined but never used
});
// requires creating a notification function and a trigger for the gxitproxy
// table on the database (see README.md for more details)
// delivery of notifications is not guaranteed, therefore, combining polling
// with asynchronous notifications is strongly recommended
}

var mapFor = function (path, pollingInterval) {
var map = {};
var loadMap;
if (endsWith(path, ".sqlite")) {
var watch;
if (path.endsWith(".sqlite")) {
loadMap = function () {
updateFromSqlite(path, map);
};
watch = watchFile;
} else if (path.startsWith("postgresql://")) {
loadMap = function () {
updateFromPostgres(path, map);
};
watch = function (path, loadMap) {
return watchPostgres(path, loadMap, pollingInterval);
};
} else {
loadMap = function () {
updateFromJson(path, map);
};
watch = watchFile;
}
console.log("Watching path " + path);
loadMap();
Expand Down
Loading

0 comments on commit 9560df4

Please sign in to comment.