-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Support for Handling Missing Data in Anomaly Detection #1274
Conversation
testBackwardsCompatibility failed similiar to opensearch-project/k-NN#1622 Execution failed for task ':adBwcCluster#twoThirdsUpgradedClusterTask'. It is a core issue. Created an issue there: opensearch-project/OpenSearch#15234
|
for (int i = 0; i < pointSamples.size(); i++) { | ||
Sample dataSample = pointSamples.get(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be replaced with for-each loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's hard to say which one is better. https://programmerr47.medium.com/to-index-or-iterate-7b81039e5484 shows indexed loop is faster than for-each loop.
This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy. Key Changes: 1. Enhanced Missing Data Handling: Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner). 2. Refactoring Imputation & Processing: Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer. 3. Improved Imputed Value Reconstruction: Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult). 4. Broadcast Support for HC Detectors: Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor). 5. Introduction of ActionListenerExecutor: Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool. Testing: Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage. Signed-off-by: Kaituo Li <[email protected]>
Signed-off-by: Kaituo Li <[email protected]>
Whitesource check does not run and it is a known issue to infra team. |
@@ -147,23 +145,22 @@ public <RCFDescriptor extends AnomalyDescriptor> IntermediateResultType score( | |||
if (!modelState.getSamples().isEmpty()) { | |||
for (Sample unProcessedSample : modelState.getSamples()) { | |||
// we are sure that the process method will indeed return an instance of RCFDescriptor. | |||
rcfModel.process(unProcessedSample.getValueList(), unProcessedSample.getDataEndTime().getEpochSecond()); | |||
double[] unProcessedPoint = unProcessedSample.getValueList(); | |||
int[] missingIndices = DataUtil.generateMissingIndicesArray(unProcessedPoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these missing indices referring to the features that don't have values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
double[] toScore = null; | ||
if (dataPoint.isEmpty()) { | ||
toScore = new double[detector.getEnabledFeatureIds().size()]; | ||
Arrays.fill(toScore, Double.NaN); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we filling it here with Double.NaN and not the filledValues, for example the fixed value or previous value, is this done elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double.NaN is used to signal we should put the corresponding indices in the missing value array. RCF will fill in fixed value or previous value according to the missing value array.
|
||
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; | ||
|
||
public class ADHCImputeTransportAction extends |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a little more comments in this class, I was a little confused on the broadcasting and why we are sending NaN values. Or how do we know which node to has which entities
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comment:
/**
- This class manages the broadcasting mechanism and entity data processing for
- the HC detector. The system broadcasts a message after processing all records
- in each interval to ensure that each node examines its hot models in memory
- and determines which entity models have not received data during the current interval.
- "Hot" entities refer to those models actively loaded in memory, as opposed to
- "cold" models, which are not loaded and remain in storage due to limited memory resources.
- Upon receiving the broadcast message, each node checks whether each hot entity
- has received new data. If a hot entity has not received any data, the system
- assigns a NaN value to that entity. This NaN value signals to the model that no
- data was received, prompting it to impute the missing value based on previous data,
- rather than using current interval data.
- The system determines which node manages which entities based on memory availability.
- The coordinating node does not immediately know which entities are hot or cold;
- it learns this during the pagination process. Hot entities are those that have
- recently received data and are actively maintained in memory, while cold entities
- remain in storage and are processed only if time permits within the interval.
- For cold entities whose models are not loaded in memory, the system does not
- produce an anomaly result for that interval due to insufficient time or resources
- to process them. This is particularly relevant in scenarios with short intervals,
- such as one minute, where an underscaled cluster may cause processing delays
- that prevent timely anomaly detection for some entities.
*/
* @param executorService the ExecutorService used to execute the onResponse handler asynchronously | ||
* @return an ActionListener that handles the response and failure cases | ||
*/ | ||
public static <Response> ActionListener<Response> wrap( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the only difference with this added that we are making sure only to use AD thread pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
Signed-off-by: Kaituo Li <[email protected]>
The backport to
To backport manually, run these commands in your terminal: # Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/anomaly-detection/backport-2.x 2.x
# Navigate to the new working tree
pushd ../.worktrees/anomaly-detection/backport-2.x
# Create a new branch
git switch --create backport/backport-1274-to-2.x
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 dc85dc4e97dc1fc14a5d367072c6a40dbec2ee7c
# Push it to GitHub
git push --set-upstream origin backport/backport-1274-to-2.x
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/anomaly-detection/backport-2.x Then, create a pull request where the |
…h-project#1274) * Add Support for Handling Missing Data in Anomaly Detection This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy. Key Changes: 1. Enhanced Missing Data Handling: Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner). 2. Refactoring Imputation & Processing: Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer. 3. Improved Imputed Value Reconstruction: Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult). 4. Broadcast Support for HC Detectors: Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor). 5. Introduction of ActionListenerExecutor: Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool. Testing: Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage. Signed-off-by: Kaituo Li <[email protected]> * rebase from main Signed-off-by: Kaituo Li <[email protected]> * add comment and remove redundant code Signed-off-by: Kaituo Li <[email protected]> --------- Signed-off-by: Kaituo Li <[email protected]>
…h-project#1274) * Add Support for Handling Missing Data in Anomaly Detection This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy. Key Changes: 1. Enhanced Missing Data Handling: Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner). 2. Refactoring Imputation & Processing: Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer. 3. Improved Imputed Value Reconstruction: Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult). 4. Broadcast Support for HC Detectors: Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor). 5. Introduction of ActionListenerExecutor: Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool. Testing: Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage. Signed-off-by: Kaituo Li <[email protected]> * rebase from main Signed-off-by: Kaituo Li <[email protected]> * add comment and remove redundant code Signed-off-by: Kaituo Li <[email protected]> --------- Signed-off-by: Kaituo Li <[email protected]>
…1281) * Add Support for Handling Missing Data in Anomaly Detection This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy. Key Changes: 1. Enhanced Missing Data Handling: Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner). 2. Refactoring Imputation & Processing: Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer. 3. Improved Imputed Value Reconstruction: Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult). 4. Broadcast Support for HC Detectors: Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor). 5. Introduction of ActionListenerExecutor: Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool. Testing: Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage. * rebase from main * add comment and remove redundant code --------- Signed-off-by: Kaituo Li <[email protected]>
Description
This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.
Key Changes:
Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).
Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.
Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).
Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).
Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.
Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7177 lines added and 1685 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.