Skip to content

Commit

Permalink
HPCC-31650 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jan 3, 2025
1 parent 6ddcdc3 commit 3055bf3
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 51 deletions.
3 changes: 2 additions & 1 deletion common/wuanalysis/anacommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ enum WutOptionType
watOptFirst=0,
watOptMinInterestingTime=0,
watOptMinInterestingCost,
watOptMinInterestingWaste,
watOptSkewThreshold,
watOptMinRowsPerNode,
watPreFilteredKJThreshold,
watCostRatePerHour,
watClusterCostPerHour,
watOptMax
};

Expand Down
71 changes: 43 additions & 28 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,25 @@
#include "commonext.hpp"


static cost_type calcIssueCost(stat_type timePenalty, const stat_type clusterCostPerHour)
static constexpr cost_type calcIssueCost(const stat_type clusterCostPerHour, stat_type timeWasted)
{
double timePenaltyPerHour = (double)statUnits2seconds(timePenalty) / 3600;
return timePenaltyPerHour*clusterCostPerHour;
double timePenaltyHours = statUnits2seconds(timeWasted) / 3600;
return timePenaltyHours * clusterCostPerHour;
}

// Decide whether an issue is significant enough to be reported.
//   options     - analyser options supplying the reporting thresholds
//   timeWasted  - time attributed to the issue (checked against watOptMinInterestingWaste)
//   moneyWasted - cost attributed to the issue (checked against watOptMinInterestingCost)
// Returns true when the issue passes at least one of the checks below.
static constexpr bool isWorthReporting(const IAnalyserOptions & options, stat_type timeWasted, cost_type moneyWasted)
{
    const auto minWaste = options.queryOption(watOptMinInterestingWaste);
    const auto minCost = options.queryOption(watOptMinInterestingCost);

    // Both thresholds are zero (unset): report every issue that wasted any time at all
    const bool noThresholds = (minWaste == 0) && (minCost == 0);
    if (noThresholds && timeWasted)
        return true;

    // The cost threshold is only applied when a non-zero cluster cost is available;
    // an issue costing at least the threshold is reported
    if (minCost && options.queryOption(watClusterCostPerHour) && (moneyWasted >= minCost))
        return true;

    // Otherwise report only when a time threshold is set and the time wasted is at least that threshold
    return minWaste && (timeWasted >= minWaste);
}

class ActivityKindRule : public AActivityRule
Expand Down Expand Up @@ -59,21 +74,21 @@ class DistributeSkewRule : public ActivityKindRule
stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
if (rowsMaxSkew > options.queryOption(watOptSkewThreshold))
{
// Use downstream activity time to calculate approximate timePenalty
// Use downstream activity time to calculate approximate time wasted
IWuActivity * targetActivity = outputEdge->queryTarget();
assertex(targetActivity);
stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
// Consider ways to improve this timePenalty calculation further
stat_type timePenalty = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
// Consider ways to improve this time wasted calculation
stat_type timeWasted = timeMaxLocalExecute - timeAvgLocalExecute;
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
IWuEdge * inputEdge = activity.queryInput(0);
if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timePenalty, costPenalty, "DISTRIBUTE output skew is worse than input skew");
result.set(ANA_DISTRIB_SKEW_INPUT_ID, timeWasted, moneyWasted, "DISTRIBUTE output skew is worse than input skew");
else
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timePenalty, costPenalty, "Significant skew in DISTRIBUTE output");
result.set(ANA_DISTRIB_SKEW_OUTPUT_ID, timeWasted, moneyWasted, "Significant skew in DISTRIBUTE output");
updateInformation(result, activity);
return true;
}
Expand Down Expand Up @@ -145,12 +160,12 @@ class IoSkewRule : public AActivityRule
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type timePenalty;
stat_type timeWasted;
const char * msg = nullptr;
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
timePenalty = timeMaxLocalExecute;
timeWasted = timeMaxLocalExecute;
msg = "Uneven worker spilling";
}
else
Expand All @@ -172,7 +187,7 @@ class IoSkewRule : public AActivityRule
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
if (sizeSkew)
{
if (numRowsSkew)
Expand All @@ -184,10 +199,10 @@ class IoSkewRule : public AActivityRule
msg = "Significant skew in IO performance";
}
assertex(msg);
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_IOSKEW_RECORDS_ID, timePenalty, costPenalty, "%s is causing uneven %s time", msg, category);
result.set(ANA_IOSKEW_RECORDS_ID, timeWasted, moneyWasted, "%s is causing uneven %s time", msg, category);
updateInformation(result, activity);
return true;
}
Expand Down Expand Up @@ -223,8 +238,8 @@ class LocalExecuteSkewRule : public AActivityRule

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);
if (timePenalty<options.queryOption(watOptMinInterestingTime))
stat_type timeWasted = (timeMaxLocalExecute - timeAvgLocalExecute);
if (timeWasted<options.queryOption(watOptMinInterestingTime))
return false;

