Skip to content

Commit

Permalink
Merge branch 'julianrojas87-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Apr 23, 2024
2 parents 346b6c8 + 23758f9 commit f7d48d5
Show file tree
Hide file tree
Showing 12 changed files with 725 additions and 68 deletions.
42 changes: 40 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,45 @@

[![Bun CI](https://github.com/ajuvercr/sds-processors/actions/workflows/build-test.yml/badge.svg)](https://github.com/ajuvercr/sds-processors/actions/workflows/build-test.yml) [![npm](https://img.shields.io/npm/v/sds-processors.svg?style=popout)](https://npmjs.com/package/sds-processors)

[Connector Architecture](https://the-connector-architecture.github.io/site/docs/1_Home) Typescript processors for handling operations over [SDS streams](https://treecg.github.io/SmartDataStreams-Spec/). It currently exposes 4 functions:
[Connector Architecture](https://the-connector-architecture.github.io/site/docs/1_Home) Typescript processors for handling operations over [SDS streams](https://treecg.github.io/SmartDataStreams-Spec/). It currently exposes 5 functions:

### [`js:Sdsify`](https://github.com/ajuvercr/sds-processors/blob/master/configs/sdsify.ttl#L10)

This processor takes as input a stream non SDS entities (members) and wrap them inside SDS records. Optionally, a type can be specified to indicate the correct subject.
This processor takes as input a stream of (batched) RDF data entities and wraps them as individual SDS records to be further processed downstream. By default, it will extract individual entities by taking every single named node subject and extracting a [Concise Bounded Description](https://www.w3.org/Submission/CBD/) (CBD) of that entity with respect to the input RDF graph.

Alternatively, a set of SHACL shapes can be given to concretely define and filter the type of entities and their properties, that want to be extracted and packaged as SDS records. This processor relies on the [member extraction algorithm](https://github.com/TREEcg/extract-cbd-shape) implemented by the [W3C TREE Hypermedia community group](https://www.w3.org/community/treecg/).

If the `js:timestampPath` is specified, the set of SDS records will be streamed out in temporal order to avoid out of order writing issues downstream.

An example of how to use this processor within a Connector Architecture pipeline definition is shown next:

```turtle
@prefix : <https://w3id.org/conn#>.
@prefix js: <https://w3id.org/conn/js#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
[ ] a js:Sdsify;
js:input <inputChannelReader>;
js:output <outputChannerWriter>;
js:stream <http://ex.org/myStream>;
js:timestampPath <http://ex.org/timestamp>;
js:shapeFilter """
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix ex: <http://ex.org/>.
[ ] a sh:NodeShape;
sh:targetClass ex:SomeClass;
sh:property [ sh:path ex:someProperty ].
""",
"""
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix ex: <http://ex.org/>.
[ ] a sh:NodeShape;
sh:targetClass ex:SomeOtherClass;
sh:property [ sh:path ex:someOtherProperty ].
""".
```

### [`js:Bucketize`](https://github.com/ajuvercr/sds-processors/blob/master/configs/bucketizer.ttl#L10)

Expand Down Expand Up @@ -38,6 +72,10 @@ You can define bucketizers as follows:

This processor takes a stream of raw entities (e.g., out from a RML transformation process) and creates versioned entities appending the current timestamp to the entity IRI to make it unique. It is capable of keeping a state so that unmodified entities are filtered.

### [`js:StreamJoin`](https://github.com/ajuvercr/sds-processors/blob/master/configs/stream_join.ttl#L10)

This processor can be used to join multiple input streams or Reader Channels (`js:input`) and pipe their data flow into a single output stream or Writer Channel (`js:output`). The processor will guarantee that all data elements are delivered downstream and will close the output if all inputs are closed.

### [`js:Generate`](https://github.com/ajuvercr/sds-processors/blob/be7134a295eb63e17034b2e3ceea0eaf6ad01770/configs/generator.ttl#L19)

This a simple RDF data generator function used for testing. This processor will periodically generate RDF objects with 3 to 4 predicates.
Binary file modified bun.lockb
Binary file not shown.
16 changes: 13 additions & 3 deletions configs/sdsify.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ js:Sdsify a js:JsProcess;
fnom:implementationParameterPosition "2"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Type filter";
fnom:functionParameter "Timestamp path";
fnom:implementationParameterPosition "3"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Shape filters";
fnom:implementationParameterPosition "4"^^xsd:int;
];
].

Expand All @@ -56,8 +60,14 @@ js:Sdsify a js:JsProcess;
sh:minCount 1;
], [
sh:datatype xsd:iri;
sh:path js:objectType;
sh:name "Type filter";
sh:path js:timestampPath;
sh:name "Timestamp path";
sh:minCount 0;
sh:maxCount 1;
], [
sh:datatype xsd:string;
sh:path js:shapeFilter;
sh:name "Shape filters";
sh:minCount 0;
].

42 changes: 42 additions & 0 deletions configs/stream_join.ttl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
@prefix js: <https://w3id.org/conn/js#>.
@prefix fno: <https://w3id.org/function/ontology#>.
@prefix fnom: <https://w3id.org/function/vocabulary/mapping#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
@prefix : <https://w3id.org/conn#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix dc: <http://purl.org/dc/terms/>.

js:StreamJoin a js:JsProcess;
dc:title "Stream Join processor";
dc:description "Handle and pipe multiple input data streams into a single output stream.";
js:file <../lib/streamJoin.js>;
js:function "streamJoin";
js:location <../>;
js:mapping [
a fno:Mapping;
fno:parameterMapping [
a fnom:PositionParameterMapping;
fnom:functionParameter "Input Streams";
fnom:implementationParameterPosition "0"^^xsd:int
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Output stream";
fnom:implementationParameterPosition "1"^^xsd:int
]
].

[ ] a sh:NodeShape;
sh:targetClass js:StreamJoin;
sh:property [
sh:class :ReaderChannel;
sh:path js:input;
sh:name "Input Streams";
sh:minCount 1
], [
sh:class :WriterChannel;
sh:path js:output;
sh:name "Output stream";
sh:maxCount 1;
sh:minCount 1
].
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"name": "sds-processors",
"version": "0.2.1",
"version": "0.3.2",
"type": "module",
"scripts": {
"build": "tsc",
"prepublishOnly": "tsc",
"test": "bun test"
},
"main": "./lib/index.js",
Expand Down Expand Up @@ -34,11 +35,11 @@
"devDependencies": {
"@ajuvercr/js-runner": "^0.2.0-alpha.0",
"@jest/globals": "^29.7.0",
"@rdfjs/types": "^1.1.0",
"@types/jsonld": "^1.5.11",
"@types/n3": "^1.16.3",
"@types/node": "^20.8.9",
"@types/jsonld": "^1.5.13",
"@types/n3": "^1.16.4",
"@types/node": "^20.10.0",
"@types/rdf-js": "^4.0.2",
"typescript": "^5.2.2"
"@rdfjs/types": "^1.1.0",
"typescript": "^5.3.2"
}
}
2 changes: 1 addition & 1 deletion src/bucketizers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export async function doTheBucketization(
dataWriter.end();
});
metadataReader.on("end", () => {
console.log("buckeitze index closed");
console.log("bucketize index closed");
metadataWriter.end();
});

Expand Down
Loading

0 comments on commit f7d48d5

Please sign in to comment.