Skip to content

Commit

Permalink
add mergeBy aggregation (#3438)
Browse files Browse the repository at this point in the history
* add mergeBy aggregation

* simplify a bit

* remove merge from DF since now on AF

* spelling, type cast

* bump: (minor) @terascope/[email protected], [email protected]

bump: (minor) [email protected], [email protected]

* ensure groupBy & mergeBy are not called at the same time
  • Loading branch information
lesleydreyer authored Oct 19, 2023
1 parent 1a80685 commit e2be585
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 35 deletions.
2 changes: 1 addition & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
},
"devDependencies": {
"bunyan": "^1.8.15",
"elasticsearch-store": "^0.72.0",
"elasticsearch-store": "^0.73.0",
"fs-extra": "^11.1.1",
"ms": "^2.1.3",
"nanoid": "^3.3.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/data-mate/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/data-mate",
"displayName": "Data-Mate",
"version": "0.47.0",
"version": "0.48.0",
"description": "Library of data validations/transformations",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/data-mate#readme",
"repository": {
Expand Down
103 changes: 84 additions & 19 deletions packages/data-mate/src/aggregation-frame/AggregationFrame.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DataTypeConfig, FieldType } from '@terascope/types';
import { BigMap } from '@terascope/utils';
import { BigMap, isArray, castArray } from '@terascope/utils';
import {
Column,
KeyAggregation, ValueAggregation, valueAggMap,
Expand Down Expand Up @@ -77,6 +77,15 @@ export class AggregationFrame<
*/
protected _fieldToColumnIndexCache?: Map<(keyof T), number>;

/**
* When using mergeBy... similar to groupBy as groups by the unique fields but
* if more than 1 is found it uses the last value and also merges fields
* i.e. mergeBy("foo")
* makes this [ { foo: a, bar: b, one: one }, { foo: a, bar: c, two: two } ]
* turn into [ { foo: a, bar: c, one: one, two: two }, { foo: b, bar: b2 } ]
*/
private _merge?: boolean;

constructor(
columns: Column<any, keyof T>[]|readonly Column<any, keyof T>[],
options: AggregationFrameOptions
Expand Down Expand Up @@ -110,6 +119,25 @@ export class AggregationFrame<
* GroupBy fields
*/
groupBy(...fieldArg: FieldArg<keyof T>[]): this {
if (this._merge) {
throw new Error('AggregationFrame.groupBy and AggregationFrame.mergeBy running at the same time is not currently supported');
}

this._groupByFields = Object.freeze(this._groupByFields.concat(
Array.from(getFieldsFromArg(this.fields, fieldArg))
));
return this;
}

/**
* MergeBy fields
*/
mergeBy(...fieldArg: FieldArg<keyof T>[]): this {
if (this._groupByFields.length) {
throw new Error('AggregationFrame.groupBy and AggregationFrame.mergeBy running at the same time is not currently supported');
}

this._merge = true;
this._groupByFields = Object.freeze(this._groupByFields.concat(
Array.from(getFieldsFromArg(this.fields, fieldArg))
));
Expand Down Expand Up @@ -561,13 +589,35 @@ export class AggregationFrame<
const buckets = new BigMap<string, Bucket<T>>();
const getFieldAggs = makeGetFieldAggs(buckets);

const merge = new Map<string, number[]>();
for (let i = 0; i < this.size; i++) {
const res = makeKeyForRow(keyAggs, i);
if (res) {
if (this._merge) {
let indices = [i];
if (merge.has(res.key)) {
// if merging collect all indices, then it will
// try to set fields with the last index if a value is found,
// otherwise will proceed to previous indices, until a value is found
const _indices = merge.get(res.key);
if (_indices?.length) indices = indices.concat(_indices);
merge.delete(res.key);
}
merge.set(res.key, indices);
} else {
fieldAggMakers.forEach(
makeProcessFieldAgg(getFieldAggs(res.key, i), i)
);
}
}
}

if (merge.size) {
[...merge.entries()].forEach(([key, indices]) => {
fieldAggMakers.forEach(
makeProcessFieldAgg(getFieldAggs(res.key, i), i)
makeProcessFieldAgg(getFieldAggs(key, indices), indices)
);
}
});
}

return buckets;
Expand Down Expand Up @@ -595,21 +645,25 @@ export class AggregationFrame<
return this._buildBucketWithAdjustedRowIndex(builders, bucket);
}

const [fieldAggs, startIndex] = bucket;
const [fieldAggs, bucketIndices] = bucket;
const indices = castArray(bucketIndices);
for (const [field, builder] of builders) {
const agg = fieldAggs.get(field);
if (agg != null) {
builder.append(agg.flush().value);
} else {
const value = this.getColumnOrThrow(field).vector.get(startIndex);
const writeIndex = builder.currentIndex++;
if (value != null) {
// doing this is faster than append
// because we KNOW it is already in a valid format
builder.data.set(
writeIndex,
value
);
for (const index of indices) {
const value = this.getColumnOrThrow(field).vector.get(index);
if (value != null) {
// doing this is faster than append
// because we KNOW it is already in a valid format
builder.data.set(
writeIndex,
value
);
break;
}
}
}
}
Expand All @@ -624,15 +678,19 @@ export class AggregationFrame<
builders: Map<keyof T, Builder<any>>,
[fieldAggs, startIndex]: Bucket<T>,
): void {
let useIndex = startIndex;
let useIndex = isArray(startIndex) ? startIndex[0] : startIndex;
const remainingFields: (keyof T)[] = [];

for (const [field, builder] of builders) {
const agg = fieldAggs.get(field);
if (agg != null) {
const res = agg.flush();
if (res.index != null && res.index > useIndex) {
useIndex = res.index;

if (res.index != null) {
const idx = isArray(res.index) ? res?.index[0] : res.index;
if (idx > useIndex) {
useIndex = res.index;
}
}
builder.append(res.value);
} else {
Expand Down Expand Up @@ -699,7 +757,7 @@ type FieldAggsMap<T extends Record<string, unknown>> = Map<keyof T, FieldAgg> &
adjustsSelectedRow: boolean
};
type Bucket<T extends Record<string, unknown>> = [
fieldAggs: FieldAggsMap<T>, startIndex: number
fieldAggs: FieldAggsMap<T>, startIndex: number|number[]
];

interface AggBuilders<T extends Record<string, any>> {
Expand All @@ -723,7 +781,7 @@ function curryFieldAgg(
}

function makeGetFieldAggs<T extends Record<string, any>>(buckets: BigMap<string, Bucket<T>>) {
return function getFieldAggs(key: string, index: number): FieldAggsMap<T> {
return function getFieldAggs(key: string, index: number|number[]): FieldAggsMap<T> {
const fieldAggRes = buckets.get(key);

if (!fieldAggRes) {
Expand All @@ -737,17 +795,24 @@ function makeGetFieldAggs<T extends Record<string, any>>(buckets: BigMap<string,
}

function makeProcessFieldAgg<T extends Record<string, any>>(
fieldAggs: FieldAggsMap<T>, index: number
fieldAggs: FieldAggsMap<T>, index: number|number[]
) {
return function processFieldAgg(maker: FieldAggMaker, field: keyof T) {
let agg = fieldAggs.get(field);

if (!agg) {
agg = maker[0]();
fieldAggs.set(field, agg);
}
if (agg.adjustsSelectedRow) {
fieldAggs.adjustsSelectedRow = true;
}
agg.push(maker[1].get(index), index);
if (isArray(index)) {
for (const i of index) {
agg.push(maker[1].get(i), i);
}
} else {
agg.push(maker[1].get(index), index);
}
};
}
8 changes: 4 additions & 4 deletions packages/data-mate/src/builder/Builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import {
} from '../vector';

/**
* Since Vectors are immutable, a Builder can be to construct a
* Since Vectors are immutable, a Builder can be used to construct a
* Vector. When values are inserted they are coerced and validated.
*/
export abstract class Builder<T = unknown> {
/**
* Make a instance of a Builder from a DataTypeField config
* Make an instance of a Builder from a DataTypeFieldConfig
*/
static make<R = unknown>(
data: WritableData<R>,
options: BuilderOptions,
): Builder<R> {
throw new Error(
`This will functionality replaced in the index file
`This functionality will be replaced in the index file
${options} ${length} ${data}`
);
}
Expand All @@ -47,7 +47,7 @@ export abstract class Builder<T = unknown> {
}

/**
* The type of Vector, this should only be set the specific Vector type classes.
* The type of Vector, this should only be set with the specific Vector type classes.
*/
readonly type: VectorType;

Expand Down
4 changes: 2 additions & 2 deletions packages/data-mate/src/data-frame/DataFrame.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,10 @@ export class DataFrame<
buckets,
(name, i) => {
if (!serializeOptions) {
return this.getColumnOrThrow(name).vector.get(i);
return this.getColumnOrThrow(name).vector.get(i as number);
}
return this.getColumnOrThrow(name).vector.get(
i, true, serializeOptions
i as number, true, serializeOptions
);
}
);
Expand Down
Loading

0 comments on commit e2be585

Please sign in to comment.