diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 90b9a6b..4446c16 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -1,23 +1,24 @@ -name: Build & Test with Bun +name: Build and tests with Node.js on: - push: - branches: [ master ] - pull_request: - branches: [ master ] + push: + branches: [ master ] + pull_request: + branches: [ master ] jobs: build-and-test: runs-on: ubuntu-latest steps: - - name: Checkout - uses: actions/checkout@v3 # Checkout repo + - name: Checkout + uses: actions/checkout@v3 - - name: Setup Bun - uses: oven-sh/setup-bun@v1 # Setup bun - with: - bun-version: latest - - - run: bun i # Install dependencies - - run: bun test # Run tests + - name: Setup Node.js + uses: actions/setup-node@v3 + with: + node-version: 21.x + cache: "npm" + - run: npm ci + - run: npm run build + - run: npm run test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml deleted file mode 100644 index fa849ea..0000000 --- a/.github/workflows/test.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: bun-test -on: - pull_request: - push: - branches: - - 'master' -jobs: - my-job: - name: bun-test - runs-on: ubuntu-latest - steps: - # ... - - uses: actions/checkout@v4 - - uses: oven-sh/setup-bun@v1 - - # run any `bun` or `bunx` command - - run: bun install - - run: bun test - diff --git a/README.md b/README.md index c45578b..4155ec7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Js-runner -[![Bun CI](https://github.com/rdf-connect/js-runner/actions/workflows/build-test.yml/badge.svg)](https://github.com/rdf-connect/js-runner/actions/workflows/build-test.yml) [![npm](https://img.shields.io/npm/v/@rdfc/js-runner.svg?style=popout)](https://npmjs.com/package/@rdfc/js-runner) +[![Build and tests with Node.js](https://github.com/rdf-connect/js-runner/actions/workflows/build-test.yml/badge.svg)](https://github.com/rdf-connect/js-runner/actions/workflows/build-test.yml) [![npm](https://img.shields.io/npm/v/@rdfc/js-runner.svg?style=popout)](https://npmjs.com/package/@rdfc/js-runner) Typescript/Javascript executor for an [RDF-Connect](https://rdf-connect.github.io/rdfc.github.io/) pipeline. Starting from a declarative RDF file describing the pipeline. @@ -34,7 +34,7 @@ You can execute this pipeline with ```bash tsc -bun bin/js-runner.js input.ttl +npx bin/js-runner.js input.ttl ``` -This example input configuration file uses `owl:imports` to specify additional configuration files. \ No newline at end of file +This example input configuration file uses `owl:imports` to specify additional configuration files. diff --git a/package.json b/package.json index f36e1f7..1f05d3e 100644 --- a/package.json +++ b/package.json @@ -19,8 +19,7 @@ "js-runner": "bin/bundle.mjs" }, "scripts": { - "build": "tsc && tsc-alias && rollup ./dist/index.js --file ./dist/index.cjs --format cjs && bun build --external debug ./bin/js-runner.js --outfile bin/bundle.mjs --target node && npm run build:recompose", - "build:recompose": "sed -z 's/var __require = (id) => {\\n return import.meta.require(id);\\n};/import Module from \"node:module\";\\nconst __require = Module.createRequire(import.meta.url);/' -i bin/bundle.mjs", + "build": "tsc && tsc-alias", "watch": "tsc -w", "test": "vitest run --coverage --coverage.include src", "prepare": "husky", diff --git a/src/args.ts b/src/args.ts index 2a34f06..696dab2 100644 --- a/src/args.ts +++ b/src/args.ts @@ -2,58 +2,70 @@ import commandLineArgs from "command-line-args"; import commandLineUsage from "command-line-usage"; const optionDefinitions = [ - { name: 'input', type: String, defaultOption: true, summary: "Specify what input file to start up" }, - { name: 'help', alias: 'h', type: Boolean, description: "Display this help message" }, + { + name: "input", + type: String, + defaultOption: true, + summary: "Specify what input file to start up", + }, + { + name: "help", + alias: "h", + type: Boolean, + description: "Display this help message", + }, ]; const sections = [ - { - header: "Js-runner", - content: "JS-runner is part of the {italic connector architecture}. Starting from an input file start up all JsProcessors that are defined. Please do not use blank nodes, skolemize your data somewhere else!" - }, - { - header: "Synopsis", - content: "$ js-runner " - }, - { - header: "Command List", - content: [{ name: "input", summary: "Specify what input file to start up" }], - }, - { - optionList: [optionDefinitions[1]] - } + { + header: "Js-runner", + content: + "JS-runner is part of the {italic connector architecture}. Starting from an input file start up all JsProcessors that are defined. Please do not use blank nodes, skolemize your data somewhere else!", + }, + { + header: "Synopsis", + content: "$ js-runner ", + }, + { + header: "Command List", + content: [ + { name: "input", summary: "Specify what input file to start up" }, + ], + }, + { + optionList: [optionDefinitions[1]], + }, ]; export type Args = { - input: string, + input: string; }; +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ function validArgs(args: any): boolean { - if (!args.input) return false; - return true; + if (!args.input) return false; + return true; } function printUsage() { - const usage = commandLineUsage(sections); - console.log(usage); - process.exit(0); + const usage = commandLineUsage(sections); + console.log(usage); + process.exit(0); } export function getArgs(): Args { - let args: any; - try { - args = commandLineArgs(optionDefinitions); - } catch (e) { - console.error(e); - printUsage(); - } - - if (args.help || !validArgs(args)) { - printUsage(); - } - - return args; -} - + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + let args: any; + try { + args = commandLineArgs(optionDefinitions); + } catch (e) { + console.error(e); + printUsage(); + } + if (args.help || !validArgs(args)) { + printUsage(); + } + return args; +} diff --git a/src/connectors.ts b/src/connectors.ts index 495f50b..b7ae6f6 100644 --- a/src/connectors.ts +++ b/src/connectors.ts @@ -2,258 +2,282 @@ import { createTermNamespace } from "@treecg/types"; import { NamedNode, Term } from "@rdfjs/types"; import { - FileWriterConfig, - startFileStreamReader, - startFileStreamWriter, + FileWriterConfig, + startFileStreamReader, + startFileStreamWriter, } from "./connectors/file"; export * from "./connectors/file"; import { - startWsStreamReader, - startWsStreamWriter, - WsWriterConfig, + startWsStreamReader, + startWsStreamWriter, + WsWriterConfig, } from "./connectors/ws"; export * from "./connectors/ws"; import { - KafkaReaderConfig, - KafkaWriterConfig, - startKafkaStreamReader, - startKafkaStreamWriter, + KafkaReaderConfig, + KafkaWriterConfig, + startKafkaStreamReader, + startKafkaStreamWriter, } from "./connectors/kafka"; export * from "./connectors/kafka"; import { - HttpReaderConfig, - HttpWriterConfig, - startHttpStreamReader, - startHttpStreamWriter, + HttpReaderConfig, + HttpWriterConfig, + startHttpStreamReader, + startHttpStreamWriter, } from "./connectors/http"; import { LOG } from "./util"; export * from "./connectors/http"; export const Conn = createTermNamespace( - "https://w3id.org/conn#", - "FileReaderChannel", - "FileWriterChannel", - "HttpReaderChannel", - "HttpWriterChannel", - "KafkaReaderChannel", - "KafkaWriterChannel", - "WsReaderChannel", - "WsWriterChannel", - "WriterChannel", - "ReaderChannel", + "https://w3id.org/conn#", + "FileReaderChannel", + "FileWriterChannel", + "HttpReaderChannel", + "HttpWriterChannel", + "KafkaReaderChannel", + "KafkaWriterChannel", + "WsReaderChannel", + "WsWriterChannel", + "WriterChannel", + "ReaderChannel", ); export interface Config { - id: Term; - ty: NamedNode; - config: T; + id: Term; + ty: NamedNode; + config: T; } export type ReaderConstructor = (config: C) => { - reader: Stream; - init: () => Promise; + reader: Stream; + init: () => Promise; }; export type WriterConstructor = (config: C) => { - writer: Writer; - init: () => Promise; + writer: Writer; + init: () => Promise; }; export const JsOntology = createTermNamespace( - "https://w3id.org/conn/js#", - "JsProcess", - "JsChannel", - "JsReaderChannel", - "JsWriterChannel", + "https://w3id.org/conn/js#", + "JsProcess", + "JsChannel", + "JsReaderChannel", + "JsWriterChannel", ); type JsChannel = { - channel: { - id: Term; - }; + channel: { + id: Term; + }; }; export class ChannelFactory { - private inits: (() => Promise)[] = []; - private jsChannelsNamedNodes: { [label: string]: SimpleStream } = {}; - private jsChannelsBlankNodes: { [label: string]: SimpleStream } = {}; - - createReader(config: Config): Stream { - LOG.channel("Creating reader %s: a %s", config.id.value, config.ty.value); - if (config.ty.equals(Conn.FileReaderChannel)) { - const { reader, init } = startFileStreamReader(config.config); - this.inits.push(init); - - return reader; - } - - if (config.ty.equals(Conn.WsReaderChannel)) { - const { reader, init } = startWsStreamReader(config.config); - this.inits.push(init); - - return reader; - } - - if (config.ty.equals(Conn.KafkaReaderChannel)) { - const { reader, init } = startKafkaStreamReader( - config.config, - ); - this.inits.push(init); - return reader; - } + private inits: (() => Promise)[] = []; + private jsChannelsNamedNodes: { [label: string]: SimpleStream } = + {}; + private jsChannelsBlankNodes: { [label: string]: SimpleStream } = + {}; + + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + createReader(config: Config): Stream { + LOG.channel( + "Creating reader %s: a %s", + config.id.value, + config.ty.value, + ); + if (config.ty.equals(Conn.FileReaderChannel)) { + const { reader, init } = startFileStreamReader(config.config); + this.inits.push(init); + + return reader; + } - if (config.ty.equals(Conn.HttpReaderChannel)) { - const { reader, init } = startHttpStreamReader( - config.config, - ); - this.inits.push(init); - return reader; - } + if (config.ty.equals(Conn.WsReaderChannel)) { + const { reader, init } = startWsStreamReader(config.config); + this.inits.push(init); - if (config.ty.equals(JsOntology.JsReaderChannel)) { - const c = config.config; - if (c.channel) { - const id = c.channel.id.value; - if (c.channel.id.termType === "NamedNode") { - if (!this.jsChannelsNamedNodes[id]) { - this.jsChannelsNamedNodes[id] = new SimpleStream(); - } + return reader; + } - return this.jsChannelsNamedNodes[id]; + if (config.ty.equals(Conn.KafkaReaderChannel)) { + const { reader, init } = startKafkaStreamReader( + config.config, + ); + this.inits.push(init); + return reader; } - if (c.channel.id.termType === "BlankNode") { - if (!this.jsChannelsBlankNodes[id]) { - this.jsChannelsBlankNodes[id] = new SimpleStream(); - } + if (config.ty.equals(Conn.HttpReaderChannel)) { + const { reader, init } = startHttpStreamReader( + config.config, + ); + this.inits.push(init); + return reader; + } - return this.jsChannelsBlankNodes[id]; + if (config.ty.equals(JsOntology.JsReaderChannel)) { + const c = config.config; + if (c.channel) { + const id = c.channel.id.value; + if (c.channel.id.termType === "NamedNode") { + if (!this.jsChannelsNamedNodes[id]) { + this.jsChannelsNamedNodes[id] = + new SimpleStream(); + } + + return this.jsChannelsNamedNodes[id]; + } + + if (c.channel.id.termType === "BlankNode") { + if (!this.jsChannelsBlankNodes[id]) { + this.jsChannelsBlankNodes[id] = + new SimpleStream(); + } + + return this.jsChannelsBlankNodes[id]; + } + throw "Should have found a thing"; + } } - throw "Should have found a thing"; - } - } - throw "Unknown reader channel " + config.ty.value; - } - - createWriter(config: Config): Writer { - LOG.channel("Creating writer %s: a %s", config.id.value, config.ty.value); - if (config.ty.equals(Conn.FileWriterChannel)) { - const { writer, init } = startFileStreamWriter( - config.config, - ); - this.inits.push(init); - - return writer; + throw "Unknown reader channel " + config.ty.value; } - if (config.ty.equals(Conn.WsWriterChannel)) { - const { writer, init } = startWsStreamWriter( - config.config, - ); - this.inits.push(init); + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + createWriter(config: Config): Writer { + LOG.channel( + "Creating writer %s: a %s", + config.id.value, + config.ty.value, + ); + if (config.ty.equals(Conn.FileWriterChannel)) { + const { writer, init } = startFileStreamWriter( + config.config, + ); + this.inits.push(init); + + return writer; + } - return writer; - } + if (config.ty.equals(Conn.WsWriterChannel)) { + const { writer, init } = startWsStreamWriter( + config.config, + ); + this.inits.push(init); - if (config.ty.equals(Conn.KafkaWriterChannel)) { - const { writer, init } = startKafkaStreamWriter( - config.config, - ); - this.inits.push(init); - return writer; - } - - if (config.ty.equals(Conn.HttpWriterChannel)) { - const { writer, init } = startHttpStreamWriter( - config.config, - ); - this.inits.push(init); - return writer; - } - - if (config.ty.equals(JsOntology.JsWriterChannel)) { - const c = config.config; - if (c.channel) { - const id = c.channel.id.value; - if (c.channel.id.termType === "NamedNode") { - if (!this.jsChannelsNamedNodes[id]) { - this.jsChannelsNamedNodes[id] = new SimpleStream(); - } + return writer; + } - return this.jsChannelsNamedNodes[id]; + if (config.ty.equals(Conn.KafkaWriterChannel)) { + const { writer, init } = startKafkaStreamWriter( + config.config, + ); + this.inits.push(init); + return writer; } - if (c.channel.id.termType === "BlankNode") { - if (!this.jsChannelsBlankNodes[id]) { - this.jsChannelsBlankNodes[id] = new SimpleStream(); - } + if (config.ty.equals(Conn.HttpWriterChannel)) { + const { writer, init } = startHttpStreamWriter( + config.config, + ); + this.inits.push(init); + return writer; + } - return this.jsChannelsBlankNodes[id]; + if (config.ty.equals(JsOntology.JsWriterChannel)) { + const c = config.config; + if (c.channel) { + const id = c.channel.id.value; + if (c.channel.id.termType === "NamedNode") { + if (!this.jsChannelsNamedNodes[id]) { + this.jsChannelsNamedNodes[id] = + new SimpleStream(); + } + + return this.jsChannelsNamedNodes[id]; + } + + if (c.channel.id.termType === "BlankNode") { + if (!this.jsChannelsBlankNodes[id]) { + this.jsChannelsBlankNodes[id] = + new SimpleStream(); + } + + return this.jsChannelsBlankNodes[id]; + } + throw "Should have found a thing"; + } } - throw "Should have found a thing"; - } - } - throw "Unknown writer channel " + config.ty.value; - } + throw "Unknown writer channel " + config.ty.value; + } - async init(): Promise { - await Promise.all(this.inits.map((x) => x())); - } + async init(): Promise { + await Promise.all(this.inits.map((x) => x())); + } } export interface Writer { - push(item: T): Promise; - end(): Promise; + push(item: T): Promise; + end(): Promise; + on(event: "end", listener: Handler): this; } export interface Stream { - lastElement?: T; - end(): Promise; - data(listener: (t: T) => PromiseLike | void): this; - on(event: "data", listener: (t: T) => PromiseLike | void): this; - on(event: "end", listener: () => PromiseLike | void): this; + lastElement?: T; + end(): Promise; + data(listener: (t: T) => PromiseLike | void): this; + on(event: "data", listener: (t: T) => PromiseLike | void): this; + on(event: "end", listener: () => PromiseLike | void): this; } export type Handler = (item: T) => Promise | void; export class SimpleStream implements Stream { - private readonly dataHandlers: Handler[] = []; - private readonly endHandlers: Handler[] = []; - - public readonly disconnect: () => Promise; - public lastElement?: T | undefined; - - public constructor(onDisconnect?: () => Promise) { - this.disconnect = onDisconnect || (async () => {}); - } - - public data(listener: Handler): this { - this.dataHandlers.push(listener); - return this; - } - - public async push(data: T): Promise { - this.lastElement = data; - await Promise.all(this.dataHandlers.map((handler) => handler(data))); - } - - public async end(): Promise { - await this.disconnect(); - await Promise.all(this.endHandlers.map((handler) => handler())); - } - - public on(event: "data", listener: Handler): this; - public on(event: "end", listener: Handler): this; - public on(event: "data" | "end", listener: Handler): this { - if (event === "data") { - this.dataHandlers.push(listener); + private readonly dataHandlers: Handler[] = []; + private readonly endHandlers: Handler[] = []; + + public readonly disconnect: () => Promise; + public lastElement?: T | undefined; + + private ended = false; + + public constructor(onDisconnect?: () => Promise) { + this.disconnect = onDisconnect || (async () => {}); } - if (event === "end") { - this.endHandlers.push(listener); + + public data(listener: Handler): this { + this.dataHandlers.push(listener); + return this; + } + + public async push(data: T): Promise { + if (this.ended) { + throw new Error("Trying to push to a stream that has ended!"); + } + this.lastElement = data; + await Promise.all(this.dataHandlers.map((handler) => handler(data))); + } + + public async end(): Promise { + await this.disconnect(); + await Promise.all(this.endHandlers.map((handler) => handler())); + this.ended = true; + } + + public on(event: "data", listener: Handler): this; + public on(event: "end", listener: Handler): this; + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + public on(event: "data" | "end", listener: Handler): this { + if (event === "data") { + this.dataHandlers.push(listener); + } + if (event === "end") { + this.endHandlers.push(listener); + } + return this; } - return this; - } } diff --git a/src/connectors/file.ts b/src/connectors/file.ts index 34bdef0..7be4110 100644 --- a/src/connectors/file.ts +++ b/src/connectors/file.ts @@ -3,134 +3,139 @@ import { appendFile, readFile, stat, writeFile } from "fs/promises"; import { isAbsolute } from "path"; import { watch } from "node:fs"; import { - ReaderConstructor, - SimpleStream, - WriterConstructor, + ReaderConstructor, + SimpleStream, + WriterConstructor, } from "../connectors"; interface FileError extends Error { - code: string; + code: string; } export interface FileReaderConfig { - path: string; - onReplace: boolean; - readFirstContent?: boolean; - encoding?: string; + path: string; + onReplace: boolean; + readFirstContent?: boolean; + encoding?: string; } export interface FileWriterConfig { - path: string; - onReplace: boolean; - readFirstContent?: boolean; - encoding?: string; + path: string; + onReplace: boolean; + readFirstContent?: boolean; + encoding?: string; } async function getFileSize(path: string): Promise { - return (await stat(path)).size; + return (await stat(path)).size; } function readPart( - path: string, - start: number, - end: number, - encoding: BufferEncoding, + path: string, + start: number, + end: number, + encoding: BufferEncoding, ): Promise { - return new Promise((res) => { - const stream = createReadStream(path, { encoding, start, end }); - let buffer = ""; - stream.on("data", (chunk) => { - buffer += chunk; + return new Promise((res) => { + const stream = createReadStream(path, { encoding, start, end }); + let buffer = ""; + stream.on("data", (chunk) => { + buffer += chunk; + }); + stream.on("close", () => res(buffer)); }); - stream.on("close", () => res(buffer)); - }); } function debounce(func: (t: A) => void, timeout = 100): (t: A) => void { - let timer: ReturnType; - return (...args) => { - clearTimeout(timer); - timer = setTimeout(() => { - func(...args); - }, timeout); - }; + let timer: ReturnType; + return (...args) => { + clearTimeout(timer); + timer = setTimeout(() => { + func(...args); + }, timeout); + }; } export const startFileStreamReader: ReaderConstructor = ( - config, + config, ) => { - const path = isAbsolute(config.path) - ? config.path - : `${process.cwd()}/${config.path}`; - openSync(path, "a+"); - const encoding: BufferEncoding = config.encoding || "utf-8"; - const reader = new SimpleStream(); - - const init = async () => { - let currentPos = await getFileSize(path); - const watcher = watch(path, { encoding: "utf-8" }); - watcher.on( - "change", - debounce(async () => { - try { - let content: string; - if (config.onReplace) { - content = await readFile(path, { encoding }); - } else { - const newSize = await getFileSize(path); - - if (newSize <= currentPos) { - currentPos = newSize; - return; - } - - content = await readPart(path, currentPos, newSize, encoding); - currentPos = newSize; - } - - await reader.push(content); - } catch (error: unknown) { - if ((error).code === "ENOENT") { - return; - } - throw error; + const path = isAbsolute(config.path) + ? config.path + : `${process.cwd()}/${config.path}`; + openSync(path, "a+"); + const encoding: BufferEncoding = config.encoding || "utf-8"; + const reader = new SimpleStream(); + + const init = async () => { + let currentPos = await getFileSize(path); + const watcher = watch(path, { encoding: "utf-8" }); + watcher.on( + "change", + debounce(async () => { + try { + let content: string; + if (config.onReplace) { + content = await readFile(path, { encoding }); + } else { + const newSize = await getFileSize(path); + + if (newSize <= currentPos) { + currentPos = newSize; + return; + } + + content = await readPart( + path, + currentPos, + newSize, + encoding, + ); + currentPos = newSize; + } + + await reader.push(content); + } catch (error: unknown) { + if ((error).code === "ENOENT") { + return; + } + throw error; + } + }), + ); + + if (config.onReplace && config.readFirstContent) { + const content = await readFile(path, { encoding }); + await reader.push(content); } - }), - ); + }; - if (config.onReplace && config.readFirstContent) { - const content = await readFile(path, { encoding }); - await reader.push(content); - } - }; - - return { reader, init }; + return { reader, init }; }; // export interface FileWriterConfig extends FileReaderConfig {} export const startFileStreamWriter: WriterConstructor = ( - config, + config, ) => { - const path = isAbsolute(config.path) - ? config.path - : `${process.cwd()}/${config.path}`; - const encoding: BufferEncoding = config.encoding || "utf-8"; - - const init = async () => { - if (!config.onReplace) { - await writeFile(path, "", { encoding }); - } - }; - - const push = async (item: string): Promise => { - if (config.onReplace) { - await writeFile(path, item, { encoding }); - } else { - await appendFile(path, item, { encoding }); - } - }; - - const end = async (): Promise => {}; - - return { writer: { push, end }, init }; + const path = isAbsolute(config.path) + ? config.path + : `${process.cwd()}/${config.path}`; + const encoding: BufferEncoding = config.encoding || "utf-8"; + + const writer = new SimpleStream(); + + const init = async () => { + if (!config.onReplace) { + await writeFile(path, "", { encoding }); + } + }; + + writer.push = async (item: string): Promise => { + if (config.onReplace) { + await writeFile(path, item, { encoding }); + } else { + await appendFile(path, item, { encoding }); + } + }; + + return { writer, init }; }; diff --git a/src/connectors/http.ts b/src/connectors/http.ts index ac50aa7..c0a2111 100644 --- a/src/connectors/http.ts +++ b/src/connectors/http.ts @@ -1,131 +1,131 @@ import * as http from "http"; import type * as https from "https"; import type { - IncomingMessage, - RequestListener, - Server, - ServerResponse, + IncomingMessage, + RequestListener, + Server, + ServerResponse, } from "http"; import { createServer } from "http"; import type { Readable } from "stream"; import { - Config, - ReaderConstructor, - SimpleStream, - WriterConstructor, + ReaderConstructor, + SimpleStream, + WriterConstructor, } from "../connectors"; function streamToString( - stream: Readable, - binary: boolean, + stream: Readable, + binary: boolean, ): Promise { - const datas = []; - return new Promise((res) => { - stream.on("data", (data) => { - datas.push(data); - }); - stream.on("end", () => { - const streamData = Buffer.concat(datas); - res(binary ? streamData : streamData.toString()); + const datas = []; + return new Promise((res) => { + stream.on("data", (data) => { + datas.push(data); + }); + stream.on("end", () => { + const streamData = Buffer.concat(datas); + res(binary ? streamData : streamData.toString()); + }); }); - }); } export interface HttpReaderConfig { - endpoint: string; - port: number; - binary: boolean; - waitHandled?: boolean; - responseCode?: number; + endpoint: string; + port: number; + binary: boolean; + waitHandled?: boolean; + responseCode?: number; } export const startHttpStreamReader: ReaderConstructor = ( - config, + config, ) => { - let server: Server; + let server: Server | undefined = undefined; - const stream = new SimpleStream( - () => - new Promise((res) => { - if (server !== undefined) { - server.close(() => { - res(); - }); - } else { - res(); - } - }), - ); + const stream = new SimpleStream( + () => + new Promise((res) => { + if (server !== undefined) { + server.close(() => { + res(); + }); + } else { + res(); + } + }), + ); - const requestListener: RequestListener = async function ( - req: IncomingMessage, - res: ServerResponse, - ) { - try { - const content = await streamToString(req, config.binary); + const requestListener: RequestListener = async function ( + req: IncomingMessage, + res: ServerResponse, + ) { + try { + const content = await streamToString(req, config.binary); - const promise = stream.push(content).catch((error) => { - throw error; - }); - if (config.waitHandled) { - await promise; - } - } catch (error: unknown) { - console.error("Failed", error); - } + const promise = stream.push(content).catch((error) => { + throw error; + }); + if (config.waitHandled) { + await promise; + } + } catch (error: unknown) { + console.error("Failed", error); + } - res.writeHead(config.responseCode || 200); - res.end("OK"); - }; + res.writeHead(config.responseCode || 200); + res.end("OK"); + }; - server = createServer(requestListener); - const init = () => { - return new Promise((res) => { - const cb = (): void => res(undefined); - if (server) { - server.listen(config.port, config.endpoint, cb); - } else { - cb(); - } - }); - }; - return { reader: stream, init }; + server = createServer(requestListener); + const init = () => { + return new Promise((res) => { + const cb = (): void => res(undefined); + if (server) { + server.listen(config.port, config.endpoint, cb); + } else { + cb(); + } + }); + }; + return { reader: stream, init }; }; export interface HttpWriterConfig { - endpoint: string; - method: string; + endpoint: string; + method: string; } export const startHttpStreamWriter: WriterConstructor = ( - config, + config, ) => { - const requestConfig = new URL(config.endpoint); + const requestConfig = new URL(config.endpoint); - const push = async (item: string | Buffer): Promise => { - await new Promise(async (resolve) => { - const options = { - hostname: requestConfig.hostname, - path: requestConfig.path, - method: config.method, - port: requestConfig.port, - }; + const writer = new SimpleStream(); - const cb = (response: IncomingMessage): void => { - response.on("data", () => {}); - response.on("end", () => { - resolve(null); - }); - }; + writer.push = async (item: string | Buffer): Promise => { + await new Promise((resolve) => { + const options = { + hostname: requestConfig.hostname, + path: requestConfig.path, + method: config.method, + port: requestConfig.port, + }; - const req = http.request(options, cb); - await new Promise((res) => req.write(item, res)); - await new Promise((res) => req.end(res)); - // res(null); - }); - }; + const cb = (response: IncomingMessage): void => { + response.on("data", () => {}); + response.on("end", () => { + resolve(null); + }); + }; - const end = async (): Promise => {}; + const req = http.request(options, cb); + req.write(item, () => { + req.end(); + }); + // res(null); + }); + }; - return { writer: { push, end }, init: async () => {} }; + return { writer, init: async () => {} }; }; diff --git a/src/connectors/kafka.ts b/src/connectors/kafka.ts index 1b2faa6..2ac13b4 100644 --- a/src/connectors/kafka.ts +++ b/src/connectors/kafka.ts @@ -2,148 +2,152 @@ import { readFileSync } from "node:fs"; import type { KafkaConfig, KafkaMessage, ProducerConfig } from "kafkajs"; import { Kafka } from "kafkajs"; import { - Config, - ReaderConstructor, - SimpleStream, - WriterConstructor, + ReaderConstructor, + SimpleStream, + WriterConstructor, } from "../connectors"; export interface SASLOptions { - mechanism: "plain"; - username: string; - password: string; + mechanism: "plain"; + username: string; + password: string; } export interface BrokerConfig { - hosts: string[]; - ssl?: boolean; - sasl?: SASLOptions; + hosts: string[]; + ssl?: boolean; + sasl?: SASLOptions; } export interface ConsumerConfig { - groupId: string; - metadataMaxAge?: number; - sessionTimeout?: number; - rebalanceTimeout?: number; - heartbeatInterval?: number; - maxBytesPerPartition?: number; - minBytes?: number; - maxBytes?: number; - maxWaitTimeInMs?: number; - allowAutoTopicCreation?: boolean; - maxInFlightRequests?: number; - readUncommitted?: boolean; - rackId?: string; + groupId: string; + metadataMaxAge?: number; + sessionTimeout?: number; + rebalanceTimeout?: number; + heartbeatInterval?: number; + maxBytesPerPartition?: number; + minBytes?: number; + maxBytes?: number; + maxWaitTimeInMs?: number; + allowAutoTopicCreation?: boolean; + maxInFlightRequests?: number; + readUncommitted?: boolean; + rackId?: string; } export interface CSTopic { - topic: string; - fromBeginning?: boolean; + topic: string; + fromBeginning?: boolean; } export interface KafkaReaderConfig { - topic: { - name: string; - fromBeginning?: boolean; - }; - consumer: ConsumerConfig; - broker: string | BrokerConfig; + topic: { + name: string; + fromBeginning?: boolean; + }; + consumer: ConsumerConfig; + broker: string | BrokerConfig; } export const startKafkaStreamReader: ReaderConstructor = ( - config, + config, ) => { - const brokerConfig: unknown = {}; - if (typeof config.broker === "string" || config.broker instanceof String) { - Object.assign( - brokerConfig, - JSON.parse(readFileSync(config.broker, "utf-8")), - ); - } else { - Object.assign(brokerConfig, config.broker); - } - if (brokerConfig && (brokerConfig).hosts) { - (brokerConfig).brokers = (brokerConfig).hosts; - } - - const kafka = new Kafka(brokerConfig); - - const consumer = kafka.consumer(config.consumer); - - const stream = new SimpleStream(async () => { - await consumer.disconnect(); - await consumer.stop(); - }); - - const init = async () => { - await consumer.connect(); - await consumer.subscribe({ - topic: config.topic.name, - fromBeginning: config.topic.fromBeginning, + const brokerConfig: unknown = {}; + if (typeof config.broker === "string" || config.broker instanceof String) { + Object.assign( + brokerConfig, + JSON.parse(readFileSync(config.broker, "utf-8")), + ); + } else { + Object.assign(brokerConfig, config.broker); + } + if (brokerConfig && (brokerConfig).hosts) { + (brokerConfig).brokers = (( + brokerConfig + )).hosts; + } + + const kafka = new Kafka(brokerConfig); + + const consumer = kafka.consumer(config.consumer); + + const stream = new SimpleStream(async () => { + await consumer.disconnect(); + await consumer.stop(); }); - consumer - .run({ - async eachMessage({ - topic, - message, - }: { - topic: string; - message: KafkaMessage; - }) { - if (topic === config.topic.name) { - const element = message.value?.toString() ?? ""; - stream.push(element).catch((error) => { - throw error; + const init = async () => { + await consumer.connect(); + await consumer.subscribe({ + topic: config.topic.name, + fromBeginning: config.topic.fromBeginning, + }); + + consumer + .run({ + async eachMessage({ + topic, + message, + }: { + topic: string; + message: KafkaMessage; + }) { + if (topic === config.topic.name) { + const element = message.value?.toString() ?? ""; + stream.push(element).catch((error) => { + throw error; + }); + } + }, + }) + .catch((error) => { + throw error; }); - } - }, - }) - .catch((error) => { - throw error; - }); - }; - - return { reader: stream, init }; + }; + + return { reader: stream, init }; }; export interface KafkaWriterConfig { - topic: { - name: string; - }; - producer: ProducerConfig; - broker: BrokerConfig | string; + topic: { + name: string; + }; + producer: ProducerConfig; + broker: BrokerConfig | string; } export const startKafkaStreamWriter: WriterConstructor = ( - config, + config, ) => { - const topic = config.topic.name; - - const brokerConfig: unknown = {}; - if (typeof config.broker === "string" || config.broker instanceof String) { - Object.assign( - brokerConfig, - JSON.parse(readFileSync(config.broker, "utf-8")), - ); - } else { - Object.assign(brokerConfig, config.broker); - } - if (brokerConfig && (brokerConfig).hosts) { - (brokerConfig).brokers = (brokerConfig).hosts; - } - - const kafka = new Kafka(brokerConfig); - - const producer = kafka.producer(config.producer); - const init = () => producer.connect(); - - const push = async (item: string): Promise => { - await producer.send({ topic, messages: [{ value: item }] }); - }; - - const end = async (): Promise => { - await producer.disconnect(); - }; - - return { writer: { push, end }, init }; + const topic = config.topic.name; + + const brokerConfig: unknown = {}; + if (typeof config.broker === "string" || config.broker instanceof String) { + Object.assign( + brokerConfig, + JSON.parse(readFileSync(config.broker, "utf-8")), + ); + } else { + Object.assign(brokerConfig, config.broker); + } + if (brokerConfig && (brokerConfig).hosts) { + (brokerConfig).brokers = (( + brokerConfig + )).hosts; + } + + const kafka = new Kafka(brokerConfig); + + const producer = kafka.producer(config.producer); + + const writer = new SimpleStream(async () => { + await producer.disconnect(); + }); + + const init = () => producer.connect(); + + writer.push = async (item: string): Promise => { + await producer.send({ topic, messages: [{ value: item }] }); + }; + + return { writer, init }; }; diff --git a/src/connectors/ws.ts b/src/connectors/ws.ts index 3c13d77..cff627d 100644 --- a/src/connectors/ws.ts +++ b/src/connectors/ws.ts @@ -1,107 +1,107 @@ import { - Config, - ReaderConstructor, - SimpleStream, - WriterConstructor, + ReaderConstructor, + SimpleStream, + WriterConstructor, } from "../connectors"; import { RawData, WebSocket } from "ws"; import { WebSocketServer } from "ws"; export interface WsWriterConfig { - url: string; + url: string; } function _connectWs(url: string, res: (value: WebSocket) => void): void { - const ws = new WebSocket(url, {}); - ws.on("error", () => { - setTimeout(() => _connectWs(url, res), 300); - }); - - ws.on("ping", () => ws.pong()); - ws.on("open", () => { - res(ws); - }); + const ws = new WebSocket(url, {}); + ws.on("error", () => { + setTimeout(() => _connectWs(url, res), 300); + }); + + ws.on("ping", () => ws.pong()); + ws.on("open", () => { + res(ws); + }); } function connectWs(url: string): Promise { - return new Promise((res) => _connectWs(url, res)); + return new Promise((res) => _connectWs(url, res)); } export interface WsReaderConfig { - host: string; - port: number; + host: string; + port: number; } export const startWsStreamReader: ReaderConstructor = ( - config, + config, ) => { - const server = new WebSocketServer(config); - server.on("error", (error) => { - console.error("Ws server error:"); - console.error(error); - }); - - const connections: { socket: WebSocket; alive: boolean }[] = []; - - const interval = setInterval(() => { - connections.forEach((instance, i) => { - if (!instance) { - return; - } - if (!instance.alive) { - instance.socket.terminate(); - delete connections[i]; - - return; - } - - instance.socket.ping(); - instance.alive = false; - }); - }, 30_000); - - const reader = new SimpleStream( - () => - new Promise((res) => { - clearInterval(interval); - server.close(() => res()); - }), - ); - - server.on("connection", (ws) => { - const instance = { socket: ws, alive: true }; - connections.push(instance); - - ws.on("message", async (msg: RawData) => { - reader.push(msg.toString()).catch((error) => { - throw error; - }); + const server = new WebSocketServer(config); + server.on("error", (error) => { + console.error("Ws server error:"); + console.error(error); }); - ws.on("pong", () => { - instance.alive = true; + const connections: { socket: WebSocket; alive: boolean }[] = []; + + const interval = setInterval(() => { + connections.forEach((instance, i) => { + if (!instance) { + return; + } + if (!instance.alive) { + instance.socket.terminate(); + delete connections[i]; + + return; + } + + instance.socket.ping(); + instance.alive = false; + }); + }, 30_000); + + const reader = new SimpleStream( + () => + new Promise((res) => { + clearInterval(interval); + server.close(() => res()); + }), + ); + + server.on("connection", (ws) => { + const instance = { socket: ws, alive: true }; + connections.push(instance); + + ws.on("message", async (msg: RawData) => { + reader.push(msg.toString()).catch((error) => { + throw error; + }); + }); + + ws.on("pong", () => { + instance.alive = true; + }); }); - }); - return { reader, init: async () => {} }; + return { reader, init: async () => {} }; }; export const startWsStreamWriter: WriterConstructor = ( - config, + config, ) => { - let ws: WebSocket; - const init = async () => { - ws = await connectWs(config.url); - ws.on("open", () => console.log("open")); - }; - - const push = async (item: any): Promise => { - ws.send(item); - }; + let ws: WebSocket; + const init = async () => { + ws = await connectWs(config.url); + ws.on("open", () => console.log("open")); + }; + + const writer = new SimpleStream(async () => { + ws.close(); + }); - const end = async (): Promise => { - ws.close(); - }; + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + writer.push = async (item: any): Promise => { + ws.send(item); + }; - return { writer: { push, end }, init }; + return { writer, init }; }; diff --git a/src/index.ts b/src/index.ts index a0890f1..df6eba0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,6 @@ import { Store } from "n3"; import { getArgs } from "./args"; import { load_store, LOG } from "./util"; - -export * from "./connectors"; - import path from "path"; import { RDF } from "@treecg/types"; import { ChannelFactory, Conn, JsOntology } from "./connectors"; @@ -11,125 +8,131 @@ import { Quad, Term } from "@rdfjs/types"; import { extractShapes, Shapes } from "rdf-lens"; +export * from "./connectors"; + +/* eslint-disable @typescript-eslint/no-explicit-any */ + function safeJoin(a: string, b: string) { - if (b.startsWith("/")) { - return b; - } - return path.join(a, b); + if (b.startsWith("/")) { + return b; + } + return path.join(a, b); } type Processor = { - ty: Term; - file: string; - location: string; - func: string; - mapping: { parameters: { parameter: string; position: number }[] }; + ty: Term; + file: string; + location: string; + func: string; + mapping: { parameters: { parameter: string; position: number }[] }; }; export type Source = - | { type: "remote"; location: string } - | { - type: "memory"; - value: string; - baseIRI: string; - }; + | { type: "remote"; location: string } + | { + type: "memory"; + value: string; + baseIRI: string; + }; export type Extracted = { - processors: Processor[]; - quads: Quad[]; - shapes: Shapes; + processors: Processor[]; + quads: Quad[]; + shapes: Shapes; }; export async function extractProcessors( - source: Source, - apply?: { [label: string]: (item: any) => any }, + source: Source, + apply?: { [label: string]: (item: any) => any }, ): Promise { - const store = new Store(); - await load_store(source, store); - const quads = store.getQuads(null, null, null, null); - - const config = extractShapes(quads, apply); - const subjects = quads - .filter( - (x) => - x.predicate.equals(RDF.terms.type) && - x.object.equals(JsOntology.JsProcess), - ) - .map((x) => x.subject); - const processorLens = config.lenses[JsOntology.JsProcess.value]; - const processors = subjects.map((id) => processorLens.execute({ id, quads })); - return { processors, quads, shapes: config }; + const store = new Store(); + await load_store(source, store); + const quads = store.getQuads(null, null, null, null); + + const config = extractShapes(quads, apply); + const subjects = quads + .filter( + (x) => + x.predicate.equals(RDF.terms.type) && + x.object.equals(JsOntology.JsProcess), + ) + .map((x) => x.subject); + const processorLens = config.lenses[JsOntology.JsProcess.value]; + const processors = subjects.map((id) => + processorLens.execute({ id, quads }), + ); + return { processors, quads, shapes: config }; } export function extractSteps( - proc: Processor, - quads: Quad[], - config: Shapes, + proc: Processor, + quads: Quad[], + config: Shapes, ): any[][] { - const out: any[][] = []; + const out: any[][] = []; - const subjects = quads - .filter( - (x) => x.predicate.equals(RDF.terms.type) && x.object.equals(proc.ty), - ) - .map((x) => x.subject); - const processorLens = config.lenses[proc.ty.value]; + const subjects = quads + .filter( + (x) => + x.predicate.equals(RDF.terms.type) && x.object.equals(proc.ty), + ) + .map((x) => x.subject); + const processorLens = config.lenses[proc.ty.value]; - const fields = proc.mapping.parameters; + const fields = proc.mapping.parameters; - for (let id of subjects) { - const obj = processorLens.execute({ id, quads }); - const functionArgs = new Array(fields.length); + for (const id of subjects) { + const obj = processorLens.execute({ id, quads }); + const functionArgs = new Array(fields.length); - for (let field of fields) { - functionArgs[field.position] = obj[field.parameter]; - } + for (const field of fields) { + functionArgs[field.position] = obj[field.parameter]; + } - out.push(functionArgs); - } + out.push(functionArgs); + } - return out; + return out; } export async function jsRunner() { - const args = getArgs(); - const cwd = process.cwd(); - - const source: Source = { - location: safeJoin(cwd, args.input).replaceAll("\\", "/"), - type: "remote", - }; - - const factory = new ChannelFactory(); - /// Small hack, if something is extracted from these types, that should be converted to a reader/writer - const apply: { [label: string]: (item: any) => any } = {}; - apply[Conn.ReaderChannel.value] = factory.createReader.bind(factory); - apply[Conn.WriterChannel.value] = factory.createWriter.bind(factory); - - const { - processors, - quads, - shapes: config, - } = await extractProcessors(source, apply); - - LOG.main("Found %d processors", processors.length); - - const starts = []; - for (let proc of processors) { - const argss = extractSteps(proc, quads, config); - const jsProgram = await import("file://" + proc.file); - process.chdir(proc.location); - for (let args of argss) { - starts.push(await jsProgram[proc.func](...args)); + const args = getArgs(); + const cwd = process.cwd(); + + const source: Source = { + location: safeJoin(cwd, args.input).replaceAll("\\", "/"), + type: "remote", + }; + + const factory = new ChannelFactory(); + /// Small hack, if something is extracted from these types, that should be converted to a reader/writer + const apply: { [label: string]: (item: any) => any } = {}; + apply[Conn.ReaderChannel.value] = factory.createReader.bind(factory); + apply[Conn.WriterChannel.value] = factory.createWriter.bind(factory); + + const { + processors, + quads, + shapes: config, + } = await extractProcessors(source, apply); + + LOG.main("Found %d processors", processors.length); + + const starts = []; + for (const proc of processors) { + const argss = extractSteps(proc, quads, config); + const jsProgram = await import("file://" + proc.file); + process.chdir(proc.location); + for (const args of argss) { + starts.push(await jsProgram[proc.func](...args)); + } } - } - await factory.init(); + await factory.init(); - for (let s of starts) { - if (s && typeof s === "function") { - s(); + for (const s of starts) { + if (s && typeof s === "function") { + s(); + } } - } } - diff --git a/src/util.ts b/src/util.ts index dff7b44..ed355f8 100644 --- a/src/util.ts +++ b/src/util.ts @@ -10,39 +10,39 @@ import { Quad, Term } from "@rdfjs/types"; import debug from "debug"; export const LOG = (function () { - const main = debug("js-runner"); - const channel = main.extend("channel"); - const util = main.extend("util"); + const main = debug("js-runner"); + const channel = main.extend("channel"); + const util = main.extend("util"); - return { main, channel, util }; + return { main, channel, util }; })(); export function toArray(stream: stream.Readable): Promise { - const output: T[] = []; - return new Promise((res, rej) => { - stream.on("data", (x) => output.push(x)); - stream.on("end", () => res(output)); - stream.on("close", () => res(output)); - stream.on("error", rej); - }); + const output: T[] = []; + return new Promise((res, rej) => { + stream.on("data", (x) => output.push(x)); + stream.on("end", () => res(output)); + stream.on("close", () => res(output)); + stream.on("error", rej); + }); } export const OWL = createUriAndTermNamespace( - "http://www.w3.org/2002/07/owl#", - "imports", + "http://www.w3.org/2002/07/owl#", + "imports", ); export const CONN2 = createUriAndTermNamespace( - "https://w3id.org/conn#", - "install", - "build", - "GitInstall", - "LocalInstall", - "url", - "procFile", - "path", - "EnvVariable", - "envKey", - "envDefault", + "https://w3id.org/conn#", + "install", + "build", + "GitInstall", + "LocalInstall", + "url", + "procFile", + "path", + "EnvVariable", + "envKey", + "envDefault", ); export const { namedNode, literal } = DataFactory; @@ -51,66 +51,70 @@ export type Keyed = { [Key in keyof T]: Term | undefined }; export type Map = (value: V, key: K, item: T) => O; async function get_readstream(location: string): Promise { - if (location.startsWith("https")) { - return new Promise((res) => { - https.get(location, res); - }); - } else if (location.startsWith("http")) { - return new Promise((res) => { - http.get(location, res); - }); - } else { - return createReadStream(location); - } + if (location.startsWith("https")) { + return new Promise((res) => { + https.get(location, res); + }); + } else if (location.startsWith("http")) { + return new Promise((res) => { + http.get(location, res); + }); + } else { + return createReadStream(location); + } } export async function load_quads(location: string, baseIRI?: string) { - try { - LOG.util("Loading quads %s", location); - const parser = new StreamParser({ baseIRI: baseIRI || location }); - const rdfStream = await get_readstream(location); - rdfStream.pipe(parser); + try { + LOG.util("Loading quads %s", location); + const parser = new StreamParser({ baseIRI: baseIRI || location }); + const rdfStream = await get_readstream(location); + rdfStream.pipe(parser); - const quads: Quad[] = await toArray(parser); - return quads; - } catch (ex) { - console.error("Failed to load_quads", location, baseIRI); - console.error(ex); - return []; - } + const quads: Quad[] = await toArray(parser); + return quads; + } catch (ex) { + console.error("Failed to load_quads", location, baseIRI); + console.error(ex); + return []; + } } function load_memory_quads(value: string, baseIRI: string) { - const parser = new Parser({ baseIRI }); - return parser.parse(value); + const parser = new Parser({ baseIRI }); + return parser.parse(value); } const loaded = new Set(); export async function load_store( - location: Source, - store: Store, - recursive = true, + location: Source, + store: Store, + recursive = true, ) { - if (loaded.has(location)) return; - loaded.add(location); + if (loaded.has(location)) return; + loaded.add(location); - const quads = - location.type === "remote" - ? await load_quads(location.location) - : load_memory_quads(location.value, location.baseIRI); + const quads = + location.type === "remote" + ? await load_quads(location.location) + : load_memory_quads(location.value, location.baseIRI); - store.addQuads(quads); + store.addQuads(quads); - if (recursive) { - const loc = - location.type === "remote" ? location.location : location.baseIRI; - const other_imports = store.getObjects( - namedNode(loc), - OWL.terms.imports, - null, - ); - for (let other of other_imports) { - await load_store({ location: other.value, type: "remote" }, store, true); + if (recursive) { + const loc = + location.type === "remote" ? location.location : location.baseIRI; + const other_imports = store.getObjects( + namedNode(loc), + OWL.terms.imports, + null, + ); + for (const other of other_imports) { + await load_store( + { location: other.value, type: "remote" }, + store, + true, + ); + } } - } }