Skip to content

Commit

Permalink
Add support for executing produced SPARQL queries over HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 committed Mar 11, 2024
1 parent c2ebb33 commit 7622940
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
6 changes: 6 additions & 0 deletions processors.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ js:SPARQLIngest a js:JsProcess;
sh:name "transactionConfig";
sh:maxCount 1;
sh:minCount 0;
], [
sh:path js:graphStoreUrl;
sh:datatype xsd:string;
sh:name "graphStoreUrl";
sh:maxCount 1;
sh:minCount 0;
].

[ ] a sh:NodeShape;
Expand Down
20 changes: 16 additions & 4 deletions src/SPARQLIngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SDS } from "@treecg/types";
import { Store, Parser, DataFactory } from "n3";
import { CREATE, UPDATE, DELETE } from "./SPARQLQueries";
import { Quad_Subject, Term } from "@rdfjs/types";
import { doSPARQLRequest } from "./Utils";

const { quad, namedNode } = DataFactory;

Expand All @@ -24,6 +25,7 @@ export type IngestConfig = {
changeSemantics?: ChangeSemantics;
targetNamedGraph?: string;
transactionConfig?: TransactionConfig;
graphStoreUrl?: string;
};

export type TransactionMember = {
Expand All @@ -35,7 +37,7 @@ export type TransactionMember = {
export async function sparqlIngest(
memberStream: Stream<string>,
config: IngestConfig,
sparqlWriter: Writer<string>
sparqlWriter?: Writer<string>
) {
let transactionMembers: TransactionMember[] = [];

Expand Down Expand Up @@ -146,14 +148,24 @@ export async function sparqlIngest(

// Execute the update query
if (query) {
await sparqlWriter.push(query);
}
const outputPromises = [];
if (sparqlWriter) {
outputPromises.push(sparqlWriter.push(query));
}
if (config.graphStoreUrl) {
outputPromises.push(doSPARQLRequest(query, config.graphStoreUrl));
}

await Promise.all(outputPromises);
}
} else {
throw new Error(`[sparqlIngest] No member IRI found in received RDF data: \n${rawQuads}`);
}
});

memberStream.on("end", async () => await sparqlWriter.end());
if (sparqlWriter) {
memberStream.on("end", async () => await sparqlWriter.end());
}
}

function verifyTransaction(stores: Store[], transactionIdPath: string, transactionId: Term): void {
Expand Down
14 changes: 14 additions & 0 deletions src/Utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

export async function doSPARQLRequest(query: string, url: string): Promise<void> {
const res = await fetch(url, {
method: "POST",
headers: {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
},
body: `query=${encodeURIComponent(query)}`
});

if (!res.ok) {
throw new Error(`HTTP request failed with code ${res.status} and message: \n${await res.text()}`);
}
}
4 changes: 3 additions & 1 deletion tests/processors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ describe("Tests for SPARQL ingest processor", async () => {
js:transactionConfig [
js:transactionIdPath "http://ex.org/transactionId";
js:transactionEndPath "http://ex.org/transactionEnd"
]
];
js:graphStoreUrl "http://ex.org/myGraphStore"
];
js:sparqlWriter <jw>.
`;
Expand Down Expand Up @@ -73,6 +74,7 @@ describe("Tests for SPARQL ingest processor", async () => {
expect(ingestConfig.targetNamedGraph).toBe("http://ex.org/myGraph");
expect(ingestConfig.transactionConfig.transactionIdPath).toBe("http://ex.org/transactionId");
expect(ingestConfig.transactionConfig.transactionEndPath).toBe("http://ex.org/transactionEnd");
expect(ingestConfig.graphStoreUrl).toBe("http://ex.org/myGraphStore");
testWriter(sparqlWriter);

await checkProc(env.file, env.func);
Expand Down

0 comments on commit 7622940

Please sign in to comment.