Skip to content

Commit

Permalink
Make transactions configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 committed Mar 11, 2024
1 parent 5057779 commit 6adb826
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 61 deletions.
24 changes: 20 additions & 4 deletions processors.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ js:SPARQLIngest a js:JsProcess;
a fnom:PositionParameterMapping;
fnom:functionParameter "sparqlWriter";
fnom:implementationParameterPosition "2"^^xsd:integer;
];
]
].

[ ] a sh:NodeShape;
Expand Down Expand Up @@ -75,9 +75,9 @@ js:SPARQLIngest a js:JsProcess;
sh:maxCount 1;
sh:minCount 0;
], [
sh:path js:transactionIdPath;
sh:datatype xsd:string;
sh:name "transactionIdPath";
sh:path js:transactionConfig;
sh:class js:TransactionConfig;
sh:name "transactionConfig";
sh:maxCount 1;
sh:minCount 0;
].
Expand Down Expand Up @@ -108,4 +108,20 @@ js:SPARQLIngest a js:JsProcess;
sh:name "deleteValue";
sh:maxCount 1;
sh:minCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass js:TransactionConfig;
sh:property [
sh:path js:transactionIdPath;
sh:datatype xsd:string;
sh:name "transactionIdPath";
sh:maxCount 1;
sh:minCount 1;
], [
sh:path js:transactionEndPath;
sh:datatype xsd:string;
sh:name "transactionEndPath";
sh:maxCount 1;
sh:minCount 1;
].
113 changes: 60 additions & 53 deletions src/SPARQLIngest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Stream, Writer } from "@ajuvercr/js-runner";
import { LDES, SDS } from "@treecg/types";
import { SDS } from "@treecg/types";
import { Store, Parser, DataFactory } from "n3";
import { CREATE, UPDATE, DELETE } from "./SPARQLQueries";
import { Quad_Subject, Term } from "@rdfjs/types";
Expand All @@ -13,12 +13,17 @@ export type ChangeSemantics = {
deleteValue: string;
};

export type TransactionConfig = {
transactionIdPath: string;
transactionEndPath: string;
};

export type IngestConfig = {
memberIsGraph: boolean;
memberShapes?: string[];
changeSemantics?: ChangeSemantics;
targetNamedGraph?: string;
transactionIdPath?: string;
transactionConfig?: TransactionConfig;
};

export type TransactionMember = {
Expand Down Expand Up @@ -47,55 +52,57 @@ export async function sparqlIngest(
store.removeQuads(store.getQuads(null, SDS.payload, null, null));

// Find if this member is part of a transaction
const transactionId = store.getObjects(null, LDES.custom("transactionId"), null)[0];
if (transactionId) {
// Remove transactionId property
store.removeQuad(quad(
<Quad_Subject>memberIRI,
namedNode(LDES.custom("transactionId")),
transactionId
));
// See if this is a finishing, new or ongoing transaction
const isLastOfTransaction = store.getObjects(null, LDES.custom("isLastOfTransaction"), null)[0];

if (isLastOfTransaction) {
console.log(`[sparqlIngest] Last member of ${transactionId.value} received!`);
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), transactionId);
// Remove is-last-of-transaction flag
if (config.transactionConfig) {
const transactionId = store.getObjects(null, config.transactionConfig.transactionIdPath, null)[0];
if (transactionId) {
// Remove transactionId property
store.removeQuad(quad(
<Quad_Subject>memberIRI,
namedNode(LDES.custom("isLastOfTransaction")),
isLastOfTransaction
namedNode(config.transactionConfig.transactionIdPath),
transactionId
));
// We copy all previous member quads into the current store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
} else if (transactionMembers.length > 0) {
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), transactionId);
// Is an ongoing transaction, so we add this member's quads into the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
} else {
console.log(`[sparqlIngest] New transaction ${transactionId.value} started!`);
if (transactionMembers.length > 0)
throw new Error(`[sparqlIngest] Received new transaction ${transactionId.value}, `
+ `but older transaction ${transactionMembers[0].transactionId} hasn't been finalized `);
// Is a new transaction, add it to the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
// See if this is a finishing, new or ongoing transaction
const isLastOfTransaction = store.getObjects(null, config.transactionConfig.transactionEndPath, null)[0];

if (isLastOfTransaction) {
console.log(`[sparqlIngest] Last member of ${transactionId.value} received!`);
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), config.transactionConfig.transactionIdPath, transactionId);
// Remove is-last-of-transaction flag
store.removeQuad(quad(
<Quad_Subject>memberIRI,
namedNode(config.transactionConfig.transactionEndPath),
isLastOfTransaction
));
// We copy all previous member quads into the current store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
} else if (transactionMembers.length > 0) {
// Check this transaction is correct
verifyTransaction(transactionMembers.map(ts => ts.store), config.transactionConfig.transactionIdPath, transactionId);
// Is an ongoing transaction, so we add this member's quads into the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
} else {
console.log(`[sparqlIngest] New transaction ${transactionId.value} started!`);
if (transactionMembers.length > 0)
throw new Error(`[sparqlIngest] Received new transaction ${transactionId.value}, `
+ `but older transaction ${transactionMembers[0].transactionId} hasn't been finalized `);
// Is a new transaction, add it to the transaction store
transactionMembers.push({
memberId: memberIRI.value,
transactionId: transactionId.value,
store
});
return;
}
}
}

