diff --git a/README.md b/README.md index 2947af3..4c01463 100644 --- a/README.md +++ b/README.md @@ -110,13 +110,14 @@ Usage: substreams-sink-websockets [options] Substreams Sink Websockets Options: - --public-key (required) Ed25519 public key (env: PUBLIC_KEY) - --port Server listen on HTTP port (default: 3000, env: PORT) - --hostname Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME) - --sqlite-filename SQLite database filename (default: "db.sqlite", env: SQLITE_FILENAME) - --verbose Enable verbose logging (default: false, env: VERBOSE) - -V, --version output the version number - -h, --help display help for command + --public-key (required) Ed25519 public key (comma-separated for multiple public keys) (env: PUBLIC_KEY) + --port Server listen on HTTP port (default: 3000, env: PORT) + --hostname Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME) + --sqlite-filename SQLite database filename (default: "db.sqlite", env: SQLITE_FILENAME) + --verbose Enable verbose logging (default: false, env: VERBOSE) + --recent-messages-limit Limit recent messages (default: 50, env: RECENT_MESSAGES_LIMIT) + -V, --version output the version number + -h, --help display help for command ``` ## Docker environment diff --git a/index.ts b/index.ts index 178fdf1..9ca988b 100644 --- a/index.ts +++ b/index.ts @@ -5,13 +5,13 @@ import GET from "./src/fetch/GET.js"; import POST from "./src/fetch/POST.js"; import * as sqlite from "./src/sqlite.js"; import * as websocket from "./src/websocket/index.js"; -import { HOSTNAME, PORT, PUBLIC_KEY, SQLITE_FILENAME } from "./src/config.js"; +import { HOSTNAME, PORT, PUBLIC_KEYS, SQLITE_FILENAME } from "./src/config.js"; import { logger } from "./src/logger.js"; export const db = sqlite.createDb(SQLITE_FILENAME); logger.info(`Server listening on http://${HOSTNAME}:${PORT}`); -logger.info("Verifying with PUBLIC_KEY", PUBLIC_KEY); +logger.info("Verifying with PUBLIC_KEYS", PUBLIC_KEYS.join(",")); logger.info("Reading SQLITE_FILENAME", SQLITE_FILENAME); export interface ServerWebSocketData { diff --git a/package.json b/package.json index da54087..cb08c8e 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,12 @@ { "private": true, - "version": "0.1.9", + "version": "0.1.10", "description": "Substreams Sink Websockets", "name": "substreams-sink-websockets", "homepage": "https://github.com/pinax-network/substreams-sink-websockets", "type": "module", "scripts": { - "start": "bun run index.ts", + "start": "bun run index.ts --help", "test": "bun test", "build": "bun build --compile ./index.ts --outfile substreams-sink-websockets", "dev": "bun run --watch index.ts" @@ -16,6 +16,7 @@ "dotenv": "latest", "openapi3-ts": "latest", "prom-client": "latest", + "substreams-sink-webhook": "^0.7.2", "tslog": "latest", "tweetnacl": "latest" }, diff --git a/src/config.ts b/src/config.ts index 8238b9b..200ee3e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -15,7 +15,7 @@ const opts = new Command() .name(pkg.name) .description(pkg.description) .showHelpAfterError() - .addOption(new Option("--public-key ", "(required) Ed25519 public key").env("PUBLIC_KEY")) + .addOption(new Option("--public-key ", "(required) Ed25519 public key (comma-separated for multiple public keys)").env("PUBLIC_KEY")) .addOption(new Option("--port ", "Server listen on HTTP port").default(DEFAULT_PORT).env("PORT")) .addOption(new Option("--hostname ", "Server listen on HTTP hostname").default(DEFAULT_HOSTNAME).env("HOSTNAME")) .addOption(new Option("--sqlite-filename ", "SQLite database filename").default(DEFAULT_SQLITE_FILENAME).env("SQLITE_FILENAME")) @@ -25,7 +25,7 @@ const opts = new Command() .parse(process.argv).opts(); // export options -export const PUBLIC_KEY: string = opts.publicKey; +export const PUBLIC_KEYS: string[] = opts.publicKey?.split(","); export const PORT = Number(opts.port); export const HOSTNAME: string = opts.hostname export const SQLITE_FILENAME: string = opts.sqliteFilename; @@ -33,6 +33,8 @@ export const VERBOSE: boolean = opts.verbose === "true" ? true : false; export const RECENT_MESSAGES_LIMIT: number = Number(opts.recentMessagesLimit); // validate required options -if (!PUBLIC_KEY) throw new Error("PUBLIC_KEY is required"); -if (Buffer.from(PUBLIC_KEY, "hex").length !== 32) throw new Error("PUBLIC_KEY must be a 32 byte hex string"); +if (!PUBLIC_KEYS.length) throw new Error("PUBLIC_KEY is required"); +for ( const publicKey of PUBLIC_KEYS ) { + if (Buffer.from(publicKey, "hex").length !== 32) throw new Error("PUBLIC_KEY must be a 32 byte hex string"); +} if (!Number.isInteger(PORT)) throw new Error("PORT must be an integer"); diff --git a/src/fetch/POST.ts b/src/fetch/POST.ts index 914b938..fd5902a 100644 --- a/src/fetch/POST.ts +++ b/src/fetch/POST.ts @@ -2,92 +2,72 @@ import * as prometheus from "../prometheus.js"; import * as sqlite from "../sqlite.js"; import { db } from "../../index.js"; import { logger } from "../logger.js"; -import { verify } from "../verify.js"; -import { PUBLIC_KEY } from "../config.js"; import { Server } from "bun"; import { toText } from "./cors.js"; import { insertMessages } from "./messages.js"; +import { signatureEd25519 } from "../webhook/singatureEd25519.js"; +import { BodySchema } from "substreams-sink-webhook/auth"; export default async function (req: Request, server: Server) { - // get headers and body from POST request - const timestamp = req.headers.get("x-signature-timestamp"); - const signature = req.headers.get("x-signature-ed25519"); - const body = await req.text(); - logger.info('POST', {timestamp, signature, body}); + // validate Ed25519 signature + const text = await req.text(); + const signatureResult = await signatureEd25519(req, text); + if ("error" in signatureResult) return signatureResult.error; - // validate request - try { - if (!timestamp) throw new Error("missing required \'timestamp\' in headers"); - if (!signature) throw new Error("missing required \'signature\' in headers"); - if (!body) throw new Error("missing body"); - } catch (e) { - logger.error(e); - prometheus.webhook_message_received_errors.inc(1); - return toText(e.message, 400 ); - } - - // verify request - const msg = Buffer.from(timestamp + body); - const isVerified = verify(msg, signature, PUBLIC_KEY); - if (!isVerified) { - prometheus.webhook_message_received_errors.inc(1); - return toText("invalid request signature", 401 ); - } - const json = JSON.parse(body); + // parse POST body payload + try { + // prometheus.requests.inc(); + const body = BodySchema.parse(JSON.parse(text)); - // Webhook handshake (not WebSocket related) - if (json?.message == "PING") { - const message = JSON.parse(body).message; - logger.info('PING WebHook handshake', {message}); - return toText("OK"); + // PING + if ("message" in body) { + if (body.message === "PING") return toText("OK"); + return toText("invalid body", 400); } - // Get data from Substreams metadata - const { clock, manifest, session } = json; - const { moduleHash, chain } = manifest ?? {}; - const { traceId } = session ?? {}; // validate POST request - try { - if (!clock) throw new Error("missing required \'clock\' in body"); - if (!manifest) throw new Error("missing required \'manifest\' in body"); - if (!session) throw new Error("missing required \'session\' in body"); - if (!chain) throw new Error("missing required \'chain\' in body.manifest"); - if (!moduleHash) throw new Error("missing required \'moduleHash\' in body.manifest"); - if (!traceId) throw new Error("missing required \'traceId\' in body.session"); - } catch (e) { - logger.error(e); - prometheus.webhook_message_received_errors.inc(1); - return toText(e.message, 400 ); - } + if ("data" in body) { + // Get data from Substreams metadata + const { clock, manifest, session } = body; + const { moduleHash, chain } = manifest ?? {}; + const { traceId } = session ?? {}; + const { timestamp } = clock; - // publish message to subscribers - const bytes = server.publish(moduleHash, body); - logger.info('server.publish', {bytes, block: clock.number, timestamp: clock.timestamp, moduleHash}); + // publish message to subscribers + const bytes = server.publish(moduleHash, text); + logger.info('server.publish', {bytes, block: clock.number, timestamp: clock.timestamp, moduleHash}); - // additional publish message specified by chain - server.publish(`${chain}:${moduleHash}`, body) + // additional publish message specified by chain + server.publish(`${chain}:${moduleHash}`, text) - // Metrics for published messages - // response is: - // 0 if the message was dropped - // -1 if backpressure was applied - // or the number of bytes sent. - if ( bytes > 0 ) { - prometheus.publish_message_bytes.inc(bytes); - prometheus.publish_message.inc(1); - } - // Metrics for incoming WebHook - prometheus.webhook_message_received.labels({moduleHash, chain}).inc(1); - prometheus.webhook_trace_id.labels({traceId, chain}).inc(1); + // Metrics for published messages + // response is: + // 0 if the message was dropped + // -1 if backpressure was applied + // or the number of bytes sent. + if ( bytes > 0 ) { + prometheus.publish_message_bytes.inc(bytes); + prometheus.publish_message.inc(1); + } + // Metrics for incoming WebHook + prometheus.webhook_message_received.labels({moduleHash, chain}).inc(1); + prometheus.webhook_trace_id.labels({traceId, chain}).inc(1); - // Upsert moduleHash into SQLite DB - sqlite.replace(db, "chain", chain, timestamp); - sqlite.replace(db, "moduleHash", moduleHash, timestamp); - sqlite.replace(db, "moduleHashByChain", `${chain}:${moduleHash}`, timestamp); - sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp); - //Set timestamp as key to filter recent messages + // Upsert moduleHash into SQLite DB + sqlite.replace(db, "chain", chain, timestamp); + sqlite.replace(db, "moduleHash", moduleHash, timestamp); + sqlite.replace(db, "moduleHashByChain", `${chain}:${moduleHash}`, timestamp); + sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp); - insertMessages( db, traceId, JSON.stringify(json), chain ); + // Set timestamp as key to filter recent messages + insertMessages( db, traceId, text, chain ); - return toText("OK"); + return toText("OK"); + } + } catch (err) { + logger.error(err); + // prometheus.request_errors?.inc(); + prometheus.webhook_message_received_errors.inc(1); + return toText("invalid request", 400); + } } \ No newline at end of file diff --git a/src/fetch/messages.ts b/src/fetch/messages.ts index f1b19ff..b998691 100644 --- a/src/fetch/messages.ts +++ b/src/fetch/messages.ts @@ -2,7 +2,7 @@ import * as sqlite from "../sqlite.js"; import { DEFAULT_RECENT_MESSAGES_LIMIT, RECENT_MESSAGES_LIMIT } from "../config.js"; import Database from "bun:sqlite"; import { db } from "../../index.js"; -import { toJSON, toText } from "./cors.js"; +import { toJSON } from "./cors.js"; export function parseLimit(searchParams: URLSearchParams) { const value = searchParams.get("limit"); @@ -34,28 +34,28 @@ export function handleMessages(req: Request) { //console.log(messages) } -export function insertMessages(db: Database, traceId: string, timestamp: string, chain?: string) { +export function insertMessages(db: Database, traceId: string, text: string, chain?: string) { const dbLength = sqlite.count(db, "messages"); if (dbLength >= RECENT_MESSAGES_LIMIT) { let oldest = sqlite.selectAll(db, "messages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; // update messages - sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, text); sqlite.deleteRow(db, "messages", oldest.key); // update messagesByChain if (chain) { oldest = sqlite.selectAll(db, "messagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0]; - sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); + sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, text ); sqlite.deleteRow(db, "messagesByChain", `${oldest.key}`); } return; } // add messages if tables not full - sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp); + sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, text); - if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp ); + if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, text ); } export function selectMessages(db: Database, limit: number, sortBy: string, chain?: string, moduleHash?: string,) { diff --git a/src/result.ts b/src/result.ts new file mode 100644 index 0000000..1c404b4 --- /dev/null +++ b/src/result.ts @@ -0,0 +1,28 @@ +export type Result = OkResult | ErrResult; + +type OkResult

