Skip to content

Commit

Permalink
HPCC-31650 Address incorrect analyzer cost calc and cost threshold
Browse files Browse the repository at this point in the history
* The rate used to calculate the cost of issues has been updated so
that the unit is consistent and produces valid cost calculation.
* 'minInterestingCost' is a decimal value to set the minimum dollar
cost value for reported issues.
* 'minInterestingCost' is compared with the calculated cost of the
issue
*  'minInterestingWaste' is a new option to set the minimum time
threshold for reported issues.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jan 9, 2025
1 parent 803672b commit 4e1059e
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 71 deletions.
12 changes: 5 additions & 7 deletions common/wuanalysis/anacommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void PerformanceIssue::print() const
printf("\n");
}

void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
void PerformanceIssue::createException(IWorkUnit * wu)
{
ErrorSeverity mappedSeverity = wu->getWarningSeverity(errorCode, (ErrorSeverity)SeverityWarning);
if (mappedSeverity == SeverityIgnore)
Expand All @@ -66,17 +66,15 @@ void PerformanceIssue::createException(IWorkUnit * wu, double costRate)
StringBuffer s(comment); // Append scope to comment as scope column is not visible in ECLWatch
s.appendf(" (%s)", scope.str());
we->setExceptionMessage(s.str());
if (costRate!=0.0)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
we->setCost(timePenaltyPerHour*costRate);
}
if (costPenalty!=0)
we->setCost(cost_type2money(costPenalty));
we->setExceptionSource(CostOptimizerName);
}

void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...)
void PerformanceIssue::set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...)
{
timePenalty = _timePenalty;
costPenalty = _costPenalty;
errorCode = _errorCode;
va_list args;
va_start(args, msg);
Expand Down
10 changes: 7 additions & 3 deletions common/wuanalysis/anacommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ class PerformanceIssue : public CInterface
public:
int compareCost(const PerformanceIssue & other) const;
void print() const;
void createException(IWorkUnit * we, double costRate);
void createException(IWorkUnit * we);

void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, const char * msg, ...) __attribute__((format(printf, 4, 5)));
void set(AnalyzerErrorCode _errorCode, stat_type _timePenalty, cost_type _costPenalty, const char * msg, ...) __attribute__((format(printf, 5, 6)));
void setLocation(const char * definition);
void setScope(const char *_scope) { scope.set(_scope); }
stat_type getTimePenalityCost() const { return timePenalty; }
stat_type getTimePenalty() const { return timePenalty; }
cost_type getCostPenalty() const { return costPenalty; }

private:
AnalyzerErrorCode errorCode = ANA_GENERICERROR_ID;
Expand All @@ -76,6 +77,7 @@ class PerformanceIssue : public CInterface
unsigned column = 0;
StringAttr scope;
stat_type timePenalty = 0; // number of nanoseconds lost as a result.
cost_type costPenalty = 0;
StringBuffer comment;
};

Expand All @@ -84,9 +86,11 @@ enum WutOptionType
watOptFirst=0,
watOptMinInterestingTime=0,
watOptMinInterestingCost,
watOptMinInterestingWaste,
watOptSkewThreshold,
watOptMinRowsPerNode,
watPreFilteredKJThreshold,
watClusterCostPerHour,
watOptMax
};

Expand Down
98 changes: 68 additions & 30 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@
#include "anarule.hpp"
#include "commonext.hpp"


static constexpr cost_type calcIssueCost(const stat_type clusterCostPerHour, stat_type timeWasted)
{
double timePenaltyHours = statUnits2seconds(timeWasted) / 3600;
return timePenaltyHours * clusterCostPerHour;
}

static constexpr bool isWorthReporting(const IAnalyserOptions & options, stat_type timeWasted, cost_type moneyWasted)
{
// if neither threshold is set, then report all issues
if (timeWasted && options.queryOption(watOptMinInterestingWaste)==0 && options.queryOption(watOptMinInterestingCost)==0)
return true;
// if the cluster cost available and the issue cost is greater than threshold, report it
if (options.queryOption(watClusterCostPerHour) && options.queryOption(watOptMinInterestingCost)
&& (moneyWasted >= options.queryOption(watOptMinInterestingCost)))
return true;
// if the issue time wasted is greater than threshold, report it
if (options.queryOption(watOptMinInterestingWaste) && (timeWasted >= options.queryOption(watOptMinInterestingWaste)))
return true;
return false;
}

