diff --git a/package-lock.json b/package-lock.json
index f1ac38a..c60d9ae 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -10,7 +10,7 @@
       "license": "MIT",
       "dependencies": {
         "@treecg/types": "^0.4.6",
-        "mongodb": "^6.7.0",
+        "mongodb": "^6.8.0",
         "n3": "^1.17.3",
         "winston": "^3.13.0"
       },
@@ -3010,7 +3010,7 @@
     },
     "node_modules/ip-address": {
       "version": "9.0.5",
-      "devOptional": true,
+      "dev": true,
       "license": "MIT",
       "dependencies": {
         "jsbn": "1.1.0",
@@ -3371,7 +3371,7 @@
     },
     "node_modules/jsbn": {
       "version": "1.1.0",
-      "devOptional": true,
+      "dev": true,
       "license": "MIT"
     },
     "node_modules/jsesc": {
@@ -3795,8 +3795,9 @@
       }
     },
     "node_modules/mongodb": {
-      "version": "6.7.0",
-      "license": "Apache-2.0",
+      "version": "6.8.0",
+      "resolved": "https://registry.npmjs.org/mongodb/-/mongodb-6.8.0.tgz",
+      "integrity": "sha512-HGQ9NWDle5WvwMnrvUxsFYPd3JEbqD3RgABHBQRuoCEND0qzhsd0iH5ypHsf1eJ+sXmvmyKpP+FLOKY8Il7jMw==",
       "dependencies": {
         "@mongodb-js/saslprep": "^1.1.5",
         "bson": "^6.7.0",
@@ -4790,7 +4791,7 @@
     },
     "node_modules/smart-buffer": {
       "version": "4.2.0",
-      "devOptional": true,
+      "dev": true,
       "license": "MIT",
       "engines": {
         "node": ">= 6.0.0",
@@ -4799,7 +4800,7 @@
     },
     "node_modules/socks": {
       "version": "2.8.3",
-      "devOptional": true,
+      "dev": true,
       "license": "MIT",
       "dependencies": {
         "ip-address": "^9.0.5",
@@ -4827,7 +4828,7 @@
     },
     "node_modules/sprintf-js": {
       "version": "1.1.3",
-      "devOptional": true,
+      "dev": true,
       "license": "BSD-3-Clause"
     },
     "node_modules/stack-trace": {
diff --git a/package.json b/package.json
index 4b4807a..79c45f1 100644
--- a/package.json
+++ b/package.json
@@ -26,8 +26,8 @@
   ],
   "license": "MIT",
   "devDependencies": {
-    "@rdfc/js-runner": "^0.2.1",
     "@jest/globals": "^29.7.0",
+    "@rdfc/js-runner": "^0.2.1",
     "@types/n3": "^1.16.4",
     "@types/node": "^20.14.2",
     "mongodb-memory-server": "^9.3.0",
@@ -35,7 +35,7 @@
   },
   "dependencies": {
     "@treecg/types": "^0.4.6",
-    "mongodb": "^6.7.0",
+    "mongodb": "^6.8.0",
     "n3": "^1.17.3",
     "winston": "^3.13.0"
   }
diff --git a/processor.ttl b/processor.ttl
index 0020a6c..dae38b5 100644
--- a/processor.ttl
+++ b/processor.ttl
@@ -58,18 +58,6 @@ js:Ingest a js:JsProcess;
             a fnom:PositionParameterMapping;
             fnom:functionParameter "Database config";
            fnom:implementationParameterPosition "2"^^xsd:int;
-        ], [
-            a fnom:PositionParameterMapping;
-            fnom:functionParameter "Page Size";
-            fnom:implementationParameterPosition "3"^^xsd:int;
-        ], [
-            a fnom:PositionParameterMapping;
-            fnom:functionParameter "Branch Size";
-            fnom:implementationParameterPosition "4"^^xsd:int;
-        ], [
-            a fnom:PositionParameterMapping;
-            fnom:functionParameter "Min bucket span";
-            fnom:implementationParameterPosition "5"^^xsd:int;
         ];
     ].
 
@@ -93,23 +81,5 @@ js:Ingest a js:JsProcess;
         sh:minCount 1;
         sh:maxCount 1;
         sh:name "Database config";
-    ], [
-        sh:datatype xsd:integer;
-        sh:path js:pageSize;
-        sh:maxCount 1;
-        sh:defaultValue 100;
-        sh:name "Page Size";
-    ], [
-        sh:datatype xsd:integer;
-        sh:path js:branchSize;
-        sh:maxCount 1;
-        sh:defaultValue 4;
-        sh:name "Branch Size";
-    ], [
-        sh:datatype xsd:integer;
-        sh:path js:minBucketSpan;
-        sh:maxCount 1;
-        sh:defaultValue 300;
-        sh:name "Min bucket span";
     ].
 
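Note on the interface change above: js:Ingest no longer exposes pageSize, branchSize or minBucketSpan, so a pipeline only wires the two channels and the database config into the processor. A rough sketch of the resulting call, not part of this diff; the connection string is a placeholder and DBConfig may carry collection names that are not visible in these hunks:

    import { ingest } from "./src/index";

    // The channel objects are provided by @rdfc/js-runner at runtime; their concrete
    // types are whatever ingest() declares, so we borrow them instead of assuming them.
    type DataChannel = Parameters<typeof ingest>[0];
    type MetaChannel = Parameters<typeof ingest>[1];
    type Config = Parameters<typeof ingest>[2];

    export async function wire(data: DataChannel, metadata: MetaChannel): Promise<void> {
        // Assumed minimal config: only `url` is exercised in the hunks shown; it falls
        // back to env.DB_CONN_STRING and then to "mongodb://localhost:27017/ldes".
        const config = { url: "mongodb://localhost:27017/ldes" } as Config;
        await ingest(data, metadata, config);
    }
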
diff --git a/src/fragmentHelper.ts b/src/fragmentHelper.ts
index 67c7668..815b146 100644
--- a/src/fragmentHelper.ts
+++ b/src/fragmentHelper.ts
@@ -1,9 +1,4 @@
-
-import type * as RDF from '@rdfjs/types';
-import { Member, RelationType } from '@treecg/types';
-import { Collection } from "mongodb";
-import type { Logger } from 'winston';
-import { DataRecord } from '.';
+import {RelationType} from '@treecg/types';
 
 export type TREEFragment = {
     id?: string,
@@ -24,290 +19,3 @@ export type TREEFragment = {
     root: boolean,
     page: number
 };
-
-export type ExtractIndices = (member: Member, mongo: Collection, timestampPath?: RDF.Term) => Promise<void>;
-
-export interface Fragment {
-    streamId: string,
-    extract: ExtractIndices,
-}
-
-// TODO: Check if scheduling updates for each new fragment using setTimeout
-// is the best way to handle immutable labeling.
-export async function handleTimestampPath(
-    id: string | null,
-    streamId: string,
-    path: string,
-    timestampValue: Date,
-    memberId: string,
-    indexColl: Collection,
-    memberColl: Collection,
-    maxSize: number,
-    k: number = 4,
-    minBucketSpan: number,
-    logger: Logger
-) {
-    logger.debug("-----------------------------------------------------------------------------------");
-    logger.debug(`[ingest] handleTimeStamp: Processing record ${memberId} with timestamp ${timestampValue.toISOString()}`);
-
-    // Candidate fragment where we want to put this member (most granular fragment)
-    const candidateFragment: undefined | TREEFragment = (
-        await indexColl.find({ streamId, timeStamp: { "$lte": timestampValue } })
-            .sort({ timeStamp: -1, span: -1, page: -1 }).limit(1).toArray()
-    )[0];
-
-    if (candidateFragment) {
-        logger.debug(`[ingest] handleTimeStamp: Found this closest candidate fragment: ${candidateFragment.timeStamp?.toISOString()} (page ${candidateFragment.page})`);
-
-        // Check if this member belongs to a new top level fragment (i.e. a new year)
-        if (timestampValue.getFullYear() > candidateFragment.timeStamp!.getFullYear()) {
-            await indexColl.insertOne(createNewYearFragment(streamId, id, memberId, timestampValue.getFullYear()));
-            logger.debug(`[ingest] handleTimeStamp: Created new top level fragment spanning 1 year: ${timestampValue.getFullYear()}`);
-            return;
-        }
-
-        // Check if this fragment is still mutable
-        if (!candidateFragment.immutable) {
-            // We can still write this member to this fragment, unless it is full already
-            if (candidateFragment.count < maxSize) {
-                // There is still room in this fragment
-                await indexColl.updateOne(
-                    candidateFragment,
-                    {
-                        $inc: { count: 1 },
-                        $push: { members: memberId }
-                    }
-                );
-                logger.debug(`[ingest] handleTimeStamp: Added new record to candidate fragment ${candidateFragment.timeStamp?.toISOString()}`);
-                return;
-            } else {
-                // We need to further split this fragment in k sub-fragments (recursively),
-                // while respecting the fragment max size
-                await splitFragmentRecursively(
-                    k,
-                    maxSize,
-                    candidateFragment,
-                    memberId,
-                    streamId,
-                    path,
-                    memberColl,
-                    indexColl,
-                    minBucketSpan,
-                    logger
-                );
-            }
-        } else {
-            // TODO: implement optional strict mode that fails in this case
-            logger.warn(`[ingest] handleTimeStamp: Received out of order member (${memberId}) or current bucket has already expired (${candidateFragment.id})`);
-            return;
-        }
-    } else {
-        // Check if there is no candidate fragment because this is the first fragment ever
-        // or because this member is older than all existing fragments
-        if (await indexColl.countDocuments() > 0) {
-            // TODO: implement optional strict mode that fails in this case
-            logger.warn(`[ingest] handleTimeStamp: Received out of order member that cannot be added to the collection: ${memberId}`);
-        } else {
-            // This is the first fragment ever. Let's create a fragment spanning 1 year based on the member's timestamp
-            const currYear = timestampValue.getFullYear();
-            await indexColl.insertOne(createNewYearFragment(streamId, id, memberId, currYear));
-            logger.debug(`[ingest] handleTimeStamp: Created initial fragment spanning 1 year: ${currYear}`);
-        }
-    }
-}
-
-function createNewYearFragment(
-    streamId: string,
-    id: string | null,
-    memberId: string,
-    year: number): TREEFragment {
-    const timeStamp = new Date();
-    timeStamp.setUTCFullYear(year);
-    timeStamp.setUTCMonth(0);
-    timeStamp.setUTCDate(1);
-    timeStamp.setUTCHours(0, 0, 0, 0);
-
-    return {
-        streamId,
-        id: id ? id : `${timeStamp.toISOString()}/31536000000/0`,
-        timeStamp,
-        relations: [],
-        members: [memberId],
-        count: 1,
-        span: 31536000000,
-        immutable: false,
-        root: true,
-        page: 0
-    }
-}
-
-async function splitFragmentRecursively(
-    k: number,
-    maxSize: number,
-    candidateFragment: TREEFragment,
-    newMember: string | null,
-    streamId: string,
-    path: string,
-    memberColl: Collection,
-    indexColl: Collection,
-    minBucketSpan: number,
-    logger: Logger
-) {
-    // Time span for new sub-fragment(s)
-    const newSpan = candidateFragment.span / k;
-    // Gather all the members of the full fragment
-    const membersRefs = candidateFragment.members!;
-    if (newMember) {
-        // Include the new member (if any)
-        membersRefs.push(newMember);
-    }
-    const members = await memberColl.find({ id: { $in: membersRefs } }).toArray();
-    // Sort members per timestamp
-    members.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
-
-    if (newSpan < minBucketSpan * 1000) {
-        // We don't want to split temporal fragments under a given time resolution.
-        // Instead we opt for a 1-dimensional pagination when the amount of members
-        // is too high for a very short time span.
-
-        const baseBucketId = `${candidateFragment.timeStamp!.toISOString()}/${candidateFragment.span}`;
-
-        const newFragment: TREEFragment = {
-            id: `${baseBucketId}/${candidateFragment.page + 1}`,
-            streamId,
-            relations: [],
-            members: members.slice(maxSize).map(m => m.id),
-            count: members.slice(maxSize).map(m => m.id).length,
-            timeStamp: candidateFragment.timeStamp!,
-            span: candidateFragment.span,
-            immutable: false,
-            root: false,
-            page: candidateFragment.page + 1
-        };
-
-        const newRelation = {
-            type: RelationType.Relation,
-            bucket: `${baseBucketId}/${candidateFragment.page + 1}`,
-            timestampRelation: false
-        };
-
-        // Persist new paginated fragment and its relation
-        await Promise.all([
-            indexColl.insertOne(newFragment),
-            indexColl.updateOne(
-                { id: candidateFragment.id! },
-                {
-                    $set: {
-                        streamId,
-                        members: members.slice(0, maxSize).map(m => m.id),
-                        count: maxSize,
-                        relations: [newRelation],
-                        timeStamp: candidateFragment.timeStamp,
-                        span: candidateFragment.span,
-                        immutable: candidateFragment.immutable,
-                        root: candidateFragment.root,
-                        page: candidateFragment.page
-                    }
-                },
-                { upsert: true }
-            )
-        ]);
-        logger.debug(`Added paginated (page ${newFragment.page}) new sub-fragment ${newFragment.timeStamp?.toISOString()}`);
-
-    } else {
-        // New relations to be added
-        const newRelations: TREEFragment["relations"] = []
-        // New sub-fragments
-        const subFragments: TREEFragment[] = [];
-
-        for (let i = 0; i < k; i++) {
-            const newTs = new Date(candidateFragment.timeStamp!.getTime() + (i * newSpan));
-            const subFragment: TREEFragment = {
-                id: `${newTs.toISOString()}/${newSpan}/0`,
-                streamId,
-                relations: [],
-                members: [],
-                count: 0,
-                timeStamp: newTs,
-                span: newSpan,
-                immutable: false,
-                root: false,
-                page: 0
-            };
-
-            for (const member of members) {
-                // Check which members belong in this new sub-fragment
-                if (member.timestamp.getTime() >= newTs.getTime()
-                    && member.timestamp.getTime() < (newTs.getTime() + newSpan)) {
-                    subFragment.members?.push(member.id);
-                    subFragment.count++;
-
-                }
-            }
-
-            // These are the new relations that are added from the originally full
-            // candidate fragment towards this new sub-fragment
-            newRelations.push(...[
-                {
-                    type: RelationType.GreaterThanOrEqualTo,
-                    value: newTs.toISOString(),
-                    bucket: subFragment.id!,
-                    path,
-                    timestampRelation: true
-                },
-                {
-                    type: RelationType.LessThan,
-                    value: new Date(newTs.getTime() + newSpan).toISOString(),
-                    bucket: subFragment.id!,
-                    path,
-                    timestampRelation: true
-                }
-            ]);
-
-            // Check we if this new sub-fragment is violating the max size constraint
-            if (subFragment.members!.length > maxSize) {
-                // Further split this fragment that is currently too large
-                logger.debug(`Splitting one level deeper for sub-fragment ${subFragment.timeStamp?.toISOString()} (span: ${subFragment.span})`);
-                await splitFragmentRecursively(
-                    k,
-                    maxSize,
-                    subFragment,
-                    null,
-                    streamId,
-                    path,
-                    memberColl,
-                    indexColl,
-                    minBucketSpan,
-                    logger
-                );
-            } else {
-                // Sub-fragment to be persisted in a bulk operation
-                subFragments.push(subFragment);
-            }
-        }
-
-        // Persist new sub-fragments and their relations
-        await Promise.all([
-            indexColl.insertMany(subFragments),
-            indexColl.updateOne(
-                { id: candidateFragment.id! },
-                {
-                    $set: {
-                        streamId,
-                        members: [],
-                        count: 0,
-                        timeStamp: candidateFragment.timeStamp,
-                        span: candidateFragment.span,
-                        immutable: candidateFragment.immutable,
-                        root: candidateFragment.root,
-                        page: candidateFragment.page
-                    },
-                    $push: { relations: { $each: newRelations } }
-                },
-                { upsert: true }
-            )
-        ]);
-        logger.debug(`Added ${subFragments.length} new sub-fragments (${subFragments.map(sf => ` ${sf.id}`)})`);
-    }
-}
-
diff --git a/src/index.ts b/src/index.ts
index c6e47f4..477292b 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,23 +1,16 @@
 import type * as RDF from "@rdfjs/types";
-import { Stream } from "@rdfc/js-runner";
-import {
-    LDES,
-    Member,
-    PROV,
-    RDF as RDFT,
-    RelationType,
-    SDS,
-} from "@treecg/types";
-import { Collection, MongoClient } from "mongodb";
-import { Parser, Writer } from "n3";
-import { env } from "process";
+import {Stream} from "@rdfc/js-runner";
+import {LDES, Member, PROV, RDF as RDFT, RelationType, SDS,} from "@treecg/types";
+import {Collection, MongoClient} from "mongodb";
+import {Parser, Writer} from "n3";
+import {env} from "process";
 import winston from "winston";
-import { handleTimestampPath, TREEFragment } from "./fragmentHelper";
+import {TREEFragment} from "./fragmentHelper";
 
 const consoleTransport = new winston.transports.Console();
 const logger = winston.createLogger({
     format: winston.format.combine(
-        winston.format.colorize({ level: true }),
+        winston.format.colorize({level: true}),
         winston.format.simple(),
     ),
     transports: [consoleTransport],
@@ -106,59 +99,10 @@ function maybe_parse(data: RDF.Quad[] | string): RDF.Quad[] {
     }
 }
 
-function getRelatedBuckets(
-    quads: RDF.Quad[],
-    bucket: RDF.Term,
-    done: Set<string>,
-): RDF.Term[] {
-    const set: RDF.Term[] = [];
-    const get = (q: RDF.Term) => {
-        if (done.has(q.value)) {
-            return;
-        }
-        done.add(q.value);
-        set.push(q);
-        // Find forward relations
-        quads
-            .filter(
-                (x) => x.subject.equals(q) && x.predicate.equals(SDS.terms.relation),
-            )
-            .map((x) => x.object)
-            .flatMap((bn) =>
-                quads
-                    .filter(
-                        (q) =>
-                            q.subject.equals(bn) &&
-                            q.predicate.equals(SDS.terms.relationBucket),
-                    )
-                    .map((x) => x.object),
-            )
-            .forEach(get);
-
-        // Find backwards relations
-        quads
-            .filter(
-                (x) =>
-                    x.object.equals(q) && x.predicate.equals(SDS.terms.relationBucket),
-            )
-            .map((x) => x.subject)
-            .flatMap((bn) =>
-                quads
-                    .filter(
-                        (q) =>
-                            q.object.equals(bn) && q.predicate.equals(SDS.terms.relation),
-                    )
-                    .map((x) => x.subject),
-            )
-            .forEach(get);
-    };
-
-    get(bucket);
-    return set;
-}
-
-function parseBool(bo?: string): boolean {
-    if (!bo) {
+function parseBool(bo?: string): boolean | undefined {
+    if (bo === undefined) {
+        return undefined;
+    } else if (!bo) {
         return false;
     } else {
         const bos = bo.toLowerCase();
@@ -167,13 +111,24 @@ function parseBool(bo?: string): boolean {
 }
 
 function gatherBuckets(
-    buckets: Bucket[],
     data: RDF.Quad[],
-    subject: RDF.Term,
-    stream: string,
-    found: Set<string>,
-) {
-    for (let bucket of getRelatedBuckets(data, subject, found)) {
+): Bucket[] {
+    const buckets: Bucket[] = [];
+
+    const bucketTerms: Set<RDF.Term> = new Set();
+    data.filter(q =>
+        q.predicate.equals(RDFT.terms.type)
+        && q.object.equals(SDS.terms.custom("Bucket"))
+        && q.graph.equals(SDS.terms.custom("DataDescription")))
+        .map(q => q.subject).forEach(bucket => bucketTerms.add(bucket));
+
+    data.filter(q =>
+        q.predicate.equals(SDS.terms.bucket)
+        && q.graph.equals(SDS.terms.custom("DataDescription")))
+        .map(q => q.object).forEach(bucket => bucketTerms.add(bucket));
+
+    bucketTerms.forEach(bucket => {
         const isRoot = data.find(
             (q) =>
                 q.subject.equals(bucket) &&
@@ -184,6 +139,26 @@
                 q.subject.equals(bucket) &&
                 q.predicate.equals(SDS.terms.custom("immutable")),
         )?.object.value;
+
+        // If the subject has a triple with the Stream, use that one. Otherwise, we need to find the stream via the record the bucket is attached to.
+        const stream = data.find(
+            (q) =>
+                q.subject.equals(bucket) &&
+                q.predicate.equals(SDS.terms.stream),
+        )?.object.value || data.find(
+            (q) =>
+                q.subject.equals(data.find(
+                    (record) =>
+                        record.predicate.equals(SDS.terms.bucket) &&
+                        record.object.equals(bucket),
+                )?.subject) &&
+                q.predicate.equals(SDS.terms.stream),
+        )?.object.value;
+        if (!stream) {
+            logger.error(`Found SDS bucket without a stream! ${bucket.value}`);
+            return;
+        }
+
         const b = {
             root: isRoot === "true",
             id: bucket.value,
@@ -192,6 +167,7 @@
             immutable: parseBool(immutable),
         };
 
+        // Find all relations of the bucket
         const relations = data
             .filter(
                 (q) =>
@@ -215,11 +191,12 @@
                 q.predicate.equals(SDS.terms.relationValue),
             )?.object.value;
 
-            b.relations.push({ type, bucket: target, path, value });
+            b.relations.push({type, bucket: target, path, value});
         }
 
         buckets.push(b);
-    }
+    });
+
+    return buckets;
 }
 
 function gatherRecords(
@@ -268,6 +245,24 @@
     return out;
 }
 
+async function emptyBuckets(data: RDF.Quad[], collection: Collection) {
+    const bucketsToEmpty = data.filter(q =>
+        q.predicate.equals(SDS.terms.custom("empty")) &&
+        q.object.value === "true" &&
+        q.graph.equals(SDS.terms.custom("DataDescription")))
+        .map(q => q.subject);
+
+    for (const bucket of bucketsToEmpty) {
+        await collection.updateOne(
+            {id: bucket.value},
+            {
+                $set: {members: []},
+            },
+            {upsert: true},
+        );
+    }
+}
+
 // This could be a memory problem in the long run
 // TODO: Find a way to persist written records efficiently
 //const addedMembers: Set<string> = new Set();
@@ -284,13 +279,17 @@ async function addDataRecord(
     //addedMembers.add(value);
 
     // Check if record is already written in the collection
-    const present = (await collection.countDocuments({ id: value })) > 0;
+    const present = (await collection.countDocuments({id: value})) > 0;
     if (present) return;
 
     const member = filterMember(quads, record.payload, [
         (q) => q.predicate.equals(SDS.terms.payload),
    ]);
+    if (!member?.length) {
+        return;
+    }
+
     const ser = new Writer().quadsToString(member);
 
     updateRecords.push({
@@ -302,6 +301,7 @@
 
 const setRoots: Set<string> = new Set();
 const immutables: Set<string> = new Set();
+
 async function addBucket(
     bucket: Bucket,
     collection: Collection,
@@ -310,32 +310,41 @@
     if (bucket.root && !setRoots.has(bucket.stream)) {
         setRoots.add(bucket.stream);
         await collection.updateOne(
-            { streamId: bucket.stream, id: bucket.id },
+            {streamId: bucket.stream, id: bucket.id},
             {
-                $set: { root: true },
+                $set: {root: true},
             },
-            { upsert: true },
+            {upsert: true},
         );
     }
 
     if (bucket.immutable && !immutables.has(bucket.id)) {
         immutables.add(bucket.id);
         await collection.updateOne(
-            { streamId: bucket.stream, id: bucket.id },
+            {streamId: bucket.stream, id: bucket.id},
+            {
+                $set: {immutable: true},
+            },
+            {upsert: true},
+        );
+    } else if (bucket.immutable === false) {
+        // If it is explicitly set as false, set it in the database, so we persist the bucket.
+        await collection.updateOne(
+            {streamId: bucket.stream, id: bucket.id},
             {
-                $set: { immutable: true },
+                $set: {immutable: false},
             },
-            { upsert: true },
+            {upsert: true},
         );
     }
 
     for (let newRelation of bucket.relations) {
         await collection.updateOne(
-            { streamId: bucket.stream, id: bucket.id },
+            {streamId: bucket.stream, id: bucket.id},
             {
-                $push: { relations: newRelation },
+                $addToSet: {relations: newRelation},
             },
-            { upsert: true },
         );
    }
 }
@@ -409,9 +418,9 @@
             const ser = new Writer().quadsToString(streamMember);
 
             await metaCollection.updateOne(
-                { type: SDS.Stream, id: streamId.value },
-                { $set: { value: ser } },
-                { upsert: true },
+                {type: SDS.Stream, id: streamId.value},
+                {$set: {value: ser}},
+                {upsert: true},
             );
         }
     };
@@ -419,7 +428,7 @@
 
     metadata.data(handleMetadata);
 
     if (metadata.lastElement) {
-        handleMetadata(metadata.lastElement);
+        await handleMetadata(metadata.lastElement);
     }
 }
 
@@ -427,9 +436,6 @@ export async function ingest(
     data: Stream,
     metadata: Stream,
     database: DBConfig,
-    maxSize: number = 100,
-    k: number = 4,
-    minBucketSpan: number = 300
 ) {
     const url = database.url || env.DB_CONN_STRING || "mongodb://localhost:27017/ldes";
     const mongo = await new MongoClient(url).connect();
@@ -442,7 +448,6 @@
     let ingestMetadata = true;
     let ingestData = true;
     let closed = false;
-    let lastMemberTimestamp = null;
 
     const closeMongo = () => {
         if (!ingestMetadata && !ingestData && !closed) {
@@ -472,53 +477,16 @@
         const bs = record.buckets;
         if (bs.length === 0) {
             await indexCollection.updateOne(
-                { root: true, streamId: record.stream, id: "" },
-                { $push: { members: record.payload.value } },
-                { upsert: true },
+                {root: true, streamId: record.stream, id: ""},
+                {$addToSet: {members: record.payload.value}},
+                {upsert: true},
             );
         } else {
             for (let bucket of bs) {
                 await indexCollection.updateOne(
-                    { streamId: record.stream, id: bucket.value },
-                    { $push: { members: record.payload.value } },
-                    { upsert: true },
-                );
-            }
-        }
-    };
-
-    const pushTimstampMemberToDB = async (record: SDSRecord) => {
-        const bs = record.buckets;
-        const timestamp = new Date(record.timestampValue!);
-
-        if (bs.length === 0) {
-            await handleTimestampPath(
-                null,
-                record.stream,
-                streamTimestampPaths[record.stream],
-                timestamp,
-                record.payload.value,
-                indexCollection,
-                memberCollection,
-                maxSize,
-                k,
-                minBucketSpan,
-                logger
-            );
-        } else {
-            for (let bucket of bs) {
-                await handleTimestampPath(
-                    bucket.value,
-                    record.stream,
-                    streamTimestampPaths[record.stream],
-                    timestamp,
-                    record.payload.value,
-                    indexCollection,
-                    memberCollection,
-                    maxSize,
-                    k,
-                    minBucketSpan,
-                    logger
+                    {streamId: record.stream, id: bucket.value},
+                    {$addToSet: {members: record.payload.value}},
+                    {upsert: true},
                 );
             }
         }
@@ -550,56 +518,16 @@
         // Update INDEX collection accordingly
         for (const r of records) {
-            await (r.timestampValue ? pushTimstampMemberToDB(r) : pushMemberToDB(r));
+            await pushMemberToDB(r);
         }
 
-        const isDefaultTimestamp = records[0].timestampValue !== undefined;
-
-        // If the fragmentation strategy is the default timestamp-based
-        // we need to handle the labeling of buckets/fragments as immutable
-        // based on the current time
-        if (isDefaultTimestamp) {
-            // Last known member timestamp
-            lastMemberTimestamp = new Date(records[records.length -1].timestampValue!)
-            // Gather all mutable fragments that have expired
-
-            // TODO: Check if we can directly update all expired fragments
-            // Not sure how to add the timestamp and the span while querying
-            const expiredBuckets = await indexCollection.find({
-                timeStamp: { $lte: lastMemberTimestamp },
-                immutable: false
-            }).toArray();
-
-            for (const buck of expiredBuckets) {
-                const expDate = buck.timeStamp!.getTime() + buck.span;
-                if (expDate < lastMemberTimestamp.getTime()) {
-                    logger.debug(`Labeling bucket ${buck.timeStamp?.toISOString()} (span: ${buck.span}) as immutable`);
-                    // Label these buckets as immutable
-                    await indexCollection.updateOne(buck, {
-                        $set: { immutable: true }
-                    });
-                }
-            }
+        const buckets: Bucket[] = gatherBuckets(data);
+        for (let bucket of buckets) {
+            await addBucket(bucket, indexCollection);
         }
 
-        // This next steps are only performed if the fragmentation strategy
-        // is already defined in the SDS metadata and the stream is not relying
-        // on the default timestamp-based strategy, which is handled within the
-        // pushTimstampMemberToDB() function.
-        if (!isDefaultTimestamp) {
-            const found: Set<string> = new Set();
-            const buckets: Bucket[] = [];
-
-            for (const r of records) {
-                r.buckets.forEach((b) =>
-                    gatherBuckets(buckets, data, b, r.stream, found),
-                );
-            }
-
-            for (let bucket of buckets) {
-                await addBucket(bucket, indexCollection);
-            }
-        }
+        // Empty buckets that are marked so.
+        await emptyBuckets(data, indexCollection);
     });
 
     logger.debug("[ingest] Attached data handler");
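The switch from $push to $addToSet in addBucket and in pushMemberToDB makes member and relation writes idempotent: redelivering the same SDS record no longer grows a bucket's members array. A minimal sketch of that behaviour against a local MongoDB; the collection name and identifiers below are illustrative, since the real index collection name comes from the database config:

    import { MongoClient } from "mongodb";

    async function demo(url = "mongodb://localhost:27017/ldes"): Promise<void> {
        const mongo = await new MongoClient(url).connect();
        const index = mongo.db().collection("INDEX"); // assumed collection name

        // Simulate the same record arriving twice (e.g. a replayed stream).
        for (let i = 0; i < 2; i++) {
            await index.updateOne(
                { streamId: "https://example.org/stream", id: "bucket-1" },
                { $addToSet: { members: "https://example.org/member/1" } },
                { upsert: true },
            );
        }

        const bucket = await index.findOne({ id: "bucket-1" });
        console.log(bucket?.members); // ["https://example.org/member/1"], still a single entry
        await mongo.close();
    }

With $push, the second iteration would have appended a duplicate entry; $addToSet only adds values that are not already present in the array.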