Skip to content

Commit

Permalink
Add support for transactions with ldes:transactionId and ldes:isLastO…
Browse files Browse the repository at this point in the history
…fTransaction
  • Loading branch information
julianrojas87 committed Mar 11, 2024
1 parent 505c06e commit 5057779
Show file tree
Hide file tree
Showing 5 changed files with 437 additions and 120 deletions.
Binary file modified bun.lockb
Binary file not shown.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
"url": "git+https://github.com/julianrojas87/sparql-ingest-processor-ts.git"
},
"dependencies": {
"@treecg/types": "^0.4.5",
"@treecg/types": "^0.4.6",
"n3": "^1.17.2"
},
"devDependencies": {
"@ajuvercr/js-runner": "0.1.14",
"@ajuvercr/js-runner": "^0.1.17",
"@jest/globals": "^29.7.0",
"@rdfjs/types": "^1.1.0",
"@types/bun": "latest",
"@types/n3": "^1.16.4",
"memory-level": "^1.0.0",
"quadstore": "^13.0.1",
"quadstore-comunica": "^4.0.1",
"typescript": "^5.3.3"
"typescript": "^5.4.2"
}
}
202 changes: 177 additions & 25 deletions src/SPARQLIngest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { Stream, Writer } from "@ajuvercr/js-runner";
import { SDS } from "@treecg/types";
import { Store, Parser } from "n3";
import { LDES, SDS } from "@treecg/types";
import { Store, Parser, DataFactory } from "n3";
import { CREATE, UPDATE, DELETE } from "./SPARQLQueries";
import { Quad_Subject, Term } from "@rdfjs/types";

const { quad, namedNode } = DataFactory;

export type ChangeSemantics = {
changeTypePath: string;
Expand All @@ -18,13 +21,20 @@ export type IngestConfig = {
transactionIdPath?: string;
};

export type TransactionMember = {
memberId: string,
transactionId: string,
store: Store
}

export async function sparqlIngest(
memberStream: Stream<string>,
config: IngestConfig,
sparqlWriter: Writer<string>
) {
let transactionMembers: TransactionMember[] = [];

memberStream.data(async rawQuads => {
// TODO handle transaction-based bulk writes
const quads = new Parser().parse(rawQuads);
const store = new Store(quads);

Expand All @@ -36,43 +46,185 @@ export async function sparqlIngest(
store.removeQuads(store.getQuads(null, SDS.stream, null, null));
store.removeQuads(store.getQuads(null, SDS.payload, null, null));

// Determine if we have a named graph (either explicitly configure or as the member itself)
let ng = undefined;
if (config.memberIsGraph) {
ng = memberIRI.value;
} else if (config.targetNamedGraph) {
ng = config.targetNamedGraph;
// Find if this member is part of a transaction
const transactionId = store.getObjects(null, LDES.custom("transactionId"), null)[0];
if (transactionId) {
// Remove transactionId property
store.removeQuad(quad(
<Quad_Subject>memberIRI,
namedNode(LDES.custom("transactionId")),
transactionId
));
// See if this is a finishing, new or ongoing transaction
const isLastOfTransaction = store.getObjects(null, LDES.custom("isLastOfTransaction"), null)[0];

if (isLastOfTransaction) {
console.log(`[sparqlIngest] Last member of ${transactionId.value} received!`);
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), transactionId);
// Remove is-last-of-transaction flag
store.removeQuad(quad(
<Quad_Subject>memberIRI,
namedNode(LDES.custom("isLastOfTransaction")),
isLastOfTransaction
));
// We copy all previous member quads into the current store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
} else if (transactionMembers.length > 0) {
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), transactionId);
// Is an ongoing transaction, so we add this member's quads into the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
} else {
console.log(`[sparqlIngest] New transaction ${transactionId.value} started!`);
if (transactionMembers.length > 0)
throw new Error(`[sparqlIngest] Received new transaction ${transactionId.value}, `
+ `but older transaction ${transactionMembers[0].transactionId} hasn't been finalized `);
// Is a new transaction, add it to the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
}
}

// Variable that will hold the full query to be executed
let query;

if (config.changeSemantics) {
// Extract and remove change type value
const ctv = store.getQuads(null, config.changeSemantics.changeTypePath, null, null)[0];
if (ctv) {
if (transactionMembers.length > 0) {
query = createTransactionQueries(transactionMembers, config);
// Clean up transaction stores
transactionMembers = [];
} else {
// Determine if we have a named graph (either explicitly configure or as the member itself)
const ng = getNamedGraphIfAny(memberIRI, config.memberIsGraph, config.targetNamedGraph);
// Get the type of change
const ctv = store.getQuads(null, config.changeSemantics!.changeTypePath, null, null)[0];
// Remove change type quad from store
store.removeQuad(ctv);
// Assemble corresponding SPARQL UPDATE query
if (ctv.object.value === config.changeSemantics.createValue) {
query = CREATE(store, ng);
} else if (ctv.object.value === config.changeSemantics.updateValue) {
query = UPDATE(store, ng);
} else if (ctv.object.value === config.changeSemantics.deleteValue) {
query = DELETE(store, [memberIRI.value], config.memberShapes, ng);
} else {
throw new Error(`[sparqlIngest] Unrecognized change type value: ${ctv.object.value}`);
}
}

if (ctv.object.value === config.changeSemantics.createValue) {
query = CREATE(store, ng);
} else if (ctv.object.value === config.changeSemantics.updateValue) {
query = UPDATE(store, ng);
} else if (ctv.object.value === config.changeSemantics.deleteValue) {
query = DELETE(store, memberIRI.value, config.memberShapes, ng);
} else {
if (transactionMembers.length > 0) {
transactionMembers.forEach(ts => store.addQuads(ts.store.getQuads(null, null, null, null)));
query = UPDATE(store, config.targetNamedGraph);
} else {
throw new Error(`Unrecognized change type value: ${ctv.object.value}`);
// Determine if we have a named graph (either explicitly configure or as the member itself)
const ng = getNamedGraphIfAny(memberIRI, config.memberIsGraph, config.targetNamedGraph);
// No change semantics are provided so we do a DELETE/INSERT query by default
query = UPDATE(store, ng);
}
} else {
// If no change semantics are provided we do a DELETE/INSERT query by default
query = UPDATE(store, ng);
}

// Execute the update query
await sparqlWriter.push(query);
if (query) {
await sparqlWriter.push(query);
}
} else {
throw new Error(`No member IRI found in received RDF data: \n${rawQuads}`);
throw new Error(`[sparqlIngest] No member IRI found in received RDF data: \n${rawQuads}`);
}
});

memberStream.on("end", async () => await sparqlWriter.end());
}