class ActivityKindRule : public AActivityRule
{
public:
Expand Down Expand Up @@ -52,21 +74,24 @@ class DistributeSkewRule : public ActivityKindRule
stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
if (rowsMaxSkew > options.queryOption(watOptSkewThreshold))
{
// Use downstream activity time to calculate approximate cost
// Use downstream activity time to calculate approximate time wasted
IWuActivity * targetActivity = outputEdge->queryTarget();
assertex(targetActivity);
stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
// Consider ways to improve this cost calculation further
stat_type cost = timeMaxLocalExecute - timeAvgLocalExecute;

IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, cost, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, cost, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
// Consider ways to improve this time wasted calculation
stat_type timeWasted = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timeWasted, moneyWasted, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timeWasted, moneyWasted, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
}
}
return false;
}
Expand Down Expand Up @@ -135,12 +160,13 @@ class IoSkewRule : public AActivityRule
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type cost;
stat_type timeWasted;
const char * msg = nullptr;
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
cost = timeMaxLocalExecute;
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Uneven worker spilling is causing uneven %s time", category);
timeWasted = timeMaxLocalExecute;
msg = "Uneven worker spilling";
}
else
{
Expand All @@ -161,19 +187,25 @@ class IoSkewRule : public AActivityRule
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
cost = (timeMaxLocalExecute - timeAvgLocalExecute);
timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
if (sizeSkew)
{
if (numRowsSkew)
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in number of records is causing uneven %s time", category);
msg = "Significant skew in number of records";
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in record sizes is causing uneven %s time", category);
msg = "Significant skew in record sizes";
}
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in IO performance is causing uneven %s time", category);
msg = "Significant skew in IO performance";
}
assertex(msg);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_IOSKEW_RECORDS_ID, timeWasted, moneyWasted, "%s is causing uneven %s time", msg, category);
updateInformation(result, activity);
return true;
}
updateInformation(result, activity);
return true;
}
return false;
}
Expand Down Expand Up @@ -206,8 +238,9 @@ class LocalExecuteSkewRule : public AActivityRule

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
if (timePenalty<options.queryOption(watOptMinInterestingTime))
stat_type timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (!isWorthReporting(options, timeWasted, moneyWasted))
return false;

bool inputSkewed = false;
Expand All @@ -225,11 +258,12 @@ class LocalExecuteSkewRule : public AActivityRule
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time");
updateInformation(result, activity);
return true;
}
};
Expand All @@ -252,12 +286,16 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold))
{
IWuActivity * inputActivity = inputEdge->querySource();
// Use input activity as the basis of cost because the rows generated from input activity is being filtered out
// Use input activity as the basis of time wasted because the rows generated from input activity is being filtered out
stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX);
stat_type cost = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
result.set(ANA_KJ_EXCESS_PREFILTER_ID, cost, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
stat_type timeWasted = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timeWasted, moneyWasted, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
}
}
}
}
Expand Down
61 changes: 38 additions & 23 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ enum WutOptValueType
wutOptValueTypePercent,
wutOptValueTypeCount,
wutOptValueTypeBool,
wutOptValueTypeCost,
wutOptValueTypeMax,
};

Expand All @@ -71,10 +72,13 @@ struct WuOption

constexpr struct WuOption wuOptionsDefaults[watOptMax]
= { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", 30000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(1.00) /* $1.00 */, wutOptValueTypeCost},
{watOptMinInterestingWaste, "minInterestingTimeWaste", 30000, wutOptValueTypeMSec},
{watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent},
{watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount},
{watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent},
/* Note watClusterCostPerHour cannot be used as debug option or config option (this is calculated) */
{watClusterCostPerHour, "costRatePerHour", 0, wutOptValueTypeCost},
};

