Proposal: Extending merge rollup capabilities #14310

Open
davecromberge opened this issue Oct 25, 2024 · 2 comments
Labels
PEP-Request Pinot Enhancement Proposal request to be reviewed.


davecromberge commented Oct 25, 2024

What needs to be done?

Extend the merge-rollup framework to support additional transformations:

  • dimensionality reduction/erasure
  • varying aggregate behaviour over time

Dimensionality reduction/erasure

Eliminate a particular dimension column's values so that more rows become duplicates and can be rolled up into a single aggregate.

For example:

| Dimension | Pre-transformation | Post-transformation |
|-----------|--------------------|---------------------|
| Country   | United States      | United States       |
| Device    | Mobile             | Mobile              |
| Browser   | Safari             | Null / Other        |

The above example shows the Browser dimension erased or set to some default value after some time window has passed.
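For illustration (hypothetical rows and metric), two documents that previously differed only in the Browser dimension would share the same coordinates once Browser is erased:

| Country       | Device | Browser | Views (sum) |
|---------------|--------|---------|-------------|
| United States | Mobile | Safari  | 10          |
| United States | Mobile | Chrome  | 5           |

After erasure, both rows roll up into a single aggregate:

| Country       | Device | Browser      | Views (sum) |
|---------------|--------|--------------|-------------|
| United States | Mobile | Null / Other | 15          |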

Varying aggregate behaviour over time

Some aggregate values could change precision over time. The multi-level merge functionality can be used to reduce the resolution or precision of aggregates for older segments. This applies primarily to sketches, but could also be used for other binary aggregate types.

| Sketch         | Pre-transformation | Post-transformation |
|----------------|--------------------|---------------------|
| Theta sketch 1 | 512 KB             | 256 KB              |
| Theta sketch 2 | 400 KB             | 200 KB              |
| Theta sketch 3 | 512 KB             | 256 KB              |

The above example shows a 2x size reduction on existing sketches, which could be achieved by decreasing the lgK value by 1 as data ages. Be aware that queries spanning multiple time ranges could then operate over sketches of varying precision, where the sketch implementation supports merging them.
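As an illustration of the mechanics (not the proposed Pinot change), a compact Theta sketch can be rebuilt at a smaller nominal entries setting (nominal entries = 2^lgK) by unioning it into a union configured with the lower value. A minimal sketch using the Apache DataSketches Java library, assuming a recent datasketches-java release where Union.union(Sketch) is available:

```java
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;

public class ThetaSketchDownsizer {

  /**
   * Rebuilds a Theta sketch at a smaller nominal entries setting, e.g. 2^15
   * instead of 2^16, roughly halving its serialized size at the cost of precision.
   */
  public static CompactSketch downsize(Sketch original, int targetNominalEntries) {
    Union union = SetOperation.builder()
        .setNominalEntries(targetNominalEntries)  // e.g. 32768 instead of 65536
        .buildUnion();
    union.union(original);    // fold the existing sketch into the smaller union
    return union.getResult(); // compact result, bounded by the new nominal entries
  }
}
```

The same approach would apply at merge time: the minion task would read the stored sketch bytes, downsize, and write the smaller sketch back to the rolled-up segment.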

Why the feature is needed (e.g. describing the use case).

The primary justification for such a feature is more aggressive space saving for historic data. As the merge rollup task processes older time windows, users could eliminate non-critical dimensions, allowing more documents to roll up into a single aggregate. Similarly, users could sacrifice aggregate accuracy for historic queries in exchange for a smaller storage footprint - especially when dealing with Theta / Tuple sketches, which can be on the order of megabytes at lgK = 16.

Idea on how this may be implemented

Both extensions would require changes to the configuration for the Minion Merge rollup task. In particular, the most flexible approach would be to have a dynamic bag of properties that could apply to each individual aggregation function, where these could be interpreted before rolling up or merging the data.

Dimensionality reduction/erasure

  • applies to “map” phase of the SegmentProcessorFramework.
  • default reducer will function as normal
  • configuration should include:
    • time bucket periods
    • dimension name
    • leverage default value
  • configuration should be part of the merge rollup task / segment refresh config, e.g.:
    • "dimensionName.eliminate.after": "7d" (see the sketch below)

Varying aggregate behaviour over time

  • applies to “map” phase of the SegmentProcessorFramework.
  • configuration could be uniformly applied in a global manner or part of the specific table task config:
    • hard coded parameters for Theta and Tuple sketch lgK (cumbersome)
    • dynamic bag of properties associated with time bucket (hard to validate)
    • not necessary to extend the function name parameter parser

Note: This issue should be treated as a PEP request.

mayankshriv commented Oct 25, 2024

Thanks @davecromberge for filing this PEP, very well authored. Tagging @swaminathanmanish @Jackie-Jiang

@Jackie-Jiang Jackie-Jiang added the PEP-Request Pinot Enhancement Proposal request to be reviewed. label Oct 30, 2024

davecromberge commented Oct 31, 2024

I have done further investigation as to how both options might be implemented and have concluded that it might be best to pursue dimensionality reduction in the first pass and re-evaluate whether varying aggregate behaviour is necessary.

Dimensionality reduction/erasure

Requires additional configuration for each time bucket with a list of dimension names to erase. In this context, erasing a dimension refers to looking up the defaultNullValue in the fieldSpec for that dimension.
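For reference, defaultNullValue is the optional per-column field in the Pinot schema fieldSpec; an illustrative schema fragment:

```json
{
  "dimensionFieldSpecs": [
    {
      "name": "dimColA",
      "dataType": "STRING",
      "defaultNullValue": "Other"
    }
  ]
}
```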

The configuration might change to include an additional array configuration field as follows:

"MergeRollupTask": {
  "1hour.mergeType": "rollup",
  "1hour.bucketTimePeriod": "1h",
  "1hour.bufferTimePeriod": "3h",
  "1hour.maxNumRecordsPerSegment": "1000000",
  "1hour.maxNumRecordsPerTask": "5000000",
  "1hour.maxNumParallelBuckets": "5",
  "1day.eraseDimensionValues": "dimColA",
  "1day.mergeType": "rollup",
  "1day.bucketTimePeriod": "1d",
  "1day.bufferTimePeriod": "1d",
  "1day.roundBucketTimePeriod": "1d",
  "1day.maxNumRecordsPerSegment": "1000000",
  "1day.maxNumRecordsPerTask": "5000000",
  "1day.eraseDimensionValues": "dimColA,dimColB",
  "metricColA.aggregationType": "sum",
  "metricColB.aggregationType": "max"
}

In the example above, only dimColA is erased in the 1 hour merge task, while both dimColA and dimColB are erased in the 1 day merge task.

Concerning implementation, if the new field is provided, the MergeRollupTask will have to pass a custom RecordTransformer to the SegmentProcessorFramework. This custom record transformer will:

  1. For each configured dimension name, look up the corresponding fieldSpec in the table schema.
  2. Use the defaultNullValue to transform (overwrite) the existing record value.
  3. Log a warning message for any dimension name that is invalid.

Note: The custom record transformer precedes all existing record transformers. Finally, the rollup process will consolidate all records where the dimensions have matching coordinates. The transformed records should result in a greater degree of rollup expressed as a fraction of the number of input records.
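A minimal sketch of such a transformer, assuming the RecordTransformer interface from org.apache.pinot.segment.local.recordtransformer (the class and its wiring into the task are illustrative, not the final implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;

/** Overwrites the configured dimension columns with their schema defaultNullValue. */
public class DimensionErasureTransformer implements RecordTransformer {
  private final Map<String, Object> _defaultValues = new HashMap<>();

  public DimensionErasureTransformer(Schema schema, List<String> dimensionsToErase) {
    for (String dimension : dimensionsToErase) {
      FieldSpec fieldSpec = schema.getFieldSpecFor(dimension);
      if (fieldSpec == null) {
        // Invalid dimension names are skipped; the task would log a warning here.
        continue;
      }
      _defaultValues.put(dimension, fieldSpec.getDefaultNullValue());
    }
  }

  @Override
  public GenericRow transform(GenericRow record) {
    // Replace each erased dimension's value so more rows share identical
    // coordinates and collapse during rollup.
    _defaultValues.forEach(record::putValue);
    return record;
  }
}
```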

Varying aggregate behaviour over time (abandoned?)

Varying aggregate behaviour over time introduces complexity for indeterminate gains.

Firstly, the configuration for sketch precision would have to be defined for both the individual metrics and the time periods, which complicates how the current task is configured. Example:

"MergeRollupTask": {
  "1hour.mergeType": "rollup",
  "1hour.bucketTimePeriod": "1h",
  "1hour.bufferTimePeriod": "3h",
  "1hour.maxNumRecordsPerSegment": "1000000",
  "1hour.maxNumRecordsPerTask": "5000000",
  "1hour.maxNumParallelBuckets": "5",
  "1hour.metricColA.functionParameters": { "nominalEntries":  "4096" },
  "1hour.metricColB.functionParameters": { "nominalEntries":  "8192" },
  "1day.mergeType": "rollup",
  "1day.bucketTimePeriod": "1d",
  "1day.bufferTimePeriod": "1d",
  "1day.roundBucketTimePeriod": "1d",
  "1day.maxNumRecordsPerSegment": "1000000",
  "1day.maxNumRecordsPerTask": "5000000",
  "1day.metricColA.functionParameters": { "nominalEntries":  "2048" },
  "1day.metricColB.functionParameters": { "nominalEntries":  "4096" },
  "metricColA.aggregationType": "distinctCountThetaSketch",
  "metricColB.aggregationType": "distinctCountThetaSketch"
}

In the example above, the aggregation function is configured both within the time buckets and on the metrics directly, which might be confusing. Alternatively, the function parameters could be supplied directly on the metrics, but this would still require per-time-bucket configuration for each parameter.

Secondly, and more importantly, varying aggregate behaviour over time can lead to incorrect results. This is because StarTree indexes are constructed using the functionParameters configuration present on the StarTree index config. Constructing new trees from merged segments may no longer be possible at the given functionParameters configuration if the underlying aggregates have varying precision (in the case of sketches). This would not necessarily be a problem for Apache DataSketches, but it might not hold true for other aggregation types. Finally, the canUseStarTree method consults the configured functionParameters to determine whether queries can be serviced directly from the StarTree, even though the underlying sketch aggregates might have a different precision.

davecromberge added a commit to davecromberge/pinot that referenced this issue Nov 1, 2024
Adds the capability to erase dimension values from a merged segment
before rollup to reduce cardinality and increase the degree to which
common dimension coordinates are aggregated.  This can result in a
space saving for some dimensions which are not important in historic
data.

See: apache#14310