function verifyTransaction(stores: Store[], transactionId: Term): void {
for (const store of stores) {
// Get all transaction IDs
const tIds = store.getObjects(null, LDES.custom("transactionId"), null);
for (const tid of tIds) {
if (!tid.equals(transactionId)) {
throw new Error(`[sparqlIngest] Received non-matching transaction ID ${transactionId.value} `
+ `with previous transaction: ${tid.value}`);
}
}
}
}

function getNamedGraphIfAny(
memberIRI: Term,
memberIsGraph: boolean,
targetNamedGraph?: string
): string | undefined {
let ng;
if (memberIsGraph) {
ng = memberIRI.value;
} else if (targetNamedGraph) {
ng = targetNamedGraph;
}
return ng;
}

function createTransactionQueries(
transactionMembers: TransactionMember[],
config: IngestConfig,

): string {
console.log(`[sparqlIngest] Creating multi-operation SPARQL UPDATE query for ${transactionMembers.length}`
+ ` members of transaction ${transactionMembers[0].transactionId}`);
// This is a transaction query, we need to deal with possibly multiple types of queries
const createStore = new Store();
const updateStore = new Store();
const deleteStore = new Store();
const deleteMembers: string[] = [];

const transactionQueryBuilder: string[] = [];

for (const tsm of transactionMembers) {
const ctv = tsm.store.getQuads(null, config.changeSemantics!.changeTypePath, null, null)[0];
// Remove change type quad from store
tsm.store.removeQuad(ctv);

if (ctv.object.value === config.changeSemantics!.createValue) {
createStore.addQuads(tsm.store.getQuads(null, null, null, null));
} else if (ctv.object.value === config.changeSemantics!.updateValue) {
updateStore.addQuads(tsm.store.getQuads(null, null, null, null));
} else if (ctv.object.value === config.changeSemantics!.deleteValue) {
deleteStore.addQuads(tsm.store.getQuads(null, null, null, null));
deleteMembers.push(tsm.memberId);
} else {
throw new Error(`[sparqlIngest] Unrecognized change type value: ${ctv.object.value}`);
}
}

// TODO: Handle case of members as Named Graphs

// Build multi-operation SPARQL query
if (createStore.size > 0) {
transactionQueryBuilder.push(CREATE(createStore, config.targetNamedGraph));
}
if (updateStore.size > 0) {
transactionQueryBuilder.push(UPDATE(updateStore, config.targetNamedGraph));
}
if (updateStore.size > 0) {
transactionQueryBuilder.push(DELETE(
deleteStore,
deleteMembers,
config.memberShapes,
config.targetNamedGraph
));
}

return transactionQueryBuilder.join(";\n");
}
49 changes: 36 additions & 13 deletions src/SPARQLQueries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Quad_Object } from "@rdfjs/types";
import { RDF, SHACL } from "@treecg/types";
import { Writer as N3Writer, Store, Parser } from "n3"

