Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32964 Add a Roxie Background priority queue #19290

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_SLA_PRIORITY 0x40000000 // mask in activityId indicating it goes SLA priority queue
#define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
#define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
Expand Down
12 changes: 8 additions & 4 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,11 +1239,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
void noteQuery(bool failed, unsigned elapsedTime, unsigned priority)
{
switch(priority)
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1355,11 +1356,12 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
isBlind = isBlind || blindLogging;
logctx.setBlind(isBlind);
priority = queryFactory->queryOptions().priority;
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1524,11 +1526,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
virtual void noteQueryActive()
{
unsigned priority = getQueryPriority();
switch (priority)
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down Expand Up @@ -1677,11 +1680,12 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
}
else
{
switch(getQueryPriority())
switch((int)getQueryPriority())
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
default: unknownQueryStats.noteQuery(failed, elapsedTime); return; // Don't include unknown in the combined stats
}
combinedQueryStats.noteQuery(failed, elapsedTime);
Expand Down
13 changes: 11 additions & 2 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,17 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
if ((int)priority < 0)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
}
updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down
71 changes: 41 additions & 30 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
{
const IpAddress serverIP = serverId.getIpAddress();
ret.append("activityId=");
switch(activityId & ~ROXIE_PRIORITY_MASK)
switch (activityId & ~ROXIE_PRIORITY_MASK)
{
case 0: ret.append("IBYTI"); break;
case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
Expand All @@ -157,11 +157,12 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
switch(activityId & ROXIE_PRIORITY_MASK)
switch (activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
case ROXIE_BG_PRIORITY: ret.append("BG"); break;
default: ret.append("???"); break;
}
ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
Expand Down Expand Up @@ -1166,11 +1167,14 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers, const char *qname=nullptr)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
StringBuffer tname("RoxieWorkers");
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1904,12 +1908,13 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
RoxieQueue slaQueue;
RoxieQueue hiQueue;
RoxieQueue loQueue;
RoxieQueue bgQueue;
unsigned numWorkers;

public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers, "SLA"), hiQueue(headRegionSize, _numWorkers, "HIGH"), loQueue(headRegionSize, _numWorkers, "LOW"), bgQueue(headRegionSize, _numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1923,27 +1928,31 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
bgQueue.setHeadRegionSize(newSize);
}

virtual void start()
{
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
}

virtual void stop()
{
loQueue.stopAll();
hiQueue.stopAll();
slaQueue.stopAll();
bgQueue.stopAll();
}

virtual void join()
{
loQueue.join();
hiQueue.join();
slaQueue.join();
bgQueue.join();
}

IArrayOf<CallbackEntry> callbacks;
Expand Down Expand Up @@ -2254,7 +2263,7 @@ class DelayedPacketQueue
}

// Move any that we are done waiting for our buddy onto the active queue
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
assert(GetCurrentThreadId()==roxiePacketReaderThread);
DelayedPacketEntry *finger = head;
Expand All @@ -2270,12 +2279,13 @@ class DelayedPacketQueue
DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str());
}
unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp();
if (header.activityId & ROXIE_SLA_PRIORITY)
slaQueue.enqueue(packet, IBYTIdelay);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
hiQueue.enqueue(packet, IBYTIdelay);
else
loQueue.enqueue(packet, IBYTIdelay);
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_LOW_PRIORITY: loQueue.enqueue(packet, IBYTIdelay); break;
default: bgQueue.enqueue(packet, IBYTIdelay); break;
}
for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
{
if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
Expand Down Expand Up @@ -2354,11 +2364,11 @@ class DelayedPacketQueueChannel : public CInterface
}
return min;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
for (unsigned queue = 0; queue <= maxSeen; queue++)
{
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue);
queues[queue].checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2398,11 +2408,11 @@ class DelayedPacketQueueManager
}
return ret;
}
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue)
void checkExpired(unsigned now, RoxieQueue &slaQueue, RoxieQueue &hiQueue, RoxieQueue &loQueue, RoxieQueue &bgQueue)
{
ForEachItemIn(idx, channels)
{
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue);
channels.item(idx).checkExpired(now, slaQueue, hiQueue, loQueue, bgQueue);
}
}
private:
Expand Down Expand Up @@ -2894,12 +2904,13 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
StringBuffer s;
DBGLOG("Read roxie packet: %s", header.toString(s).str());
}
if (header.activityId & ROXIE_SLA_PRIORITY)
processMessage(mb, header, slaQueue);
else if (header.activityId & ROXIE_HIGH_PRIORITY)
processMessage(mb, header, hiQueue);
else
processMessage(mb, header, loQueue);
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break;
case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break;
case ROXIE_LOW_PRIORITY: processMessage(mb, header, loQueue); break;
default: processMessage(mb, header, bgQueue); break;
}
}
catch (IException *E)
{
Expand Down Expand Up @@ -2938,7 +2949,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
}
#ifdef NEW_IBYTI
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue, bgQueue);
#endif
}
return 0;
Expand Down Expand Up @@ -3708,13 +3719,13 @@ class RoxieLocalQueueManager : public RoxieReceiverBase
return; // No point sending the retry in localAgent mode
}
RoxieQueue *targetQueue;
if (header.activityId & ROXIE_SLA_PRIORITY)
targetQueue = &slaQueue;
else if (header.activityId & ROXIE_HIGH_PRIORITY)
targetQueue = &hiQueue;
else
targetQueue = &loQueue;

switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break;
case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break;
case ROXIE_LOW_PRIORITY: targetQueue = &loQueue; break;
default: targetQueue = &bgQueue; break;
}
Owned<ISerializedRoxieQueryPacket> serialized = packet->serialize();
if (header.channel)
{
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4557,8 +4557,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// But this could still cause too many reply packets on the fastlane
// (higher priority output Q), which may cause the activities on the
// low priority output Q to not get service on time.
unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
(p->queryHeader().activityId & (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY)) &&
( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only want to request fastlane for SLA/HIGH. This code might do that, but it took me several reads of it to be sure. Can it be coded more clearly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.
pmask != 0 => its SLA, HIGH or BG
pmask != ROXIE_PRIORITY_MASK => its not BG

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Compare explicitly against ROXIE_LOW_PRIORITY rather than implicitly, and use a new constant for background.

(p->queryHeader().overflowSequence == 0) &&
(p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0)
p->queryHeader().retries |= ROXIE_FASTLANE;
Expand Down
2 changes: 2 additions & 0 deletions roxie/ccd/ccdsnmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RoxieQueryStats unknownQueryStats;
RoxieQueryStats loQueryStats;
RoxieQueryStats hiQueryStats;
RoxieQueryStats slaQueryStats;
RoxieQueryStats bgQueryStats;
RoxieQueryStats combinedQueryStats;

#define addMetric(a) doAddMetric(a, #a)
Expand Down Expand Up @@ -185,6 +186,7 @@ CRoxieMetricsManager::CRoxieMetricsManager()
loQueryStats.addMetrics(this, "lo");
hiQueryStats.addMetrics(this, "hi");
slaQueryStats.addMetrics(this, "sla");
bgQueryStats.addMetrics(this, "bg");
combinedQueryStats.addMetrics(this, "all");
addMetric(restarts);

Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdsnmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ extern RoxieQueryStats unknownQueryStats;
extern RoxieQueryStats loQueryStats;
extern RoxieQueryStats hiQueryStats;
extern RoxieQueryStats slaQueryStats;
extern RoxieQueryStats bgQueryStats;
extern RoxieQueryStats combinedQueryStats;

interface IRoxieMetricsManager : extends IInterface
Expand Down
Loading