constexpr bool checkWuOptionsDefaults(int i = watOptMax)
Expand Down Expand Up @@ -107,9 +111,8 @@ class WuAnalyserOptions : public IAnalyserOptions
case wutOptValueTypePercent:
wuOptions[opt] = statPercent((stat_type)val);
break;
case wutOptValueTypeCost:
case wutOptValueTypeCount:
wuOptions[opt] = (stat_type) val;
break;
case wutOptValueTypeBool:
wuOptions[opt] = (stat_type) val;
break;
Expand All @@ -125,9 +128,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("@");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = options->getPropInt64(wuOptionName, -1);
if (val!=-1)
if (options->hasProp(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(options->getPropReal(wuOptionName));
else
val = options->getPropInt64(wuOptionName);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}

Expand All @@ -137,9 +146,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("analyzer_");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = wu->getDebugValueInt64(wuOptionName, -1);
if (val!=-1)
if (wu->hasDebugValue(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(wu->getDebugValueReal(wuOptionName, 0.0));
else
val = wu->getDebugValueInt64(wuOptionName, 0);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}
stat_type queryOption(WutOptionType opt) const override { return wuOptions[opt]; }
Expand Down Expand Up @@ -175,12 +190,12 @@ class WorkunitRuleAnalyser : public WorkunitAnalyserBase
public:
WorkunitRuleAnalyser();

void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu);
void applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double _costRate);

void applyRules();
void check(const char * scope, IWuActivity & activity);
void print();
void update(IWorkUnit *wu, double costRate);
void update(IWorkUnit *wu);

protected:
CIArrayOf<AActivityRule> rules;
Expand Down Expand Up @@ -1353,10 +1368,13 @@ WorkunitRuleAnalyser::WorkunitRuleAnalyser()
gatherRules(rules);
}

void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu)
void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu, double costRate)
{
options.applyConfig(cfg);
options.applyConfig(wu);
/* watClusterCostPerHour is calculated by caller and its value is set in options*/
/* (So, watClusterCostPerHour cannot be used as debug option or config option)*/
options.setOptionValue(watClusterCostPerHour, money2cost_type(costRate));
}


Expand All @@ -1372,11 +1390,8 @@ void WorkunitRuleAnalyser::check(const char * scope, IWuActivity & activity)
Owned<PerformanceIssue> issue (new PerformanceIssue);
if (rules.item(i).check(*issue, activity, options))
{
if (issue->getTimePenalityCost() >= options.queryOption(watOptMinInterestingCost))
{
if (!highestCostIssue || highestCostIssue->getTimePenalityCost() < issue->getTimePenalityCost())
highestCostIssue.setown(issue.getClear());
}
if (!highestCostIssue || highestCostIssue->getTimePenalty() < issue->getTimePenalty())
highestCostIssue.setown(issue.getClear());
}
}
}
Expand All @@ -1401,10 +1416,10 @@ void WorkunitRuleAnalyser::print()
issues.item(i).print();
}

void WorkunitRuleAnalyser::update(IWorkUnit *wu, double costRate)
void WorkunitRuleAnalyser::update(IWorkUnit *wu)
{
ForEachItemIn(i, issues)
issues.item(i).createException(wu, costRate);
issues.item(i).createException(wu);
}


Expand Down Expand Up @@ -2087,27 +2102,27 @@ void WorkunitStatsAnalyser::traceDependencies()

//---------------------------------------------------------------------------------------------------------------------

void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerMs)
void WUANALYSIS_API analyseWorkunit(IWorkUnit * wu, const char *optGraph, IPropertyTree *options, double costPerHour)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(options, wu);
analyser.applyConfig(options, wu, costPerHour);
analyser.analyse(wu, optGraph);
analyser.applyRules();
analyser.update(wu, costPerMs);
analyser.update(wu);
}

void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costRate, bool updatewu)
void WUANALYSIS_API analyseAndPrintIssues(IConstWorkUnit * wu, double costPerHour, bool updatewu)
{
WorkunitRuleAnalyser analyser;
analyser.applyConfig(nullptr, wu);
analyser.applyConfig(nullptr, wu, costPerHour);
analyser.analyse(wu, nullptr);
analyser.applyRules();
analyser.print();
if (updatewu)
{
Owned<IWorkUnit> lockedwu = &(wu->lock());
lockedwu->clearExceptions(CostOptimizerName);
analyser.update(lockedwu, costRate);
analyser.update(lockedwu);
}
}

Expand Down
Loading

0 comments on commit 4e1059e

Please sign in to comment.