Skip to content

Commit

Permalink
Add Support for Handling Missing Data in Anomaly Detection (#1274) (#…
Browse files Browse the repository at this point in the history
…1281)

* Add Support for Handling Missing Data in Anomaly Detection

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.



* rebase from main



* add comment and remove redundant code



---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Aug 19, 2024
1 parent d3bd162 commit b5e85e1
Show file tree
Hide file tree
Showing 141 changed files with 7,305 additions and 1,692 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
strategy:
matrix:
# each test scenario (rule, hc, single_stream) is treated as a separate job.
test: [rule, hc, single_stream]
test: [rule, hc, single_stream,missing]
fail-fast: false
concurrency:
# The concurrency setting is used to limit the concurrency of each test scenario group to ensure they do not run concurrently on the same machine.
Expand Down Expand Up @@ -51,11 +51,16 @@ jobs:
chown -R 1000:1000 `pwd`
case ${{ matrix.test }} in
rule)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RuleModelPerfIT' \
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.HistoricalRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
;;
hc)
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \
Expand All @@ -69,4 +74,10 @@ jobs:
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
missing)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeMissingSingleFeatureModelPerfIT' \
-Dtests.seed=60CDDB34427ACD0C -Dtests.security.manager=false \
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
esac
62 changes: 42 additions & 20 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.1.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.1.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.1.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand Down Expand Up @@ -356,8 +356,7 @@ integTest {

if (System.getProperty("model-benchmark") == null || System.getProperty("model-benchmark") == "false") {
filter {
excludeTestsMatching "org.opensearch.ad.e2e.SingleStreamModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.RuleModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.*ModelPerfIT"
}
}

Expand Down Expand Up @@ -676,34 +675,57 @@ List<String> jacocoExclusions = [
// rest layer is tested in integration testing mostly, difficult to mock all of it
'org.opensearch.ad.rest.*',

'org.opensearch.ad.model.ModelProfileOnNode',
'org.opensearch.ad.model.InitProgressProfile',
'org.opensearch.ad.rest.*',
'org.opensearch.ad.AnomalyDetectorJobRunner',

// Class containing just constants. Don't need to test
'org.opensearch.ad.constant.*',
'org.opensearch.forecast.constant.*',
'org.opensearch.timeseries.constant.*',
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

// related to transport actions added for security
'org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction.1',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
// TODO: fix unstable code coverage caused by null NodeClient issue
// https://github.com/opensearch-project/anomaly-detection/issues/241
'org.opensearch.ad.task.ADBatchTaskRunner',
'org.opensearch.ad.task.ADTaskManager',
// TODO: add forecast test coverage before release

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.timeseries.*',
'org.opensearch.ad.*',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.ad.ml.ADColdStart',
'org.opensearch.ad.transport.ADHCImputeNodesResponse',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
'org.opensearch.timeseries.ml.MemoryAwareConcurrentHashmap',
'org.opensearch.timeseries.transport.ResultBulkTransportAction',
'org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler',
'org.opensearch.timeseries.transport.handler.ResultIndexingHandler',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.timeseries.ml.Sample',
'org.opensearch.timeseries.ratelimit.FeatureRequest',
'org.opensearch.ad.transport.ADHCImputeNodeRequest',
'org.opensearch.timeseries.model.ModelProfileOnNode',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
'org.opensearch.ad.transport.ADHCImputeRequest',
'org.opensearch.timeseries.transport.BaseDeleteConfigTransportAction.1',
'org.opensearch.timeseries.transport.BaseSuggestConfigParamTransportAction',
'org.opensearch.timeseries.rest.AbstractSearchAction.1',
'org.opensearch.ad.transport.ADSingleStreamResultTransportAction',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker.RequestQueue',
'org.opensearch.timeseries.rest.RestStatsAction',
'org.opensearch.ad.ml.ADCheckpointDao',
'org.opensearch.timeseries.transport.CronRequest',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
'org.opensearch.timeseries.util.TimeUtil',
]


Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public void executeDetector(
startTime.toEpochMilli(),
endTime.toEpochMilli(),
ActionListener.wrap(features -> {
List<ThresholdingResult> entityResults = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> entityResults = modelManager.getPreviewResults(features, detector);
List<AnomalyResult> sampledEntityResults = sample(
parsePreviewResult(detector, features, entityResults, entity),
maxPreviewResults
Expand All @@ -116,8 +115,7 @@ public void executeDetector(
} else {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> results = modelManager.getPreviewResults(features, detector);
listener.onResponse(sample(parsePreviewResult(detector, features, results, null), maxPreviewResults));
} catch (Exception e) {
onFailure(e, listener, detector.getId());
Expand Down
43 changes: 24 additions & 19 deletions src/main/java/org/opensearch/ad/ml/ADColdStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected List<Sample> trainModelFromDataSegments(

double[] firstPoint = pointSamples.get(0).getValueList();
if (firstPoint == null || firstPoint.length == 0) {
logger.info("Return early since data points must not be empty.");
logger.info("Return early since the first data point must not be empty.");
return null;
}

Expand Down Expand Up @@ -216,6 +216,29 @@ protected List<Sample> trainModelFromDataSegments(
}

AnomalyDetector detector = (AnomalyDetector) config;
applyRule(rcfBuilder, detector);

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
// We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage.
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}

public static void applyRule(ThresholdedRandomCutForest.Builder rcfBuilder, AnomalyDetector detector) {
ThresholdArrays thresholdArrays = IgnoreSimilarExtractor.processDetectorRules(detector);

if (thresholdArrays != null) {
Expand All @@ -235,23 +258,5 @@ protected List<Sample> trainModelFromDataSegments(
rcfBuilder.ignoreNearExpectedFromBelowByRatio(thresholdArrays.ignoreSimilarFromBelowByRatio);
}
}

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}
}
74 changes: 52 additions & 22 deletions src/main/java/org/opensearch/ad/ml/ADModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.ImputedFeatureResult;
import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
Expand All @@ -47,13 +49,17 @@
import org.opensearch.timeseries.feature.FeatureManager;
import org.opensearch.timeseries.feature.Features;
import org.opensearch.timeseries.ml.MemoryAwareConcurrentHashmap;
import org.opensearch.timeseries.ml.ModelColdStart;
import org.opensearch.timeseries.ml.ModelManager;
import org.opensearch.timeseries.ml.ModelState;
import org.opensearch.timeseries.ml.SingleStreamModelIdMapper;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DateUtils;
import org.opensearch.timeseries.util.ModelUtil;

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.ForestMode;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
Expand Down Expand Up @@ -137,7 +143,11 @@ public ADModelManager(
this.initialAcceptFraction = rcfNumMinSamples * 1.0d / rcfNumSamplesInTree;
}

@Deprecated
/**
* used in RCFResultTransportAction to handle request from old node request.
* In the new logic, we switch to SingleStreamResultAction.
*
* Returns to listener the RCF anomaly result using the specified model.
*
* @param detectorId ID of the detector
Expand Down Expand Up @@ -194,7 +204,9 @@ private void getTRcfResult(
result.getExpectedValuesList(),
result.getLikelihoodOfValues(),
result.getThreshold(),
result.getNumberOfTrees()
result.getNumberOfTrees(),
point,
null
)
);
} catch (Exception e) {
Expand Down Expand Up @@ -513,11 +525,10 @@ private <T> void maintenanceForIterator(
* Returns computed anomaly results for preview data points.
*
* @param features features of preview data points
* @param shingleSize model shingle size
* @return rcfTimeDecay rcf time decay
* @param detector Anomaly detector
* @throws IllegalArgumentException when preview data points are not valid
*/
public List<ThresholdingResult> getPreviewResults(Features features, int shingleSize, double rcfTimeDecay) {
public List<ThresholdingResult> getPreviewResults(Features features, AnomalyDetector detector) {
double[][] dataPoints = features.getUnprocessedFeatures();
if (dataPoints.length < minPreviewSize) {
throw new IllegalArgumentException("Insufficient data for preview results. Minimum required: " + minPreviewSize);
Expand All @@ -528,11 +539,15 @@ public List<ThresholdingResult> getPreviewResults(Features features, int shingle
String.format(Locale.ROOT, "time range size %d does not match data points size %d", timeRanges.size(), dataPoints.length)
);
}

int shingleSize = detector.getShingleSize();
double rcfTimeDecay = detector.getTimeDecay();

// Train RCF models and collect non-zero scores
int baseDimension = dataPoints[0].length;
// speed is important in preview. We don't want cx to wait too long.
// thus use the default value of boundingBoxCacheFraction = 1
ThresholdedRandomCutForest trcf = ThresholdedRandomCutForest
ThresholdedRandomCutForest.Builder trcfBuilder = ThresholdedRandomCutForest
.builder()
.randomSeed(0L)
.dimensions(baseDimension * shingleSize)
Expand All @@ -550,27 +565,32 @@ public List<ThresholdingResult> getPreviewResults(Features features, int shingle
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(true)
.build();
.internalShinglingEnabled(true);

if (shingleSize > 1) {
trcfBuilder.forestMode(ForestMode.STREAMING_IMPUTE);
trcfBuilder = ModelColdStart.applyImputationMethod(detector, trcfBuilder);
} else {
// imputation with shingle size 1 is not meaningful
trcfBuilder.forestMode(ForestMode.STANDARD);
}

ADColdStart.applyRule(trcfBuilder, detector);

ThresholdedRandomCutForest trcf = trcfBuilder.build();

return IntStream.range(0, dataPoints.length).mapToObj(i -> {
// we don't have missing values in preview data. We have already filtered them out.
double[] point = dataPoints[i];
// Get the data end epoch milliseconds corresponding to this index and convert it to seconds
long timestampSecs = timeRanges.get(i).getValue() / 1000;
AnomalyDescriptor descriptor = trcf.process(point, timestampSecs); // Use the timestamp here
return new ThresholdingResult(
descriptor.getAnomalyGrade(),
descriptor.getDataConfidence(),
descriptor.getRCFScore(),
descriptor.getTotalUpdates(),
descriptor.getRelativeIndex(),
normalizeAttribution(trcf.getForest(), descriptor.getRelevantAttribution()),
descriptor.getPastValues(),
descriptor.getExpectedValuesList(),
descriptor.getLikelihoodOfValues(),
descriptor.getThreshold(),
rcfNumTrees
);

if (descriptor != null) {
return toResult(trcf.getForest(), descriptor, point, false, detector);
}

return null;
}).collect(Collectors.toList());
}

Expand Down Expand Up @@ -623,7 +643,15 @@ protected ThresholdingResult createEmptyResult() {
}

@Override
protected ThresholdingResult toResult(RandomCutForest rcf, AnomalyDescriptor anomalyDescriptor) {
protected ThresholdingResult toResult(
RandomCutForest rcf,
AnomalyDescriptor anomalyDescriptor,
double[] point,
boolean isImputed,
Config config
) {
ImputedFeatureResult result = ModelUtil.calculateImputedFeatures(anomalyDescriptor, point, isImputed, config);

return new ThresholdingResult(
anomalyDescriptor.getAnomalyGrade(),
anomalyDescriptor.getDataConfidence(),
Expand All @@ -635,7 +663,9 @@ protected ThresholdingResult toResult(RandomCutForest rcf, AnomalyDescriptor ano
anomalyDescriptor.getExpectedValuesList(),
anomalyDescriptor.getLikelihoodOfValues(),
anomalyDescriptor.getThreshold(),
rcfNumTrees
rcfNumTrees,
result.getActual(),
result.getIsFeatureImputed()
);
}
}
Loading

0 comments on commit b5e85e1

Please sign in to comment.