Skip to content

Commit

Permalink
fix(sqs): Support both standard and FIFO SQS queues (#490)
Browse files Browse the repository at this point in the history
Currently, sending an SQS message to a standard (non-FIFO) queue will
fail if the `messageGroupId` param is provided to the `sendMessage`
function. The function should only include this param when it is sending
a message to a FIFO queue.

## Related Issues

- #489
  • Loading branch information
blefebvre authored Dec 13, 2024
1 parent 977ea90 commit 4a975b0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions packages/spacecat-shared-utils/src/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ class SQS {
this.log = log;
}

/**
* Check if the queue is a FIFO queue by examining its URL.
* @param {string} queueUrl - the URL of the SQS queue
* @returns {boolean} true if the queue is a FIFO queue, false otherwise
*/
static #isFifoQueue(queueUrl) {
return hasText(queueUrl) && queueUrl.toLowerCase().endsWith('.fifo');
}

/**
* Send a message to an SQS queue. For FIFO queues, messageGroupId is required.
* @param {string} queueUrl - The URL of the SQS queue.
Expand All @@ -43,8 +52,8 @@ class SQS {
QueueUrl: queueUrl,
};

if (hasText(messageGroupId)) {
// MessageGroupId is required for FIFO queues
// Only include MessageGroupId if the queue is a FIFO queue
if (SQS.#isFifoQueue(queueUrl) && hasText(messageGroupId)) {
params.MessageGroupId = messageGroupId;
}

Expand Down
36 changes: 34 additions & 2 deletions packages/spacecat-shared-utils/test/sqs.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ describe('SQS', () => {
]);
});

it('should include a MessageGroupId when provided', async () => {
it('should include a MessageGroupId when the queue is a FIFO queue', async () => {
const action = wrap(async (req, ctx) => {
await ctx.sqs.sendMessage('queue-url', { key: 'value' }, 'job-id');
await ctx.sqs.sendMessage('https://sqs.us-east-1.amazonaws.com/123456789012/fifo-queue.fifo', { key: 'value' }, 'job-id');
}).with(sqsWrapper);

await action({}, context);
Expand All @@ -204,5 +204,37 @@ describe('SQS', () => {
]);
expect(firstSendArg.input.MessageGroupId).to.equal('job-id');
});

it('should not include a MessageGroupId when the queue is standard queue', async () => {
const action = wrap(async (req, ctx) => {
// Note: no .fifo suffix
await ctx.sqs.sendMessage('https://sqs.us-east-1.amazonaws.com/123456789012/standard-queue', { key: 'value' }, 'job-id');
}).with(sqsWrapper);

await action({}, context);

const firstSendArg = sendStub.getCall(0).args[0];
expect(Object.keys(firstSendArg.input)).to.deep.equal([
'MessageBody',
'QueueUrl',
]);
expect(firstSendArg.input.MessageGroupId).to.be.undefined;
});

it('should not include a MessageGroupId when the queue URL is undefined', async () => {
const action = wrap(async (req, ctx) => {
// Edge case: no queue URL
await ctx.sqs.sendMessage(undefined, { key: 'value' }, 'job-id');
}).with(sqsWrapper);

await action({}, context);

const firstSendArg = sendStub.getCall(0).args[0];
expect(Object.keys(firstSendArg.input)).to.deep.equal([
'MessageBody',
'QueueUrl',
]);
expect(firstSendArg.input.MessageGroupId).to.be.undefined;
});
});
});

0 comments on commit 4a975b0

Please sign in to comment.