Skip to content

Commit

Permalink
HPCC-32964 Add a Roxie Background priority queue 3
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Dec 31, 2024
1 parent 51fe43c commit a61b416
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
1 change: 1 addition & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_HIGH_PRIORITY 0x80000000 // mask in activityId indicating it goes on the fast queue
#define ROXIE_LOW_PRIORITY 0x00000000 // mask in activityId indicating it goes on the slow queue (= default)
// background priority queue is when both ROXIE_SLA_PRIORITY and ROXIE_HIGH_PRIORITY are set
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities
Expand Down
12 changes: 6 additions & 6 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
{
const IpAddress serverIP = serverId.getIpAddress();
ret.append("activityId=");
switch(activityId & ~ROXIE_PRIORITY_MASK)
switch (activityId & ~ROXIE_PRIORITY_MASK)
{
case 0: ret.append("IBYTI"); break;
case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
Expand All @@ -157,12 +157,12 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
switch(activityId & ROXIE_PRIORITY_MASK)
switch (activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
case ROXIE_HIGH_PRIORITY: ret.append("HIGH"); break;
case ROXIE_LOW_PRIORITY: ret.append("LOW"); break;
case ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY: ret.append("BG"); break;
case ROXIE_BG_PRIORITY: ret.append("BG"); break;
default: ret.append("???"); break;
}
ret.appendf(" queryHash=%" I64F "x ch=%u seq=%d cont=%d server=", queryHash, channel, overflowSequence, continueSequence);
Expand Down Expand Up @@ -2279,7 +2279,7 @@ class DelayedPacketQueue
DBGLOG("No IBYTI received in time for delayed packet %s - enqueuing", header.toString(s).str());
}
unsigned __int64 IBYTIdelay = nsTick()-packet->queryEnqueuedTimeStamp();
switch(header.activityId & ROXIE_PRIORITY_MASK)
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: slaQueue.enqueue(packet, IBYTIdelay); break;
case ROXIE_HIGH_PRIORITY: hiQueue.enqueue(packet, IBYTIdelay); break;
Expand Down Expand Up @@ -2904,7 +2904,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
StringBuffer s;
DBGLOG("Read roxie packet: %s", header.toString(s).str());
}
switch(header.activityId & ROXIE_PRIORITY_MASK)
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: processMessage(mb, header, slaQueue); break;
case ROXIE_HIGH_PRIORITY: processMessage(mb, header, hiQueue); break;
Expand Down Expand Up @@ -3719,7 +3719,7 @@ class RoxieLocalQueueManager : public RoxieReceiverBase
return; // No point sending the retry in localAgent mode
}
RoxieQueue *targetQueue;
switch(header.activityId & ROXIE_PRIORITY_MASK)
switch (header.activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: targetQueue = &slaQueue; break;
case ROXIE_HIGH_PRIORITY: targetQueue = &hiQueue; break;
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4559,7 +4559,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
// low priority output Q to not get service on time.
unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK;
if ((colocalArg == 0) && // not a child query activity??
(pmask && (pmask != (ROXIE_SLA_PRIORITY + ROXIE_HIGH_PRIORITY))) &&
( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) &&
(p->queryHeader().overflowSequence == 0) &&
(p->queryHeader().continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)==0)
p->queryHeader().retries |= ROXIE_FASTLANE;
Expand Down

0 comments on commit a61b416

Please sign in to comment.