bool inputSkewed = false;
Expand All @@ -241,15 +256,15 @@ class LocalExecuteSkewRule : public AActivityRule
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven input");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time caused by uneven output");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, costPenalty, "Significant skew in local execute time");
result.set(ANA_EXECUTE_SKEW_ID, timeWasted, moneyWasted, "Significant skew in local execute time");
updateInformation(result, activity);
return true;
}
Expand All @@ -275,13 +290,13 @@ class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
if (preFilteredPer > options.queryOption(watPreFilteredKJThreshold))
{
IWuActivity * inputActivity = inputEdge->querySource();
// Use input activity as the basis of timePenalty because the rows generated from input activity is being filtered out
// Use input activity as the basis of time wasted because the rows generated from the input activity are being filtered out
stat_type timeAvgLocalExecute = inputActivity->getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type costPenalty = calcIssueCost(timePenalty, options.queryOption(watCostRatePerHour));
if (costPenalty >= options.queryOption(watOptMinInterestingCost))
stat_type timeWasted = statPercentageOf(timeAvgLocalExecute, preFilteredPer);
cost_type moneyWasted = calcIssueCost(options.queryOption(watClusterCostPerHour), timeWasted);
if (isWorthReporting(options, timeWasted, moneyWasted))
{
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timePenalty, costPenalty, "Large number of rows from left dataset rejected in keyed join");
result.set(ANA_KJ_EXCESS_PREFILTER_ID, timeWasted, moneyWasted, "Large number of rows from left dataset rejected in keyed join");
updateInformation(result, activity);
return true;
}
Expand Down
45 changes: 25 additions & 20 deletions common/wuanalysis/anawu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ struct WuOption

constexpr struct WuOption wuOptionsDefaults[watOptMax]
= { {watOptMinInterestingTime, "minInterestingTime", 1000, wutOptValueTypeMSec},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(5.0) /* $5 */, wutOptValueTypeCost},
{watOptMinInterestingCost, "minInterestingCost", money2cost_type(1.00) /* $1.00 */, wutOptValueTypeCost},
{watOptMinInterestingWaste, "minInterestingTimeWaste", 30000, wutOptValueTypeMSec},
{watOptSkewThreshold, "skewThreshold", 20, wutOptValueTypePercent},
{watOptMinRowsPerNode, "minRowsPerNode", 1000, wutOptValueTypeCount},
{watPreFilteredKJThreshold, "preFilteredKJThreshold", 50, wutOptValueTypePercent},
/* Note watCostRatePerHour cannot be used as debug option or config option (this is calculated) */
{watCostRatePerHour, "costRatePerHour", 0, wutOptValueTypeCost},
/* Note watClusterCostPerHour cannot be used as debug option or config option (this is calculated) */
{watClusterCostPerHour, "costRatePerHour", 0, wutOptValueTypeCost},
};

constexpr bool checkWuOptionsDefaults(int i = watOptMax)
Expand Down Expand Up @@ -127,13 +128,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("@");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(options->getPropReal(wuOptionName, cost_type2money(-1.0)));
else
val = options->getPropInt64(wuOptionName, -1);
if (val>0)
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
if (options->hasProp(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(options->getPropReal(wuOptionName));
else
val = options->getPropInt64(wuOptionName);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}

Expand All @@ -143,13 +146,15 @@ class WuAnalyserOptions : public IAnalyserOptions
{
StringBuffer wuOptionName("analyzer_");
wuOptionName.append(wuOptionsDefaults[opt].name);
__int64 val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(wu->getDebugValueReal(wuOptionName, cost_type2money(-1.0)));
else
val = wu->getDebugValueInt64(wuOptionName, -1);
if (val>0)
setOptionValue(static_cast<WutOptionType>(opt), money2cost_type(val));
if (wu->hasDebugValue(wuOptionName))
{
stat_type val = 0;
if (opt==watOptMinInterestingCost)
val = money2cost_type(wu->getDebugValueReal(wuOptionName, 0.0));
else
val = wu->getDebugValueInt64(wuOptionName, 0);
setOptionValue(static_cast<WutOptionType>(opt), val);
}
}
}
// Return the value currently stored for the requested analyser option
stat_type queryOption(WutOptionType opt) const override
{
    return wuOptions[opt];
}
Expand Down Expand Up @@ -1367,9 +1372,9 @@ void WorkunitRuleAnalyser::applyConfig(IPropertyTree *cfg, IConstWorkUnit * wu,
{
options.applyConfig(cfg);
options.applyConfig(wu);
/* watCostRatePerHour is calculated by caller and its value is set in options*/
/* (So, watCostRatePerHour cannot be used as debug option or config option)*/
options.setOptionValue(watCostRatePerHour, money2cost_type(costRate));
/* watClusterCostPerHour is calculated by caller and its value is set in options*/
/* (So, watClusterCostPerHour cannot be used as debug option or config option)*/
options.setOptionValue(watClusterCostPerHour, money2cost_type(costRate));
}


Expand Down
4 changes: 2 additions & 2 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const unsigned __int64 AnyStatisticValue = MaxStatisticValue; // Use the maximum

// Scale a value in seconds up to statistic units (multiplies by 1,000,000,000)
inline constexpr stat_type seconds2StatUnits(stat_type secs)
{
    return secs * 1000000000;
}
// Scale a value in milliseconds up to statistic units (multiplies by 1,000,000)
inline constexpr stat_type msecs2StatUnits(stat_type ms)
{
    return ms * 1000000;
}
inline constexpr stat_type statUnits2seconds(stat_type stat) {return stat / 1000000000; }
inline constexpr stat_type statUnits2msecs(stat_type stat) {return stat / 1000000; }
// Convert statistic units to seconds, keeping the fractional part (divides by 1,000,000,000 as a double)
inline constexpr double statUnits2seconds(stat_type stat)
{
    return static_cast<double>(stat) / 1000000000;
}
// Convert statistic units to milliseconds, keeping the fractional part (divides by 1,000,000 as a double)
inline constexpr double statUnits2msecs(stat_type stat)
{
    return static_cast<double>(stat) / 1000000;
}

// Convert a whole-number percentage to the statistic representation.
// Since 1 = 0.01% skew
inline constexpr stat_type statPercent(int value)
{
    return static_cast<stat_type>(value) * 100;
}
// Convert a fractional percentage to the statistic representation; the value is
// scaled by 100 first and then converted to stat_type
inline constexpr stat_type statPercent(double value)
{
    return static_cast<stat_type>(value * 100);
}
Expand Down

0 comments on commit 3055bf3

Please sign in to comment.