= { success: true } & (undefined extends P ? {} : { payload: P }); +type ErrResult = { success: false; error: E }; + +export function Ok< + P extends string | number | object | undefined = undefined, + R = P extends undefined ? OkResult : OkResult

+>(payload?: P): R { + if (payload !== undefined) { + return { success: true, payload } as R; + } + return { success: true } as R; +} + +export function Err(error: E): ErrResult { + return { success: false, error }; +} + +export function UnknownErr(err: unknown): ErrResult { + if (err instanceof Error) { + return Err(err); + } else if (typeof err === "string") { + return Err(new Error(err)); + } else { + return Err(new Error(JSON.stringify(err))); + } +} diff --git a/src/webhook/singatureEd25519.ts b/src/webhook/singatureEd25519.ts new file mode 100644 index 0000000..c91e8f7 --- /dev/null +++ b/src/webhook/singatureEd25519.ts @@ -0,0 +1,25 @@ +import { PUBLIC_KEYS } from "../config.js"; +import { toText } from "../fetch/cors.js"; +import { Err, Ok, Result } from "../result.js"; +import { cachedVerify } from "substreams-sink-webhook/auth"; + +export async function signatureEd25519(req: Request, text: string): Promise> { + const signature = req.headers.get("x-signature-ed25519"); + const expiry = req.headers.get("x-signature-ed25519-expiry"); + const publicKey = req.headers.get("x-signature-ed25519-public-key"); + + if (!signature) return Err(toText("missing required signature in headers", 400)); + if (!expiry) return Err(toText("missing required expiry in headers", 400)); + if (!publicKey) return Err(toText("missing required public key in headers", 400)); + if (!text) return Err(toText("missing body", 400)); + + if (!PUBLIC_KEYS.includes(publicKey)) { + return Err(toText("invalid public key", 401)); + } + + if (!cachedVerify(signature, Number(expiry), publicKey)) { + return Err(toText("invalid request signature", 401)); + } + + return Ok(); +}