From f68a78fb8f0d2d7ff2da063b0fb885318f5ac996 Mon Sep 17 00:00:00 2001 From: M Kelly Date: Fri, 3 Jan 2025 12:12:16 -0500 Subject: [PATCH] HPCC-32958 Roxie dynamic priority 4 Signed-off-by: M Kelly --- roxie/ccd/ccd.hpp | 9 +++++- roxie/ccd/ccdlistener.cpp | 26 ++++++++--------- roxie/ccd/ccdquery.cpp | 60 ++++++++++++++++++--------------------- roxie/ccd/ccdquery.hpp | 3 +- roxie/ccd/ccdserver.cpp | 18 ++++++------ 5 files changed, 60 insertions(+), 56 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 59dab8cdaa3..03268d603c0 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -82,6 +82,13 @@ void setMulticastEndpoints(unsigned numChannels); #define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY) +#define QUERY_BG_PRIORITY_VALUE -1 +#define QUERY_LOW_PRIORITY_VALUE 0 +#define QUERY_HIGH_PRIORITY_VALUE 1 +#define QUERY_SLA_PRIORITY_VALUE 2 +static constexpr int queryMinPriorityValue = QUERY_BG_PRIORITY_VALUE; +static constexpr int queryMaxPriorityValue = QUERY_SLA_PRIORITY_VALUE; + #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities // Status information returned in the activityId field of the header: @@ -485,7 +492,7 @@ inline unsigned getBondedChannel(unsigned partNo) return ((partNo - 1) % numChannels) + 1; } -extern unsigned priorityMask(int priority); +extern unsigned getPriorityMask(int priority); extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2))); extern unsigned getNextInstanceId(); diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 57e5c1f2d0a..a0c014358b6 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1241,10 +1241,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker { switch((int)priority) { - case 0: loQueryStats.noteQuery(failed, elapsedTime); break; - case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; - case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; - case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteQuery(failed, elapsedTime); break; } combinedQueryStats.noteQuery(failed, elapsedTime); } @@ -1334,7 +1334,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker unsigned agentsReplyLen = 0; unsigned agentsDuplicates = 0; unsigned agentsResends = 0; - unsigned priority = (unsigned) -2; + unsigned priority = (unsigned) -2; // NB -2 is outside of priority range try { bool isBlind = wu->getDebugValueBool("blindLogging", false); @@ -1358,10 +1358,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker priority = queryFactory->queryOptions().priority; switch ((int)priority) { - case 0: loQueryStats.noteActive(); break; - case 1: hiQueryStats.noteActive(); break; - case 2: slaQueryStats.noteActive(); break; - case -1: bgQueryStats.noteActive(); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break; } combinedQueryStats.noteActive(); Owned ctx = queryFactory->createContext(wu, logctx); @@ -1528,10 +1528,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte unsigned priority = getQueryPriority(); switch ((int)priority) { - case 0: loQueryStats.noteActive(); break; - case 1: hiQueryStats.noteActive(); break; - case 2: slaQueryStats.noteActive(); break; - case -1: bgQueryStats.noteActive(); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break; } unknownQueryStats.noteComplete(); combinedQueryStats.noteActive(); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index bbdb64e42ae..05287d8f803 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -318,8 +318,8 @@ class CSharedOnceContext : public CInterfaceOf QueryOptions::QueryOptions() { - priority = 0; - dynPriority = 0; + priority = QUERY_LOW_PRIORITY_VALUE; + dynPriority = QUERY_LOW_PRIORITY_VALUE; timeLimit = defaultTimeLimit[0]; warnTimeLimit = defaultWarnTimeLimit[0]; @@ -359,7 +359,7 @@ QueryOptions::QueryOptions() QueryOptions::QueryOptions(const QueryOptions &other) { priority = other.priority; - dynPriority = other.dynPriority; + dynPriority = other.dynPriority.load(); timeLimit = other.timeLimit; warnTimeLimit = other.warnTimeLimit; @@ -396,14 +396,10 @@ QueryOptions::QueryOptions(const QueryOptions &other) numWorkflowThreads = other.numWorkflowThreads; } -void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo) +void QueryOptions::updateDynPriority(int _priority) { - // calculate priority before others since it affects the defaults of others - updateFromWorkUnit(priority, wu, "priority"); - if (stateInfo) - updateFromContext(priority, stateInfo, "@priority"); - dynPriority = priority; - if ((int)priority < 0) + dynPriority = _priority; + if (dynPriority < QUERY_LOW_PRIORITY_VALUE) { // use LOW queue time limits ... timeLimit = defaultTimeLimit[0]; @@ -411,9 +407,20 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat } else { - timeLimit = defaultTimeLimit[priority]; - warnTimeLimit = defaultWarnTimeLimit[priority]; + timeLimit = defaultTimeLimit[_priority]; + warnTimeLimit = defaultWarnTimeLimit[_priority]; } +} + +void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo) +{ + // calculate priority before others since it affects the defaults of others + updateFromWorkUnit(priority, wu, "priority"); + if (stateInfo) + updateFromContext(priority, stateInfo, "@priority"); + + updateDynPriority((int)priority); + updateFromWorkUnit(timeLimit, wu, "timeLimit"); updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit"); updateFromWorkUnitM(memoryLimit, wu, "memoryLimit"); @@ -500,28 +507,17 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx) // Note: priority cannot be set at context level // b/c this is after activities have been created, but we could // dynamically adj priority in the header activityId before sending - int tmpPriority; + int tmpPriority = (int)priority; updateFromContext(tmpPriority, ctx, "@priority", "_Priority"); - if (tmpPriority > 1) - tmpPriority = 1; - if (tmpPriority < -1) - tmpPriority = -1; + if (tmpPriority > queryMaxPriorityValue) + tmpPriority = queryMaxPriorityValue; + if (tmpPriority < queryMinPriorityValue) + tmpPriority = queryMinPriorityValue; + + // only adjust lower ... if (tmpPriority < (int)priority) - { - dynPriority = tmpPriority; - if (dynPriority < 0) - { - // use LOW queue time limits ... - timeLimit = defaultTimeLimit[0]; - warnTimeLimit = defaultWarnTimeLimit[0]; - } - else - { - timeLimit = defaultTimeLimit[dynPriority]; - warnTimeLimit = defaultWarnTimeLimit[dynPriority]; - } - } + updateDynPriority(tmpPriority); updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit"); updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit"); @@ -653,7 +649,7 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub if (isSuspended) return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point? - rid |= priorityMask(options.priority); + rid |= getPriorityMask(options.priority); StringBuffer helperName; helperName.append("fAc").append(id); diff --git a/roxie/ccd/ccdquery.hpp b/roxie/ccd/ccdquery.hpp index 844ed6834b3..0c2f5d90968 100644 --- a/roxie/ccd/ccdquery.hpp +++ b/roxie/ccd/ccdquery.hpp @@ -79,9 +79,10 @@ class QueryOptions void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo); void setFromContext(const IPropertyTree *ctx); void setFromAgentLoggingFlags(unsigned loggingFlags); + void updateDynPriority(int _priority); unsigned priority; - mutable int dynPriority; + mutable std::atomic dynPriority; unsigned timeLimit; unsigned warnTimeLimit; unsigned traceLimit; diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index ed5124a0c52..503e57c2d99 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -3629,18 +3629,18 @@ void throwRemoteException(IMessageUnpackCursor *extra) throwUnexpected(); } -unsigned priorityMask(int priority) +unsigned getPriorityMask(int priority) { unsigned newPri = ROXIE_BG_PRIORITY; switch (priority) { - case 2: + case QUERY_SLA_PRIORITY_VALUE: newPri = ROXIE_SLA_PRIORITY; break; - case 1: + case QUERY_HIGH_PRIORITY_VALUE: newPri = ROXIE_HIGH_PRIORITY; break; - case 0: + case QUERY_LOW_PRIORITY_VALUE: newPri = ROXIE_LOW_PRIORITY; break; } @@ -4584,16 +4584,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie int dynPriority = ctx->queryOptions().dynPriority; if (dynPriority < origPriority) { - unsigned newPri = priorityMask(dynPriority); + unsigned newPri = getPriorityMask(dynPriority); p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; p->queryHeader().activityId |= newPri; } // TODO: perhaps check elapsed every Nth msg ? - if ( (dynPriorityAdjustCycles > 0) && (origPriority == 0) && (dynPriority == 0) && + if ( (dynPriorityAdjustCycles > 0) && (origPriority == QUERY_LOW_PRIORITY_VALUE) && (dynPriority == QUERY_LOW_PRIORITY_VALUE) && (ctx->queryElapsedCycles() > dynPriorityAdjustCycles) ) { - ctx->queryOptions().dynPriority = -1; + ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE; unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles(); UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec); p->queryHeader().activityId |= ROXIE_BG_PRIORITY; @@ -5061,10 +5061,10 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie unsigned timeout = lowTimeout; switch (activity.queryContext()->queryOptions().dynPriority) { - case 2: + case QUERY_SLA_PRIORITY_VALUE: timeout = slaTimeout; break; - case 1: + case QUERY_HIGH_PRIORITY_VALUE: timeout = highTimeout; break; }