Expand Down Expand Up @@ -149,10 +156,10 @@ export async function sparqlIngest(
memberStream.on("end", async () => await sparqlWriter.end());
}

function verifyTransaction(stores: Store[], transactionId: Term): void {
function verifyTransaction(stores: Store[], transactionIdPath: string, transactionId: Term): void {
for (const store of stores) {
// Get all transaction IDs
const tIds = store.getObjects(null, LDES.custom("transactionId"), null);
const tIds = store.getObjects(null, transactionIdPath, null);
for (const tid of tIds) {
if (!tid.equals(transactionId)) {
throw new Error(`[sparqlIngest] Received non-matching transaction ID ${transactionId.value} `
Expand Down Expand Up @@ -219,9 +226,9 @@ function createTransactionQueries(
}
if (updateStore.size > 0) {
transactionQueryBuilder.push(DELETE(
deleteStore,
deleteMembers,
config.memberShapes,
deleteStore,
deleteMembers,
config.memberShapes,
config.targetNamedGraph
));
}
Expand Down
6 changes: 5 additions & 1 deletion tests/SPARQLIngest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,11 @@ describe("Functional tests for the sparqlIngest Connector Architecture function"
updateValue: "https://example.org/ns#Update",
deleteValue: "https://example.org/ns#Delete",
},
memberShapes: [ENTITY_SHAPE]
memberShapes: [ENTITY_SHAPE],
transactionConfig: {
transactionIdPath: "https://w3id.org/ldes#transactionId",
transactionEndPath: "https://w3id.org/ldes#isLastOfTransaction"
}
};

// Add some data to the triple store first (ex:Entity_0, ex:Entity_1, ex:Entity_2 and ex:Entity_3)
Expand Down
10 changes: 7 additions & 3 deletions tests/processors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ describe("Tests for SPARQL ingest processor", async () => {
js:deleteValue "http://ex.org/Delete"
];
js:targetNamedGraph "http://ex.org/myGraph";
js:transactionIdPath "http://ex.org/trancationId"
js:transactionConfig [
js:transactionIdPath "http://ex.org/transactionId";
js:transactionEndPath "http://ex.org/transactionEnd"
]
];
js:sparqlWriter <jw>.
`;
Expand All @@ -57,7 +60,7 @@ describe("Tests for SPARQL ingest processor", async () => {
expect(argss.length).toBe(1);
expect(argss[0].length).toBe(3);

const [[memberStream, ingestConfig, sparqlWriter]] = argss;
const [[memberStream, ingestConfig, sparqlWriter, transactionConfig]] = argss;

testReader(memberStream);
expect(ingestConfig.memberIsGraph).toBeFalsy();
Expand All @@ -68,7 +71,8 @@ describe("Tests for SPARQL ingest processor", async () => {
expect(ingestConfig.changeSemantics.updateValue).toBe("http://ex.org/Update");
expect(ingestConfig.changeSemantics.deleteValue).toBe("http://ex.org/Delete");
expect(ingestConfig.targetNamedGraph).toBe("http://ex.org/myGraph");
expect(ingestConfig.transactionIdPath).toBe("http://ex.org/trancationId");
expect(ingestConfig.transactionConfig.transactionIdPath).toBe("http://ex.org/transactionId");
expect(ingestConfig.transactionConfig.transactionEndPath).toBe("http://ex.org/transactionEnd");
testWriter(sparqlWriter);

await checkProc(env.file, env.func);
Expand Down

0 comments on commit 6adb826

Please sign in to comment.