From 0913ec441b6214e34c761d68566caa40497e1952 Mon Sep 17 00:00:00 2001 From: M Kelly Date: Mon, 11 Nov 2024 08:43:24 -0500 Subject: [PATCH] HPCC-32964 Add a Roxie Background priority queue Signed-off-by: M Kelly --- roxie/ccd/ccd.hpp | 2 ++ roxie/ccd/ccdlistener.cpp | 12 ++++--- roxie/ccd/ccdquery.cpp | 13 +++++-- roxie/ccd/ccdqueue.cpp | 71 ++++++++++++++++++++++----------------- roxie/ccd/ccdserver.cpp | 3 +- roxie/ccd/ccdsnmp.cpp | 2 ++ roxie/ccd/ccdsnmp.hpp | 1 + 7 files changed, 67 insertions(+), 37 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..f84e493a2c0 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -78,6 +78,8 @@ void setMulticastEndpoints(unsigned numChannels); #define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue #define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue #define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default) +// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set +#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY) #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 4b19ab3afb2..57e5c1f2d0a 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker { void noteQuery(bool failed, unsigned elapsedTime, unsigned priority) { - switch(priority) + switch((int)priority) { case 0: loQueryStats.noteQuery(failed, elapsedTime); break; case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; + case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; } combinedQueryStats.noteQuery(failed, elapsedTime); } @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker isBlind = isBlind || blindLogging; logctx.setBlind(isBlind); priority = queryFactory->queryOptions().priority; - switch (priority) + switch ((int)priority) { case 0: loQueryStats.noteActive(); break; case 1: hiQueryStats.noteActive(); break; case 2: slaQueryStats.noteActive(); break; + case -1: bgQueryStats.noteActive(); break; } combinedQueryStats.noteActive(); Owned ctx = queryFactory->createContext(wu, logctx); @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte virtual void noteQueryActive() { unsigned priority = getQueryPriority(); - switch (priority) + switch ((int)priority) { case 0: loQueryStats.noteActive(); break; case 1: hiQueryStats.noteActive(); break; case 2: slaQueryStats.noteActive(); break; + case -1: bgQueryStats.noteActive(); break; } unknownQueryStats.noteComplete(); combinedQueryStats.noteActive(); @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte } else { - switch(getQueryPriority()) + switch((int)getQueryPriority()) { case 0: loQueryStats.noteQuery(failed, elapsedTime); break; case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; + case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats } combinedQueryStats.noteQuery(failed, elapsedTime); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index c983ebd0067..67631c107fa 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -400,8 +400,17 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat updateFromWorkUnit(priority, wu, "priority"); if (stateInfo) updateFromContext(priority, stateInfo, "@priority"); - timeLimit = defaultTimeLimit[priority]; - warnTimeLimit = defaultWarnTimeLimit[priority]; + if ((int)priority < 0) + { + // use LOW queue time limits ... + timeLimit = defaultTimeLimit[0]; + warnTimeLimit = defaultWarnTimeLimit[0]; + } + else + { + timeLimit = defaultTimeLimit[priority]; + warnTimeLimit = defaultWarnTimeLimit[priority]; + } updateFromWorkUnit(timeLimit, wu, "timeLimit"); updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit"); updateFromWorkUnitM(memoryLimit, wu, "memoryLimit"); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index a46fb407127..5d0d52c0d6c 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -137,7 +137,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const { const IpAddress serverIP = serverId.getIpAddress(); ret.append("activityId="); - switch(activityId & ~ROXIE_PRIORITY_MASK) + switch (activityId & ~ROXIE_PRIORITY_MASK) { case 0: ret.append("IBYTI"); break; case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break; @@ -157,11 +157,12 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const break; } ret.appendf(" uid=" RUIDF " pri=", uid); - switch(activityId & ROXIE_PRIORITY_MASK) + switch (activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break; case ROXIE_LOW_PRIORITY: ret.append("LOW"); break; + case ROXIE_BG_PRIORITY: ret.append("BG"); break; default: ret.append("???"); break; } ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence); @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory public: IMPLEMENT_IINTERFACE; - RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers) + RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr) { headRegionSize = _headRegionSize; numWorkers = _numWorkers; - workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers)); + StringBuffer tname("RoxieWorkers"); + if (qname && *qname) + tname.appendf(" (%s)", qname); + workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers)); started = 0; idle = 0; if (IBYTIbufferSize) @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface RoxieQueue slaQueue; RoxieQueue hiQueue; RoxieQueue loQueue; + RoxieQueue bgQueue; unsigned numWorkers; public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers) { } @@ -1923,6 +1928,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface slaQueue.setHeadRegionSize(newSize); hiQueue.setHeadRegionSize(newSize); loQueue.setHeadRegionSize(newSize); + bgQueue.setHeadRegionSize(newSize); } virtual void start() @@ -1930,6 +1936,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); + bgQueue.start(); // consider nice(+3) BG threads } virtual void stop() @@ -1937,6 +1944,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.stopAll(); hiQueue.stopAll(); slaQueue.stopAll(); + bgQueue.stopAll(); } virtual void join() @@ -1944,6 +1952,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.join(); hiQueue.join(); slaQueue.join(); + bgQueue.join(); } IArrayOf callbacks; @@ -2254,7 +2263,7 @@ class DelayedPacketQueue } // Move any that we are done waiting for our buddy onto the active queue - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { assert(GetCurrentThreadId()==roxiePacketReaderThread); DelayedPacketEntry *finger = head; @@ -2270,12 +2279,13 @@ class DelayedPacketQueue DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str()); } unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp(); - if (header.activityId & ROXIE_SLA_PRIORITY) - slaQueue.enqueue(packet, IBYTIdelay); - else if (header.activityId & ROXIE_HIGH_PRIORITY) - hiQueue.enqueue(packet, IBYTIdelay); - else - loQueue.enqueue(packet, IBYTIdelay); + switch (header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break; + case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break; + case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break; + default: bgQueue.enqueue(packet, IBYTIdelay); break; + } for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++) { if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull()) @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface } return min; } - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { for (unsigned queue = 0; queue <= maxSeen; queue++) { - queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue); + queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue); } } private: @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager } return ret; } - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { ForEachItemIn(idx, channels) { - channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue); + channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue); } } private: @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase StringBuffer s; DBGLOG("Read roxie packet: %s", header.toString(s).str()); } - if (header.activityId & ROXIE_SLA_PRIORITY) - processMessage(mb, header, slaQueue); - else if (header.activityId & ROXIE_HIGH_PRIORITY) - processMessage(mb, header, hiQueue); - else - processMessage(mb, header, loQueue); + switch (header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break; + case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break; + case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break; + default: processMessage(mb, header, bgQueue); break; + } } catch (IException *E) { @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } } #ifdef NEW_IBYTI - delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue); + delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue); #endif } return 0; @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase return; // No point sending the retry in localAgent mode } RoxieQueue *targetQueue; - if (header.activityId & ROXIE_SLA_PRIORITY) - targetQueue = &slaQueue; - else if (header.activityId & ROXIE_HIGH_PRIORITY) - targetQueue = &hiQueue; - else - targetQueue = &loQueue; - + switch (header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break; + case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break; + case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break; + default: targetQueue = &bgQueue; break; + } Owned serialized = packet->serialize(); if (header.channel) { diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index cb57bb3b26a..e99399e6f8d 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4557,8 +4557,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie // But this could still cause too many reply packets on the fastlane // (higher priority output Q), which may cause the activities on the // low priority output Q to not get service on time. + unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK; if ((colocalArg == 0) && // not a child query activity?? - (p->queryHeader().activityId & (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY)) && + ( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) && (p->queryHeader().overflowSequence == 0) && (p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0) p->queryHeader().retries |= ROXIE_FASTLANE; diff --git a/roxie/ccd/ccdsnmp.cpp b/roxie/ccd/ccdsnmp.cpp index 7d57f3781ce..9bdbd2d2ae8 100644 --- a/roxie/ccd/ccdsnmp.cpp +++ b/roxie/ccd/ccdsnmp.cpp @@ -33,6 +33,7 @@ RoxieQueryStats unknownQueryStats; RoxieQueryStats loQueryStats; RoxieQueryStats hiQueryStats; RoxieQueryStats slaQueryStats; +RoxieQueryStats bgQueryStats; RoxieQueryStats combinedQueryStats; #define addMetric(a) doAddMetric(a, #a) @@ -185,6 +186,7 @@ CRoxieMetricsManager::CRoxieMetricsManager() loQueryStats.addMetrics(this, "lo"); hiQueryStats.addMetrics(this, "hi"); slaQueryStats.addMetrics(this, "sla"); + bgQueryStats.addMetrics(this, "bg"); combinedQueryStats.addMetrics(this, "all"); addMetric(restarts); diff --git a/roxie/ccd/ccdsnmp.hpp b/roxie/ccd/ccdsnmp.hpp index 49a46db233f..ec9c4b8827f 100644 --- a/roxie/ccd/ccdsnmp.hpp +++ b/roxie/ccd/ccdsnmp.hpp @@ -82,6 +82,7 @@ extern RoxieQueryStats unknownQueryStats; extern RoxieQueryStats loQueryStats; extern RoxieQueryStats hiQueryStats; extern RoxieQueryStats slaQueryStats; +extern RoxieQueryStats bgQueryStats; extern RoxieQueryStats combinedQueryStats; interface IRoxieMetricsManager : extends IInterface