export const CREATE = (store: Store, namedGraph?: string): string => {
export const CREATE = (store: Store, namedGraph?: string, multipleNamedGraphs?: boolean): string => {
// TODO: Handle case of multiple members being Named Graphs
const content = new N3Writer().quadsToString(store.getQuads(null, null, null, null));
return `
INSERT DATA {
Expand All @@ -11,41 +12,63 @@ export const CREATE = (store: Store, namedGraph?: string): string => {
`;
};

export const UPDATE = (store: Store, namedGraph?: string): string => {
const formattedQuery = formatQuery(store);
export const UPDATE = (store: Store, namedGraph?: string, multipleNamedGraphs?: boolean): string => {
// TODO: Handle case of multiple members being Named Graphs
const formattedQuery = formatDeleteQuery(store);
const content = new N3Writer().quadsToString(store.getQuads(null, null, null, null));
return `
${namedGraph ? `WITH <${namedGraph}>` : ""}
DELETE {
${formattedQuery}
${formattedQuery[0]}
}
INSERT {
${content}
}
WHERE {
${formattedQuery}
${formattedQuery[0]}
}
`;
};

export const DELETE = (store: Store, memberIRI: string, memberShapes?: string[], namedGraph?: string): string => {
const formattedQuery = formatQuery(store, memberIRI, memberShapes);
export const DELETE = (
store: Store,
memberIRIs: string[],
memberShapes?: string[],
namedGraph?: string,
multipleNamedGraphs?: boolean
): string => {
// TODO: Handle case of multiple members being Named Graphs
const deleteBuilder = [];
const whereBuilder = [];

let indexStart = 0;
for (const memberIRI of memberIRIs) {
const formatted = formatDeleteQuery(store, memberIRI, memberShapes, indexStart);
deleteBuilder.push(formatted.length > 1 ? formatted[1] : formatted[0]);
whereBuilder.push(formatted[0]);
indexStart++;
}

return `
${namedGraph ? `WITH <${namedGraph}>` : ""}
DELETE {
${formattedQuery.length > 1 ? formattedQuery[1] : formattedQuery[0]}
} WHERE {
${formattedQuery[0]}
${deleteBuilder.join("\n")}
} WHERE {
${whereBuilder.join("\n")}
}
`;
}

function formatQuery(memberStore: Store, memberIRI?: string, memberShapes?: string[]): string[] {
function formatDeleteQuery(
memberStore: Store,
memberIRI?: string,
memberShapes?: string[],
indexStart: number = 0
): string[] {
const subjectSet = new Set<string>();
const queryBuilder: string[] = [];
const formattedQueries: string[] = [];
let i = 0;
let i = indexStart;

// Check if one or more member shapes were given.
// If not, we assume that all properties of the member are present
Expand Down Expand Up @@ -102,7 +125,7 @@ function formatQuery(memberStore: Store, memberIRI?: string, memberShapes?: stri
const deleteQueryBuilder: string[] = [];
deleteQueryBuilder.push(`<${memberIRI}> ?p_${i} ?o_${i}.`);
i++;

// Iterate over every declared member shape
shapeIndex.forEach(mshStore => {
const propShapes = mshStore.getObjects(null, SHACL.property, null);
Expand Down
Loading

0 comments on commit 5057779

Please sign in to comment.