Skip to content

Commit

Permalink
Bucketizers (#5)
Browse files Browse the repository at this point in the history
* update ldesify

* implement bucketizers

* better sds extraction

* update readme

* bump version

* update js-runner version
  • Loading branch information
ajuvercr committed Apr 23, 2024
1 parent 93bb2f5 commit bd02e82
Show file tree
Hide file tree
Showing 20 changed files with 4,973 additions and 1,826 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,27 @@ An example of how to use this processor within a Connector Architecture pipeline

This processor takes as input a stream of SDS records and SDS metadata and proceeds to _bucketize_ them according to a predefined strategy ([see example](https://github.com/ajuvercr/sds-processors/blob/master/bucketizeStrategy.ttl)). The SDS metadata will be also transformed to reflect this transformation. Multiple SDS streams can be present on the incoming data channel.

This processor relies on the bucketizer implementations available in the [TREEcg/bucketizers](https://github.com/TREEcg/bucketizers) repository.
You can define bucketizers as follows:

```turtle
<bucketize> a js:Bucketize;
js:channels [
js:dataInput <...data input>;
js:metadataInput <... metadata input>;
js:dataOutput <... data output>;
js:metadataOutput <... metadata output>;
];
js:bucketizeStrategy ( [ # One or more bucketize strategies
a tree:SubjectFragmentation; # Create a bucket based on this path
tree:fragmentationPath ( );
] [
a tree:PageFragmentation; # Create a new bucket when the previous bucket has 2 members
tree:pageSize 2;
] );
js:savePath <./buckets_save.json>;
js:outputStreamId <MyEpicStream>.
```


### [`js:Ldesify`](https://github.com/ajuvercr/sds-processors/blob/master/configs/ldesify.ttl#L10)

Expand Down
88 changes: 60 additions & 28 deletions configs/bucketizer.ttl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@prefix tree: <https://w3id.org/tree#>.
@prefix rdfl: <https://w3id.org/rdf-lens/ontology#>.
@prefix js: <https://w3id.org/conn/js#>.
@prefix fno: <https://w3id.org/function/ontology#>.
@prefix fnom: <https://w3id.org/function/vocabulary/mapping#>.
Expand All @@ -7,88 +9,118 @@
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix dc: <http://purl.org/dc/terms/>.

<> owl:imports <./bucketizer_configs.ttl>.
js:Bucketize a js:JsProcess;
dc:title "Bucketizer processor";
dc:description "This bucketizer processor is a SDS processor. It takes in SDS records and SDS metadata and bucketizes the incoming records according to some bucketize strategy. The metadata is also transformed to reflect this transformation. Many SDS streams can be present on the incoming data channel, please specify what stream to ingest and what stream to produce.";
js:file <../lib/main.js>;
js:function "doTheBucketization";
js:file <../src/bucketizers.ts>;
js:function "bucketize";
js:location <../>;
js:mapping [
a fno:Mapping;
fno:parameterMapping [
a fnom:PositionParameterMapping;
fnom:functionParameter "Data input channel";
fnom:functionParameter "channels";
fnom:implementationParameterPosition "0"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Metadata input channel";
fnom:functionParameter "Bucketization strategy";
fnom:implementationParameterPosition "1"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Data output channel";
fnom:implementationParameterPosition "2"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Metadata output channel";
fnom:implementationParameterPosition "3"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Location of bucketization strategy file";
fnom:implementationParameterPosition "4"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Path to use to save state files";
fnom:implementationParameterPosition "5"^^xsd:int;
fnom:implementationParameterPosition "2"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "ID of Stream to transform";
fnom:implementationParameterPosition "6"^^xsd:int;
fnom:implementationParameterPosition "3"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "ID of Stream to produce";
fnom:implementationParameterPosition "7"^^xsd:int;
fnom:implementationParameterPosition "4"^^xsd:int;
];
].

[ ] a sh:NodeShape;
sh:targetClass js:Bucketize;
sh:targetClass <Channels>;
sh:property [
sh:class :ReaderChannel;
sh:path js:dataInput;
sh:name "Data input channel";
sh:name "dataInput";
sh:minCount 1;
sh:maxCount 1;
], [
sh:class :ReaderChannel;
sh:path js:metadataInput;
sh:name "Metadata input channel";
sh:name "metadataInput";
sh:minCount 1;
sh:maxCount 1;
], [
sh:class :WriterChannel;
sh:path js:dataOutput;
sh:name "Data output channel";
sh:name "dataOutput";
sh:minCount 1;
sh:maxCount 1;
], [
sh:class :WriterChannel;
sh:path js:metadataOutput;
sh:name "Metadata output channel";
sh:name "metadataOutput";
sh:minCount 1;
sh:maxCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass <RdfThing>;
sh:property [
sh:name "id";
sh:path ( );
sh:maxCount 1;
sh:minCount 1;
sh:datatype xsd:iri;
], [
sh:datatype xsd:string;
sh:name "quads";
sh:path ( );
sh:maxCount 1;
sh:minCount 1;
sh:class rdfl:CBD;
].

[ ] a sh:NodeShape;
sh:targetClass <Config>;
sh:property [
sh:class tree:FragmentationStrategy;
sh:path ( );
sh:name "strategy";
sh:minCount 1;
], [
sh:class <RdfThing>;
sh:path ( );
sh:name "quads";
sh:minCount 1;
sh:maxCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass js:Bucketize;
sh:property [
sh:class <Channels>;
sh:path js:channels;
sh:name "channels";
sh:minCount 1;
sh:maxCount 1;
], [
sh:class <Config>;
sh:path js:bucketizeStrategy;
sh:name "Location of bucketization strategy file";
sh:name "Bucketization strategy";
sh:minCount 1;
sh:maxCount 1;
], [
sh:datatype xsd:string;
sh:datatype xsd:iri;
sh:path js:inputStreamId;
sh:name "ID of Stream to transform";
sh:maxCount 1;
], [
sh:datatype xsd:string;
sh:datatype xsd:iri;
sh:path js:outputStreamId;
sh:name "ID of Stream to produce";
sh:minCount 1;
Expand Down
91 changes: 91 additions & 0 deletions configs/bucketizer_configs.ttl
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
@prefix tree: <https://w3id.org/tree#>.
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>.
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix sds: <https://w3id.org/sds#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
@prefix rdfl: <https://w3id.org/rdf-lens/ontology#>.

[ ] a sh:NodeShape;
sh:targetClass tree:FragmentationStrategy;
sh:property [
sh:name "type";
sh:path rdf:type;
sh:datatype xsd:iri;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "config";
sh:path ( );
sh:class rdfl:TypedExtract;
sh:maxCount 1;
sh:minCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass <RdfThing>;
sh:property [
sh:name "id";
sh:path ( );
sh:maxCount 1;
sh:minCount 1;
sh:datatype xsd:iri;
], [
sh:name "quads";
sh:path ( );
sh:maxCount 1;
sh:minCount 1;
sh:class rdfl:CBD;
].

[ ] a sh:NodeShape;
sh:targetClass tree:SubjectFragmentation;
sh:property [
sh:name "path";
sh:path tree:fragmentationPath;
sh:class rdfl:PathLens;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "pathQuads";
sh:path tree:fragmentationPath;
sh:class <RdfThing>;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "namePath";
sh:path tree:fragmentationPathName;
sh:class rdfl:pathLens;
sh:maxCount 1;
], [
sh:name "defaultName";
sh:path tree:fragmentationDefaultName;
sh:datatype xsd:string;
sh:maxCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass tree:PageFragmentation;
sh:property [
sh:name "pageSize";
sh:path tree:pageSize;
sh:datatype xsd:integer;
sh:maxCount 1;
sh:minCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass tree:TimebasedFragmentation;
sh:property [
sh:name "path";
sh:path tree:fragmentationPath;
sh:class rdfl:PathLens;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "maxGranularity";
sh:path tree:maxGranularity;
sh:datatype xsd:string;
sh:maxCount 1;
].

27 changes: 27 additions & 0 deletions configs/ldesify.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ js:Ldesify a js:JsProcess;
a fnom:PositionParameterMapping;
fnom:functionParameter "State file path";
fnom:implementationParameterPosition "2"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Check properties";
fnom:implementationParameterPosition "3"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Timestamp Path";
fnom:implementationParameterPosition "4"^^xsd:int;
], [
a fnom:PositionParameterMapping;
fnom:functionParameter "Version Of Path";
fnom:implementationParameterPosition "5"^^xsd:int;
];
].

Expand All @@ -44,6 +56,21 @@ js:Ldesify a js:JsProcess;
sh:name "Data output channel";
sh:maxCount 1;
sh:minCount 1;
], [
sh:datatype xsd:boolean;
sh:path js:checkProps;
sh:name "Check properties";
sh:maxCount 1;
], [
sh:datatype xsd:iri;
sh:path js:timestampPath;
sh:name "Timestamp Path";
sh:maxCount 1;
], [
sh:datatype xsd:iri;
sh:path js:versionOfPath;
sh:name "Version Of Path";
sh:maxCount 1;
], [
sh:datatype xsd:string;
sh:path js:path;
Expand Down
Loading

0 comments on commit bd02e82

Please sign in to comment.