Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nodejs): introduce safer timestamp API #21

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
name: build

on:
push:
on: [push]

jobs:
build:
name: Build @questdb/nodejs-questdb-client
test:
name: Build with Node.js ${{ matrix.node-version }}
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16, 20]
steps:
- name: Checkout repository
uses: actions/checkout@v3
Expand All @@ -16,7 +18,7 @@ jobs:
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 16
node-version: ${{ matrix.node-version }}

- name: Install dependencies
run: npm ci
Expand All @@ -26,12 +28,3 @@ jobs:

- name: Run linter
run: npm run eslint

- name: Publish @questdb/nodejs-questdb-client to npm
if: github.ref == 'refs/heads/main'
uses: JS-DevTools/npm-publish@v1
with:
token: ${{ secrets.CI_TOKEN }}
access: public
check-version: true
package: package.json
38 changes: 38 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: build

on:
push:
branches:
- main

jobs:
build:
name: Publish @questdb/nodejs-questdb-client to npm
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v3
with:
submodules: recursive

- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 20

- name: Install dependencies
run: npm ci

- name: Run tests
run: npm test

- name: Run linter
run: npm run eslint

- name: Publish
uses: JS-DevTools/npm-publish@v2
with:
token: ${{ secrets.CI_TOKEN }}
access: public
strategy: all
package: package.json
118 changes: 66 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,133 +1,143 @@
## QuestDB Node.js Client
# QuestDB Node.js Client

## Requirements

The client requires Node.js v16 or newer version.

## Installation
```shell
npm install @questdb/nodejs-client
npm i -s @questdb/nodejs-client
```

## Examples

### Basic API usage

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// create a sender with a 4k buffer
// it is important to size the buffer correctly so messages can fit
const sender = new Sender({bufferSize: 4096});
const sender = new Sender();

// connect to QuestDB
// host and port are required in connect options
await sender.connect({port: 9009, host: "localhost"});
await sender.connect({port: 9009, host: 'localhost'});

// add rows to the buffer of the sender
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0195).floatColumn("ask", 1.0221).atNow();
sender.table("prices").symbol("instrument", "GBPUSD")
.floatColumn("bid", 1.2076).floatColumn("ask", 1.2082).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0195).floatColumn('ask', 1.0221)
.at(Date.now(), 'ms');
sender.table('prices').symbol('instrument', 'GBPUSD')
.floatColumn('bid', 1.2076).floatColumn('ask', 1.2082)
.at(Date.now(), 'ms');

// flush the buffer of the sender, sending the data to QuestDB
// the buffer is cleared after the data is sent and the sender is ready to accept new data
await sender.flush();

// add rows to the buffer again and send it to the server
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run()
.then(console.log)
.catch(console.error);
```

### Authentication and secure connection

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');

async function run() {
// construct a JsonWebKey
const CLIENT_ID = "testapp";
const PRIVATE_KEY = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID = 'testapp';
const PRIVATE_KEY = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224)
.at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### TypeScript example

```typescript
import { Sender } from "@questdb/nodejs-client";
import { Sender } from '@questdb/nodejs-client';

async function run(): Promise<number> {
// construct a JsonWebKey
const CLIENT_ID: string = "testapp";
const PRIVATE_KEY: string = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
const CLIENT_ID: string = 'testapp';
const PRIVATE_KEY: string = '9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8';
const PUBLIC_KEY: { x: string, y: string } = {
x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
x: 'aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc',
y: '__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
};
const JWK: { x: string, y: string, kid: string, kty: string, d: string, crv: string } = {
...PUBLIC_KEY,
d: PRIVATE_KEY,
kid: CLIENT_ID,
kty: "EC",
crv: "P-256",
kty: 'EC',
crv: 'P-256',
};

// pass the JsonWebKey to the sender
// will use it for authentication
const sender: Sender = new Sender({bufferSize: 4096, jwk: JWK});
const sender: Sender = new Sender({jwk: JWK});

// connect() takes an optional second argument
// if 'true' passed the connection is secured with TLS encryption
await sender.connect({port: 9009, host: "localhost"}, true);
await sender.connect({port: 9009, host: 'localhost'}, true);

// send the data over the authenticated and secure connection
sender.table("prices").symbol("instrument", "EURUSD")
.floatColumn("bid", 1.0197).floatColumn("ask", 1.0224).atNow();
sender.table('prices').symbol('instrument', 'EURUSD')
.floatColumn('bid', 1.0197).floatColumn('ask', 1.0224).at(Date.now(), 'ms');
await sender.flush();

// close the connection after all rows ingested
await sender.close();
return new Promise(resolve => resolve(0));
}

run().then(value => console.log(value)).catch(err => console.log(err));
run().catch(console.error);
```

### Worker threads example

```javascript
const { Sender } = require("@questdb/nodejs-client");
const { Sender } = require('@questdb/nodejs-client');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// fake venue
Expand All @@ -136,7 +146,7 @@ function* venue(ticker) {
let end = false;
setTimeout(() => { end = true; }, rndInt(5000));
while (!end) {
yield {"ticker": ticker, "price": Math.random()};
yield {'ticker': ticker, 'price': Math.random()};
}
}

Expand All @@ -153,35 +163,37 @@ async function subscribe(ticker, onTick) {

async function run() {
if (isMainThread) {
const tickers = ["t1", "t2", "t3", "t4"];
const tickers = ['t1', 't2', 't3', 't4'];
// main thread to start a worker thread for each ticker
for (let ticker in tickers) {
const worker = new Worker(__filename, { workerData: { ticker: ticker } })
.on('error', (err) => { throw err; })
.on('exit', () => { console.log(`${ticker} thread exiting...`); })
.on('message', (msg) => { console.log("Ingested " + msg.count + " prices for ticker " + msg.ticker); });
.on('message', (msg) => {
console.log(`Ingested ${msg.count} prices for ticker ${msg.ticker}`);
});
}
} else {
// it is important that each worker has a dedicated sender object
// threads cannot share the sender because they would write into the same buffer
const sender = new Sender({ bufferSize: 4096 });
await sender.connect({ port: 9009, host: "localhost" });
const sender = new Sender();
await sender.connect({ port: 9009, host: 'localhost' });

// subscribe for the market data of the ticker assigned to the worker
// ingest each price update into the database using the sender
let count = 0;
await subscribe(workerData.ticker, async (tick) => {
sender
.table("prices")
.symbol("ticker", tick.ticker)
.floatColumn("price", tick.price)
.atNow();
.table('prices')
.symbol('ticker', tick.ticker)
.floatColumn('price', tick.price)
.at(Date.now(), 'ms');
await sender.flush();
count++;
});

// let the main thread know how many prices were ingested
parentPort.postMessage({"ticker": workerData.ticker, "count": count});
parentPort.postMessage({'ticker': workerData.ticker, 'count': count});

// close the connection to the database
await sender.close();
Expand All @@ -196,5 +208,7 @@ function rndInt(limit) {
return Math.floor((Math.random() * limit) + 1);
}

run().catch((err) => console.log(err));
run()
.then(console.log)
.catch(console.error);
```
Loading