From 7e608d5d6359a9e9b78c61f60384fce40c04ae13 Mon Sep 17 00:00:00 2001 From: M Kelly Date: Mon, 11 Nov 2024 08:43:24 -0500 Subject: [PATCH] HPCC-32964 Add a Roxie Background priority queue Signed-off-by: M Kelly --- roxie/ccd/ccd.hpp | 5 +-- roxie/ccd/ccdlistener.cpp | 12 ++++--- roxie/ccd/ccdmain.cpp | 6 ++-- roxie/ccd/ccdquery.cpp | 12 +++++-- roxie/ccd/ccdqueue.cpp | 67 +++++++++++++++++++++++---------------- roxie/ccd/ccdserver.cpp | 3 +- roxie/ccd/ccdsnmp.cpp | 2 ++ roxie/ccd/ccdsnmp.hpp | 1 + roxie/ccd/ccdstate.cpp | 10 ++++++ 9 files changed, 79 insertions(+), 39 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..b818edc53d9 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -78,6 +78,7 @@ void setMulticastEndpoints(unsigned numChannels); #define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue #define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue #define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default) +// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY) #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities @@ -332,8 +333,8 @@ extern unsigned memoryStatsInterval; extern unsigned pingInterval; extern unsigned socketCheckInterval; extern memsize_t defaultMemoryLimit; -extern unsigned defaultTimeLimit[3]; -extern unsigned defaultWarnTimeLimit[3]; +extern unsigned defaultTimeLimit[4]; +extern unsigned defaultWarnTimeLimit[4]; extern unsigned defaultThorConnectTimeout; extern bool pretendAllOpt; extern ClientCertificate clientCert; diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 19c4bc81b91..3e71c773bc7 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker { void noteQuery(bool failed, unsigned elapsedTime, unsigned priority) { - switch(priority) + switch((int)priority) { case 0: loQueryStats.noteQuery(failed, elapsedTime); break; case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; + case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; } combinedQueryStats.noteQuery(failed, elapsedTime); } @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker isBlind = isBlind || blindLogging; logctx.setBlind(isBlind); priority = queryFactory->queryOptions().priority; - switch (priority) + switch ((int)priority) { case 0: loQueryStats.noteActive(); break; case 1: hiQueryStats.noteActive(); break; case 2: slaQueryStats.noteActive(); break; + case -1: bgQueryStats.noteActive(); break; } combinedQueryStats.noteActive(); Owned ctx = queryFactory->createContext(wu, logctx); @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte virtual void noteQueryActive() { unsigned priority = getQueryPriority(); - switch (priority) + switch ((int)priority) { case 0: loQueryStats.noteActive(); break; case 1: hiQueryStats.noteActive(); break; case 2: slaQueryStats.noteActive(); break; + case -1: bgQueryStats.noteActive(); break; } unknownQueryStats.noteComplete(); combinedQueryStats.noteActive(); @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte } else { - switch(getQueryPriority()) + switch((int)getQueryPriority()) { case 0: loQueryStats.noteQuery(failed, elapsedTime); break; case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; + case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats } combinedQueryStats.noteQuery(failed, elapsedTime); diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 192e58a5670..3d4262b6922 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -164,8 +164,8 @@ int backgroundCopyPrio = 0; unsigned memoryStatsInterval = 0; memsize_t defaultMemoryLimit; -unsigned defaultTimeLimit[3] = {0, 0, 0}; -unsigned defaultWarnTimeLimit[3] = {0, 5000, 5000}; +unsigned defaultTimeLimit[4] = {0, 0, 0, 0}; +unsigned defaultWarnTimeLimit[4] = {0, 5000, 5000, 10000}; unsigned defaultThorConnectTimeout; unsigned defaultParallelJoinPreload = 0; @@ -1169,9 +1169,11 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) defaultTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeLimit", 0); defaultTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeLimit", 0); defaultTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeLimit", 0); + defaultTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeLimit", 0); defaultWarnTimeLimit[0] = (unsigned) topology->getPropInt64("@defaultLowPriorityTimeWarning", 0); defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0); defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0); + defaultWarnTimeLimit[3] = (unsigned) topology->getPropInt64("@defaultBGPriorityTimeWarning", 0); defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60); continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index c983ebd0067..2ec6239e245 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -400,8 +400,16 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat updateFromWorkUnit(priority, wu, "priority"); if (stateInfo) updateFromContext(priority, stateInfo, "@priority"); - timeLimit = defaultTimeLimit[priority]; - warnTimeLimit = defaultWarnTimeLimit[priority]; + if ((int)priority < 0) + { + timeLimit = defaultTimeLimit[3]; + warnTimeLimit = defaultWarnTimeLimit[3]; + } + else + { + timeLimit = defaultTimeLimit[priority]; + warnTimeLimit = defaultWarnTimeLimit[priority]; + } updateFromWorkUnit(timeLimit, wu, "timeLimit"); updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit"); updateFromWorkUnitM(memoryLimit, wu, "memoryLimit"); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index a46fb407127..a79264cf54d 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -162,6 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break; case ROXIE_LOW_PRIORITY: ret.append("LOW"); break; + case ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY: ret.append("BG"); break; default: ret.append("???"); break; } ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence); @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory public: IMPLEMENT_IINTERFACE; - RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers) + RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr) { headRegionSize = _headRegionSize; numWorkers = _numWorkers; - workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers)); + StringBuffer tname("RoxieWorkers"); + if (qname && *qname) + tname.appendf(" (%s)", qname); + workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers)); started = 0; idle = 0; if (IBYTIbufferSize) @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface RoxieQueue slaQueue; RoxieQueue hiQueue; RoxieQueue loQueue; + RoxieQueue bgQueue; unsigned numWorkers; public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers) { } @@ -1923,6 +1928,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface slaQueue.setHeadRegionSize(newSize); hiQueue.setHeadRegionSize(newSize); loQueue.setHeadRegionSize(newSize); + bgQueue.setHeadRegionSize(newSize); } virtual void start() @@ -1930,6 +1936,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); + bgQueue.start(); } virtual void stop() @@ -1937,6 +1944,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.stopAll(); hiQueue.stopAll(); slaQueue.stopAll(); + bgQueue.stopAll(); } virtual void join() @@ -1944,6 +1952,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.join(); hiQueue.join(); slaQueue.join(); + bgQueue.join(); } IArrayOf callbacks; @@ -2254,7 +2263,7 @@ class DelayedPacketQueue } // Move any that we are done waiting for our buddy onto the active queue - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { assert(GetCurrentThreadId()==roxiePacketReaderThread); DelayedPacketEntry *finger = head; @@ -2270,12 +2279,13 @@ class DelayedPacketQueue DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str()); } unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp(); - if (header.activityId & ROXIE_SLA_PRIORITY) - slaQueue.enqueue(packet, IBYTIdelay); - else if (header.activityId & ROXIE_HIGH_PRIORITY) - hiQueue.enqueue(packet, IBYTIdelay); - else - loQueue.enqueue(packet, IBYTIdelay); + switch(header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break; + case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break; + case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break; + default: bgQueue.enqueue(packet, IBYTIdelay); break; + } for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++) { if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull()) @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface } return min; } - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { for (unsigned queue = 0; queue <= maxSeen; queue++) { - queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue); + queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue); } } private: @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager } return ret; } - void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue) + void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue) { ForEachItemIn(idx, channels) { - channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue); + channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue); } } private: @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase StringBuffer s; DBGLOG("Read roxie packet: %s", header.toString(s).str()); } - if (header.activityId & ROXIE_SLA_PRIORITY) - processMessage(mb, header, slaQueue); - else if (header.activityId & ROXIE_HIGH_PRIORITY) - processMessage(mb, header, hiQueue); - else - processMessage(mb, header, loQueue); + switch(header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break; + case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break; + case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break; + default: processMessage(mb, header, bgQueue); break; + } } catch (IException *E) { @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } } #ifdef NEW_IBYTI - delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue); + delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue); #endif } return 0; @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase return; // No point sending the retry in localAgent mode } RoxieQueue *targetQueue; - if (header.activityId & ROXIE_SLA_PRIORITY) - targetQueue = &slaQueue; - else if (header.activityId & ROXIE_HIGH_PRIORITY) - targetQueue = &hiQueue; - else - targetQueue = &loQueue; - + switch(header.activityId & ROXIE_PRIORITY_MASK) + { + case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break; + case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break; + case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break; + default: targetQueue = &bgQueue; break; + } Owned serialized = packet->serialize(); if (header.channel) { diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index cb57bb3b26a..b4e3cd3f051 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4557,8 +4557,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie // But this could still cause too many reply packets on the fastlane // (higher priority output Q), which may cause the activities on the // low priority output Q to not get service on time. + unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK; if ((colocalArg == 0) && // not a child query activity?? - (p->queryHeader().activityId & (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY)) && + (pmask && (pmask != ROXIE_PRIORITY_MASK)) && (p->queryHeader().overflowSequence == 0) && (p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0) p->queryHeader().retries |= ROXIE_FASTLANE; diff --git a/roxie/ccd/ccdsnmp.cpp b/roxie/ccd/ccdsnmp.cpp index 7d57f3781ce..9bdbd2d2ae8 100644 --- a/roxie/ccd/ccdsnmp.cpp +++ b/roxie/ccd/ccdsnmp.cpp @@ -33,6 +33,7 @@ RoxieQueryStats unknownQueryStats; RoxieQueryStats loQueryStats; RoxieQueryStats hiQueryStats; RoxieQueryStats slaQueryStats; +RoxieQueryStats bgQueryStats; RoxieQueryStats combinedQueryStats; #define addMetric(a) doAddMetric(a, #a) @@ -185,6 +186,7 @@ CRoxieMetricsManager::CRoxieMetricsManager() loQueryStats.addMetrics(this, "lo"); hiQueryStats.addMetrics(this, "hi"); slaQueryStats.addMetrics(this, "sla"); + bgQueryStats.addMetrics(this, "bg"); combinedQueryStats.addMetrics(this, "all"); addMetric(restarts); diff --git a/roxie/ccd/ccdsnmp.hpp b/roxie/ccd/ccdsnmp.hpp index 49a46db233f..ec9c4b8827f 100644 --- a/roxie/ccd/ccdsnmp.hpp +++ b/roxie/ccd/ccdsnmp.hpp @@ -82,6 +82,7 @@ extern RoxieQueryStats unknownQueryStats; extern RoxieQueryStats loQueryStats; extern RoxieQueryStats hiQueryStats; extern RoxieQueryStats slaQueryStats; +extern RoxieQueryStats bgQueryStats; extern RoxieQueryStats combinedQueryStats; interface IRoxieMetricsManager : extends IInterface diff --git a/roxie/ccd/ccdstate.cpp b/roxie/ccd/ccdstate.cpp index 1b239971d9c..2108347a486 100644 --- a/roxie/ccd/ccdstate.cpp +++ b/roxie/ccd/ccdstate.cpp @@ -2439,6 +2439,16 @@ class CRoxiePackageSetManager : implements IRoxieQueryPackageManagerSet, impleme defaultWarnTimeLimit[2] = control->getPropInt("@limit", 0); topology->setPropInt("@defaultSLAPriorityTimeWarning", defaultWarnTimeLimit[2]); } + else if (stricmp(queryName, "control:defaultBGPriorityTimeLimit")==0) + { + defaultTimeLimit[3] = control->getPropInt("@limit", 0); + topology->setPropInt("@defaultBGPriorityTimeLimit", defaultTimeLimit[3]); + } + else if (stricmp(queryName, "control:defaultBGPriorityTimeWarning")==0) + { + defaultWarnTimeLimit[3] = control->getPropInt("@limit", 0); + topology->setPropInt("@defaultBGPriorityTimeWarning", defaultWarnTimeLimit[3]); + } else if (stricmp(queryName, "control:deleteUnneededPhysicalFiles")==0) { UNIMPLEMENTED;