Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed May 27, 2024
1 parent 48c0198 commit 6ebfe8f
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 16 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/node-js-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- 15672:15672
strategy:
matrix:
node-version: [14.x]
node-version: [14.x, 16.x, 18.x, 20.x]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand All @@ -30,7 +30,7 @@ jobs:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3-management-alpine
image: rabbitmq:3.13.2-management-alpine
ports:
- 5672:5672
- 15672:15672
Expand Down
3 changes: 2 additions & 1 deletion lib/amqp/SubscriberError.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const format = require('util').format;
const _ = require('lodash');
const async = require('async');
const setTimeoutUnref = require('../utils/setTimeoutUnref');
const { EMPTY_X_DEATH } = require('./XDeath');

module.exports = function SubscriptionRecovery(broker, vhost) {
this.handle = function (session, message, err, recoveryOptions, next) {
Expand Down Expand Up @@ -86,7 +87,7 @@ module.exports = function SubscriptionRecovery(broker, vhost) {

if (strategyConfig.immediateNack) {
const xDeathRecords = message.properties.headers['x-death'] || [];
const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue };
const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH;
_.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue], { immediateNack: true, xDeath });
}

Expand Down
21 changes: 14 additions & 7 deletions lib/amqp/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const SubscriberSession = require('./SubscriberSession');
const SubscriberError = require('./SubscriberError');
const backoff = require('../backoff');
const setTimeoutUnref = require('../utils/setTimeoutUnref');
const { EMPTY_X_DEATH } = require('./XDeath');

module.exports = {
create(broker, vhost, counter, config, next) {
Expand Down Expand Up @@ -214,16 +215,22 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
function immediateNack(message) {
const originalQueue = message.properties.headers.rascal.originalQueue;
const xDeathRecords = message.properties.headers['x-death'] || [];
const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue };
const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], { count: 0, queue: originalQueue });
const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH;
const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], EMPTY_X_DEATH);
const hasImmediateNackHeader = _.has(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
if (!hasImmediateNackHeader) return false;
debug('Message %s has been marked for immediate nack. Previous xDeath is %o. Current xDeath is %o.', message.properties.messageId, previousXDeath, currentXDeath);
if (currentXDeath.count === previousXDeath.count) return true;
debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']);
return false;
// See https://github.com/rabbitmq/rabbitmq-server/issues/11331
// RabbitMQ v3.13 stopped updating the xDeath record's count property.
// RabbitMQ v3.12 does not update the xDeath record's time property.
// Therefore having test them both
if (currentXDeath.count > previousXDeath.count || currentXDeath.time.value > previousXDeath.time.value) {
debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']);
_.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']);
return false;
}
return true;
}

function getAckOrNack(session, message) {
Expand Down
5 changes: 5 additions & 0 deletions lib/amqp/XDeath.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const EMPTY_X_DEATH = { count: 0, time: { value: 0 } };

module.exports = {
EMPTY_X_DEATH,
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"lint:fix": "eslint --fix .",
"lint-staged": "lint-staged",
"coverage": "nyc --report html --reporter lcov --reporter text-summary zUnit",
"docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management-alpine",
"docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12.9-management-alpine",
"prepare": "husky install"
},
"lint-staged": {
Expand Down
6 changes: 3 additions & 3 deletions test/subscriptions.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,7 @@ describe('Subscriptions', () => {
count++;
if (count === 1) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
ackOrNack(new Error(`Test Error ${count}`), {
strategy: 'republish',
immediateNack: true,
});
Expand Down Expand Up @@ -1739,7 +1739,7 @@ describe('Subscriptions', () => {
count++;
if (count <= 2) {
assert.ok(message);
ackOrNack(new Error(`immediate nack: ${count}`), {
ackOrNack(new Error(`Test Error ${count}`), {
strategy: 'republish',
immediateNack: true,
});
Expand All @@ -1762,7 +1762,7 @@ describe('Subscriptions', () => {
});
},
);
}, { exclusive: true, timeout: 10000 });
});

it('should forward messages to publication when requested', (test, done) => {
createBroker(
Expand Down
4 changes: 2 additions & 2 deletions test/subscriptionsAsPromised.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ describe('Subscriptions As Promised', () => {
subscription.on('message', (message, content, ackOrNack) => {
assert.strictEqual(++count, 1);
assert.ok(message);
ackOrNack(new Error('immediate nack'), { strategy: 'republish', immediateNack: true });
ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true });
});
});

Expand Down Expand Up @@ -1167,7 +1167,7 @@ describe('Subscriptions As Promised', () => {
count++;
if (count === 1) {
assert.ok(message);
ackOrNack(new Error('immediate nack'), {
ackOrNack(new Error(`Test Error ${count}`), {
strategy: 'republish',
immediateNack: true,
});
Expand Down

0 comments on commit 6ebfe8f

Please sign in to comment.