From 762294002f7a309ccde397654b0e0762214356e1 Mon Sep 17 00:00:00 2001 From: Julian Rojas Date: Mon, 11 Mar 2024 22:00:53 +0100 Subject: [PATCH] Add support for executing produced SPARQL queries over HTTP --- processors.ttl | 6 ++++++ src/SPARQLIngest.ts | 20 ++++++++++++++++---- src/Utils.ts | 14 ++++++++++++++ tests/processors.test.ts | 4 +++- 4 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 src/Utils.ts diff --git a/processors.ttl b/processors.ttl index 286962b..876a1a3 100644 --- a/processors.ttl +++ b/processors.ttl @@ -80,6 +80,12 @@ js:SPARQLIngest a js:JsProcess; sh:name "transactionConfig"; sh:maxCount 1; sh:minCount 0; + ], [ + sh:path js:graphStoreUrl; + sh:datatype xsd:string; + sh:name "graphStoreUrl"; + sh:maxCount 1; + sh:minCount 0; ]. [ ] a sh:NodeShape; diff --git a/src/SPARQLIngest.ts b/src/SPARQLIngest.ts index 39d174f..ce4d76b 100644 --- a/src/SPARQLIngest.ts +++ b/src/SPARQLIngest.ts @@ -3,6 +3,7 @@ import { SDS } from "@treecg/types"; import { Store, Parser, DataFactory } from "n3"; import { CREATE, UPDATE, DELETE } from "./SPARQLQueries"; import { Quad_Subject, Term } from "@rdfjs/types"; +import { doSPARQLRequest } from "./Utils"; const { quad, namedNode } = DataFactory; @@ -24,6 +25,7 @@ export type IngestConfig = { changeSemantics?: ChangeSemantics; targetNamedGraph?: string; transactionConfig?: TransactionConfig; + graphStoreUrl?: string; }; export type TransactionMember = { @@ -35,7 +37,7 @@ export type TransactionMember = { export async function sparqlIngest( memberStream: Stream, config: IngestConfig, - sparqlWriter: Writer + sparqlWriter?: Writer ) { let transactionMembers: TransactionMember[] = []; @@ -146,14 +148,24 @@ export async function sparqlIngest( // Execute the update query if (query) { - await sparqlWriter.push(query); - } + const outputPromises = []; + if (sparqlWriter) { + outputPromises.push(sparqlWriter.push(query)); + } + if (config.graphStoreUrl) { + outputPromises.push(doSPARQLRequest(query, config.graphStoreUrl)); + } + + await Promise.all(outputPromises); + } } else { throw new Error(`[sparqlIngest] No member IRI found in received RDF data: \n${rawQuads}`); } }); - memberStream.on("end", async () => await sparqlWriter.end()); + if (sparqlWriter) { + memberStream.on("end", async () => await sparqlWriter.end()); + } } function verifyTransaction(stores: Store[], transactionIdPath: string, transactionId: Term): void { diff --git a/src/Utils.ts b/src/Utils.ts new file mode 100644 index 0000000..24e15f0 --- /dev/null +++ b/src/Utils.ts @@ -0,0 +1,14 @@ + +export async function doSPARQLRequest(query: string, url: string): Promise { + const res = await fetch(url, { + method: "POST", + headers: { + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + }, + body: `query=${encodeURIComponent(query)}` + }); + + if (!res.ok) { + throw new Error(`HTTP request failed with code ${res.status} and message: \n${await res.text()}`); + } +} \ No newline at end of file diff --git a/tests/processors.test.ts b/tests/processors.test.ts index 8c48943..405f433 100644 --- a/tests/processors.test.ts +++ b/tests/processors.test.ts @@ -40,7 +40,8 @@ describe("Tests for SPARQL ingest processor", async () => { js:transactionConfig [ js:transactionIdPath "http://ex.org/transactionId"; js:transactionEndPath "http://ex.org/transactionEnd" - ] + ]; + js:graphStoreUrl "http://ex.org/myGraphStore" ]; js:sparqlWriter . `; @@ -73,6 +74,7 @@ describe("Tests for SPARQL ingest processor", async () => { expect(ingestConfig.targetNamedGraph).toBe("http://ex.org/myGraph"); expect(ingestConfig.transactionConfig.transactionIdPath).toBe("http://ex.org/transactionId"); expect(ingestConfig.transactionConfig.transactionEndPath).toBe("http://ex.org/transactionEnd"); + expect(ingestConfig.graphStoreUrl).toBe("http://ex.org/myGraphStore"); testWriter(sparqlWriter); await checkProc(env.file, env.func);