Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for XPENDING IDLE parameter #2822

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2658,11 +2658,12 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to add an overload here, i.e. a second method with different parameters; otherwise, this is a hard binary break - we try very hard not to do that. If the compiler complains about two methods with optional parameters, we can work around that

(yes: technically adding methods to the interface is also problematic, but: it is problematic in different ways, and in reality we don't expect custom implementations of the IDatabase etc APIs)


/// <summary>
/// Read a stream using the given range of IDs.
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,8 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
/// <inheritdoc cref="IDatabase.StreamPending(RedisKey, RedisValue, CommandFlags)"/>
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, long?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamRange(RedisKey, RedisValue?, RedisValue?, int?, Order, CommandFlags)"/>
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingAsync(ToInner(key), groupName, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPending(ToInner(key), groupName, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key,
StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo
StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
Expand Down Expand Up @@ -971,7 +971,7 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi
StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamInfo>!
StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingInfo>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
Expand Down
39 changes: 29 additions & 10 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2803,7 +2803,7 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
}

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2812,12 +2812,13 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
}

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2826,6 +2827,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
Expand Down Expand Up @@ -4300,9 +4302,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro
/// Gets a message for <see href="https://redis.io/commands/xpending/"/>.
/// </summary>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags)
{
// > XPENDING mystream mygroup - + 10 [consumer name]
// > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name]
// 1) 1) 1526569498055 - 0
// 2) "Bob"
// 3) (integer)74170458
Expand All @@ -4316,16 +4318,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}

var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5];
var valuesLength = 4;
if (consumerName != RedisValue.Null)
{
valuesLength++;
}

values[0] = groupName;
values[1] = minId ?? StreamConstants.ReadMinValue;
values[2] = maxId ?? StreamConstants.ReadMaxValue;
values[3] = count;
if (minIdleTimeInMs is not null)
{
valuesLength += 2;
}
var values = new RedisValue[valuesLength];

var offset = 0;

values[offset++] = groupName;
if (minIdleTimeInMs is not null)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

{
values[offset++] = "IDLE";
values[offset++] = minIdleTimeInMs;
}
values[offset++] = minId ?? StreamConstants.ReadMinValue;
values[offset++] = maxId ?? StreamConstants.ReadMaxValue;
values[offset++] = count;

if (consumerName != RedisValue.Null)
{
values[4] = consumerName;
values[offset++] = consumerName;
}

return Message.Create(
Expand Down
4 changes: 2 additions & 2 deletions tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,8 +1154,8 @@ public void StreamPendingInfoGet()
[Fact]
public void StreamPendingMessageInfoGet()
{
prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
Copy link
Collaborator

@mgravell mgravell Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this 1000 is the new arg, we probably need a version-check here, to not break test setups

Copy link
Author

@david-brink-talogy david-brink-talogy Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following the concern here and likely showing my lack of knowledge about your setup. 1000 is the new arg, but this seems to be mocked and I'm not following how that could break. What is the "test setup" you're referencing?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, if it is mocked: fine - ignore me

mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
}

[Fact]
Expand Down
4 changes: 2 additions & 2 deletions tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,8 @@ public async Task StreamPendingInfoGetAsync()
[Fact]
public async Task StreamPendingMessageInfoGetAsync()
{
await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
}

[Fact]
Expand Down
35 changes: 35 additions & 0 deletions tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,41 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
Assert.Equal(id1, pendingMessageInfoList[0].MessageId);
}

[Fact]
public async Task StreamConsumerGroupViewPendingMessageWithMinIdle()
{
await using var conn = Create(require: RedisFeatures.v6_2_0);

var db = conn.GetDatabase();
var key = Me();
const string groupName = "test_group",
consumer1 = "test_consumer_1";
const int minIdleTimeInMs = 100;

var id1 = db.StreamAdd(key, "field1", "value1");

db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);

// Read a single message into the first consumer.
db.StreamReadGroup(key, groupName, consumer1, count: 1);

var preDelayPendingMessages =
db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs);

await Task.Delay(minIdleTimeInMs * 2).ForAwait();

var postDelayPendingMessages =
db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs);

Assert.NotNull(preDelayPendingMessages);
Assert.Empty(preDelayPendingMessages);
Assert.NotNull(postDelayPendingMessages);
Assert.Single(postDelayPendingMessages);
Assert.Equal(1, postDelayPendingMessages[0].DeliveryCount);
Assert.True((int)postDelayPendingMessages[0].IdleTimeInMilliseconds > minIdleTimeInMs);
Assert.Equal(id1, postDelayPendingMessages[0].MessageId);
}

[Fact]
public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
{
Expand Down
Loading