From 3c5f2206e00c88d4578853126f2856e8a9b5c8ad Mon Sep 17 00:00:00 2001 From: smessie Date: Wed, 23 Oct 2024 18:30:01 +0200 Subject: [PATCH] feat: HourBucketizer fragmenting members per hour as a doubly linked list --- configs/bucketizer_configs.ttl | 22 +++ src/bucketizers/hourBucketizer.ts | 142 ++++++++++++++++++ src/bucketizers/index.ts | 17 ++- test/hourBucketizer.test.ts | 231 ++++++++++++++++++++++++++++++ test/timebasedBucketizer.test.ts | 2 +- 5 files changed, 410 insertions(+), 4 deletions(-) create mode 100644 src/bucketizers/hourBucketizer.ts create mode 100644 test/hourBucketizer.test.ts diff --git a/configs/bucketizer_configs.ttl b/configs/bucketizer_configs.ttl index 1cd5fcc..f65f37e 100644 --- a/configs/bucketizer_configs.ttl +++ b/configs/bucketizer_configs.ttl @@ -108,3 +108,25 @@ sh:minCount 1; ]. +[ ] a sh:NodeShape; + sh:targetClass tree:HourFragmentation; + sh:property [ + sh:name "path"; + sh:path tree:timestampPath; + sh:class rdfl:PathLens; + sh:maxCount 1; + sh:minCount 1; + ], [ + sh:name "pathQuads"; + sh:path tree:timestampPath; + sh:class ; + sh:maxCount 1; + sh:minCount 1; + ], [ + sh:name "unorderedRelations"; + sh:path tree:unorderedRelations; + sh:datatype xsd:boolean; + sh:maxCount 1; + sh:minCount 0; + ]. 
+ diff --git a/src/bucketizers/hourBucketizer.ts b/src/bucketizers/hourBucketizer.ts new file mode 100644 index 0000000..347884b --- /dev/null +++ b/src/bucketizers/hourBucketizer.ts @@ -0,0 +1,142 @@ +import { AddRelation, Bucketizer, HourFragmentation } from "./index"; +import { getLoggerFor } from "../utils/logUtil"; +import { Bucket, RdfThing, Record } from "../utils"; +import { BasicLensM, Cont } from "rdf-lens"; +import { Term } from "@rdfjs/types"; +import { TREE, XSD } from "@treecg/types"; +import { DataFactory } from "n3"; + +const { literal, namedNode } = DataFactory; + +export default class HourBucketizer implements Bucketizer { + protected readonly logger = getLoggerFor(this); + + private readonly path: BasicLensM; + private readonly pathQuads: RdfThing; + private readonly unorderedRelations: boolean; + + private hour: Date; + private root: boolean = true; + + constructor(config: HourFragmentation, save?: string) { + this.path = config.path.mapAll((x) => ({ + value: x.id.value, + literal: x.id, + })); + this.pathQuads = config.pathQuads; + this.unorderedRelations = config.unorderedRelations ?? false; + + if (save) { + const parsed = JSON.parse(save); + this.hour = new Date(parsed.hour); + this.root = parsed.root; + } + } + + bucketize( + record: Record, + getBucket: (key: string, root?: boolean) => Bucket, + addRelation: AddRelation, + ): Bucket[] { + const values = this.path + .execute(record.data) + .filter( + (x, i, arr) => arr.findIndex((y) => x.value === y.value) == i, + ); + + const out: Bucket[] = []; + + for (const value of values) { + if (value.literal) { + // The record has a timestamp value. + const timestamp = value.literal.value; + + const recordDate = new Date(timestamp); + + if (!this.hour) { + // Create the first (root) bucket. 
+ this.root = true; + this.hour = new Date( + recordDate.getFullYear(), + recordDate.getMonth(), + recordDate.getDate(), + recordDate.getHours(), + ); + out.push(getBucket(this.hour.toISOString(), this.root)); + + this.logger.debug( + `Created root hour bucket ${this.hour.toISOString()}`, + ); + } else if (recordDate.getHours() !== this.hour.getHours()) { + // Create a new bucket. + const newHour = new Date( + recordDate.getFullYear(), + recordDate.getMonth(), + recordDate.getDate(), + recordDate.getHours(), + ); + const newBucket = getBucket(newHour.toISOString(), false); + + // Add a relation from and to the previous bucket. + const oldBucket = getBucket( + this.hour.toISOString(), + this.root, + ); + this.root = false; + if (this.unorderedRelations) { + addRelation(oldBucket, newBucket, TREE.terms.Relation); + addRelation(newBucket, oldBucket, TREE.terms.Relation); + } else { + addRelation( + oldBucket, + newBucket, + TREE.terms.GreaterThanOrEqualToRelation, + literal( + this.hour.toISOString(), + namedNode(XSD.dateTime), + ), + this.pathQuads, + ); + addRelation( + newBucket, + oldBucket, + TREE.terms.LessThanRelation, + literal( + this.hour.toISOString(), + namedNode(XSD.dateTime), + ), + this.pathQuads, + ); + } + + // Mark the old bucket as immutable. + oldBucket.immutable = true; + + out.push(newBucket); + this.hour = newHour; + + this.logger.debug( + `Created new hour bucket ${this.hour.toISOString()}`, + ); + } else { + // The record belongs to the current bucket. + out.push(getBucket(this.hour.toISOString(), this.root)); + } + } else { + // The record does not have a timestamp value. + this.logger.warn( + `Received records without timestamp values. 
Ignoring record '${record.data.id.value}'.`, + ); + } + } + + return out; + } + + save() { + return JSON.stringify({ + hour: this.hour, + root: this.root, + }); + } +} diff --git a/src/bucketizers/index.ts b/src/bucketizers/index.ts index ce3cdc3..d2958c1 100644 --- a/src/bucketizers/index.ts +++ b/src/bucketizers/index.ts @@ -1,5 +1,3 @@ -import { readFileSync } from "fs"; -import * as path from "path"; import { Term } from "@rdfjs/types"; import { BasicLensM, Cont } from "rdf-lens"; import { @@ -16,13 +14,18 @@ import SubjectBucketizer from "./subjectBucketizer"; import TimebasedBucketizer from "./timebasedBucketizer"; import { $INLINE_FILE } from "@ajuvercr/ts-transformer-inline-file"; +import HourBucketizer from "./hourBucketizer"; const df = new DataFactory(); export const SHAPES_TEXT = $INLINE_FILE("../../configs/bucketizer_configs.ttl"); export type BucketizerConfig = { type: Term; - config: SubjectFragmentation | PageFragmentation | TimebasedFragmentation; + config: + | SubjectFragmentation + | PageFragmentation + | TimebasedFragmentation + | HourFragmentation; }; export type SubjectFragmentation = { @@ -44,6 +47,12 @@ export type TimebasedFragmentation = { minBucketSpan: number; }; +export type HourFragmentation = { + path: BasicLensM; + pathQuads: Cont; + unorderedRelations?: boolean; +}; + export type AddRelation = ( origin: Bucket, target: Bucket, @@ -78,6 +87,8 @@ function createBucketizer(config: BucketizerConfig, save?: string): Bucketizer { config.config, save, ); + case TREE.custom("HourFragmentation"): + return new HourBucketizer(config.config, save); } throw "Unknown bucketizer " + config.type.value; } diff --git a/test/hourBucketizer.test.ts b/test/hourBucketizer.test.ts new file mode 100644 index 0000000..eabeed8 --- /dev/null +++ b/test/hourBucketizer.test.ts @@ -0,0 +1,231 @@ +import { beforeEach, describe, expect, test } from "vitest"; +import { DataFactory, Parser } from "n3"; +import { + BucketizerConfig, + BucketizerOrchestrator, + 
SHAPES_TEXT, +} from "../lib/bucketizers/index"; +import { extractShapes } from "rdf-lens"; +import { Bucket, Record } from "../lib"; +import { BucketRelation } from "../src/utils"; + +const { namedNode } = DataFactory; + +type Member = { id: string; timestamp: Date; text: string }; + +describe("HourBucketizer tests", () => { + function memberToRecord(member: Member): Record { + const quadsStr = ` +@prefix ex: . +@prefix xsd: . + +ex:${member.id} a ex:Member ; + ex:timestamp "${member.timestamp.toISOString()}"^^xsd:dateTime ; + ex:text "${member.text}" . +`; + const quads = new Parser({ baseIRI: "" }).parse(quadsStr); + return new Record( + { + id: namedNode(`http://example.org/${member.id}`), + quads, + }, + namedNode("http://example.org/MyStream"), + ); + } + + const bucketizerConfigsQuads = new Parser({ baseIRI: "" }).parse( + SHAPES_TEXT, + ); + const shapes = extractShapes(bucketizerConfigsQuads); + const lens = shapes.lenses["https://w3id.org/tree#FragmentationStrategy"]; + + function getOrchestrator(save?: string) { + const quadsStr = ` +@prefix ex: . +@prefix tree: . + +ex:Fragmentation a tree:HourFragmentation ; + tree:timestampPath ex:timestamp . 
+`; + + const quads = new Parser({ baseIRI: "" }).parse(quadsStr); + const output: BucketizerConfig = lens.execute({ + id: namedNode("http://example.org/Fragmentation"), + quads, + }); + + return new BucketizerOrchestrator([output], save); + } + + let buckets: { [id: string]: Bucket } = {}; + + beforeEach(() => { + buckets = {}; + }); + + test("member in next hour should create new bucket", () => { + const orchestrator = getOrchestrator(); + + const record1 = memberToRecord({ + id: "m1", + timestamp: new Date("2024-04-01T09:05:00Z"), + text: "First member", + }); + const member2 = { + id: "m2", + timestamp: new Date("2024-04-01T10:10:00Z"), + text: "Second member", + }; + const record2 = memberToRecord(member2); + + const firstBucketExpected = ""; + const secondBucketExpected = `/${encodeURIComponent( + new Date( + member2.timestamp.getFullYear(), + member2.timestamp.getMonth(), + member2.timestamp.getDate(), + member2.timestamp.getHours(), + ).toISOString(), + )}`; + + // Insert first record + const record1Buckets = orchestrator.bucketize( + record1, + buckets, + new Set(), + new Map>(), + [], + ); + + expect(record1Buckets.length).toBe(1); + expect(record1Buckets[0]).toBe(firstBucketExpected); + + // Insert second record + const newRelations: { origin: Bucket; relation: BucketRelation }[] = []; + const record2Buckets = orchestrator.bucketize( + record2, + buckets, + new Set(), + new Map>(), + newRelations, + ); + + expect(record2Buckets.length).toBe(1); + expect(record2Buckets[0]).toBe(secondBucketExpected); + + expect(newRelations.length).toBe(2); + expect(newRelations[0].origin.id.value).toBe(firstBucketExpected); + expect(newRelations[0].relation.target.value).toBe( + secondBucketExpected, + ); + + expect(newRelations[1].origin.id.value).toBe(secondBucketExpected); + expect(newRelations[1].relation.target.value).toBe(firstBucketExpected); + }); + + test("member in same hour should not create new bucket", () => { + const orchestrator = getOrchestrator(); + + 
const record1 = memberToRecord({ + id: "m1", + timestamp: new Date("2024-04-01T09:05:00Z"), + text: "First member", + }); + const member2 = { + id: "m2", + timestamp: new Date("2024-04-01T09:10:00Z"), + text: "Second member", + }; + const record2 = memberToRecord(member2); + + const firstBucketExpected = ""; + const secondBucketExpected = ""; + + // Insert first record + const record1Buckets = orchestrator.bucketize( + record1, + buckets, + new Set(), + new Map>(), + [], + ); + + expect(record1Buckets.length).toBe(1); + expect(record1Buckets[0]).toBe(firstBucketExpected); + + // Insert second record + const newRelations: { origin: Bucket; relation: BucketRelation }[] = []; + const record2Buckets = orchestrator.bucketize( + record2, + buckets, + new Set(), + new Map>(), + newRelations, + ); + + expect(record2Buckets.length).toBe(1); + expect(record2Buckets[0]).toBe(secondBucketExpected); + + expect(newRelations.length).toBe(0); + }); + + test("member in future hour (with a gap) should create new bucket for that future hour", () => { + const orchestrator = getOrchestrator(); + + const record1 = memberToRecord({ + id: "m1", + timestamp: new Date("2024-04-01T09:05:00Z"), + text: "First member", + }); + const member2 = { + id: "m2", + timestamp: new Date("2024-04-01T14:10:00Z"), + text: "Second member", + }; + const record2 = memberToRecord(member2); + + const firstBucketExpected = ""; + const secondBucketExpected = `/${encodeURIComponent( + new Date( + member2.timestamp.getFullYear(), + member2.timestamp.getMonth(), + member2.timestamp.getDate(), + member2.timestamp.getHours(), + ).toISOString(), + )}`; + + // Insert first record + const record1Buckets = orchestrator.bucketize( + record1, + buckets, + new Set(), + new Map>(), + [], + ); + + expect(record1Buckets.length).toBe(1); + expect(record1Buckets[0]).toBe(firstBucketExpected); + + // Insert second record + const newRelations: { origin: Bucket; relation: BucketRelation }[] = []; + const record2Buckets = 
orchestrator.bucketize( + record2, + buckets, + new Set(), + new Map>(), + newRelations, + ); + + expect(record2Buckets.length).toBe(1); + expect(record2Buckets[0]).toBe(secondBucketExpected); + + expect(newRelations.length).toBe(2); + expect(newRelations[0].origin.id.value).toBe(firstBucketExpected); + expect(newRelations[0].relation.target.value).toBe( + secondBucketExpected, + ); + + expect(newRelations[1].origin.id.value).toBe(secondBucketExpected); + expect(newRelations[1].relation.target.value).toBe(firstBucketExpected); + }); +}); diff --git a/test/timebasedBucketizer.test.ts b/test/timebasedBucketizer.test.ts index 8e74173..c480278 100644 --- a/test/timebasedBucketizer.test.ts +++ b/test/timebasedBucketizer.test.ts @@ -10,7 +10,7 @@ import { extractShapes, pred } from "rdf-lens"; import { SimpleStream } from "@rdfc/js-runner"; import { bucketize } from "../lib/main"; import { RDF, SDS } from "@treecg/types"; -import namedNode = DataFactory.namedNode; +const { namedNode } = DataFactory; type Member = { id: string; timestamp: Date; text: string };