From 97894e529c273ff62bb080752f437e85faebd944 Mon Sep 17 00:00:00 2001 From: david-brink-talogy Date: Thu, 5 Dec 2024 08:44:57 -0500 Subject: [PATCH 1/3] Adds support for XPENDING IDLE parameter fixes #2432 --- .../Interfaces/IDatabase.cs | 3 +- .../Interfaces/IDatabaseAsync.cs | 4 +- .../KeyspaceIsolation/KeyPrefixed.cs | 4 +- .../KeyspaceIsolation/KeyPrefixedDatabase.cs | 4 +- .../PublicAPI/PublicAPI.Shipped.txt | 4 +- src/StackExchange.Redis/RedisDatabase.cs | 39 ++++++++++++++----- .../KeyPrefixedDatabaseTests.cs | 4 +- .../KeyPrefixedTests.cs | 4 +- .../StackExchange.Redis.Tests/StreamTests.cs | 35 +++++++++++++++++ 9 files changed, 78 insertions(+), 23 deletions(-) diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 207c03326..beab61b69 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2658,11 +2658,12 @@ IEnumerable SortedSetScan( /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. /// The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream. /// The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream. + /// The minimum idle time threshold for pending messages to be claimed. /// The flags to use for this operation. /// An instance of for each pending message. /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. /// - StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); /// /// Read a stream using the given range of IDs. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 9852c131c..666be5c64 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -645,8 +645,8 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); - /// - Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + /// + Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); /// Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index b97bba73b..7fd309c13 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -615,8 +615,8 @@ public Task StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN public Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPendingAsync(ToInner(key), groupName, flags); - public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => - Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); public Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) => Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 75d93d0f9..2529043f9 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -597,8 +597,8 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPending(ToInner(key), groupName, flags); - public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => - Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) => Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index a24333c8e..eb9ae302b 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -734,7 +734,7 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo -StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]! +StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]! StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! @@ -971,7 +971,7 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 7468bdb64..daf1b14fe 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2803,7 +2803,7 @@ public Task StreamPendingAsync(RedisKey key, RedisValue group return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo); } - public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage( key, @@ -2812,12 +2812,13 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue maxId, count, consumerName, + minIdleTimeInMs, flags); return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty()); } - public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage( key, @@ -2826,6 +2827,7 @@ public Task StreamPendingMessagesAsync(RedisKey key, maxId, count, consumerName, + minIdleTimeInMs, flags); return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty()); @@ -4300,9 +4302,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro /// Gets a message for . /// /// - private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags) + private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags) { - // > XPENDING mystream mygroup - + 10 [consumer name] + // > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name] // 1) 1) 1526569498055 - 0 // 2) "Bob" // 3) (integer)74170458 @@ -4316,16 +4318,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); } - var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5]; + var valuesLength = 4; + if (consumerName != RedisValue.Null) + { + valuesLength++; + } - values[0] = groupName; - values[1] = minId ?? StreamConstants.ReadMinValue; - values[2] = maxId ?? StreamConstants.ReadMaxValue; - values[3] = count; + if (minIdleTimeInMs is not null) + { + valuesLength += 2; + } + var values = new RedisValue[valuesLength]; + + var offset = 0; + + values[offset++] = groupName; + if (minIdleTimeInMs is not null) + { + values[offset++] = "IDLE"; + values[offset++] = minIdleTimeInMs; + } + values[offset++] = minId ?? StreamConstants.ReadMinValue; + values[offset++] = maxId ?? StreamConstants.ReadMaxValue; + values[offset++] = count; if (consumerName != RedisValue.Null) { - values[4] = consumerName; + values[offset++] = consumerName; } return Message.Create( diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index f467aca24..bc18f71ac 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1154,8 +1154,8 @@ public void StreamPendingInfoGet() [Fact] public void StreamPendingMessageInfoGet() { - prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); - mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); + prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); + mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index aebc8fc6d..e3facde07 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -1070,8 +1070,8 @@ public async Task StreamPendingInfoGetAsync() [Fact] public async Task StreamPendingMessageInfoGetAsync() { - await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); - await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); + await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); + await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 0ea744848..26fe7bace 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -1098,6 +1098,41 @@ public async Task StreamConsumerGroupViewPendingMessageInfo() Assert.Equal(id1, pendingMessageInfoList[0].MessageId); } + [Fact] + public async Task StreamConsumerGroupViewPendingMessageWithMinIdle() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer1 = "test_consumer_1"; + const int minIdleTimeInMs = 100; + + var id1 = db.StreamAdd(key, "field1", "value1"); + + db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); + + // Read a single message into the first consumer. + db.StreamReadGroup(key, groupName, consumer1, count: 1); + + var preDelayPendingMessages = + db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); + + await Task.Delay(minIdleTimeInMs).ForAwait(); + + var postDelayPendingMessages = + db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); + + Assert.NotNull(preDelayPendingMessages); + Assert.Empty(preDelayPendingMessages); + Assert.NotNull(postDelayPendingMessages); + Assert.Single(postDelayPendingMessages); + Assert.Equal(1, postDelayPendingMessages[0].DeliveryCount); + Assert.True((int)postDelayPendingMessages[0].IdleTimeInMilliseconds > minIdleTimeInMs); + Assert.Equal(id1, postDelayPendingMessages[0].MessageId); + } + [Fact] public void StreamConsumerGroupViewPendingMessageInfoForConsumer() { From d16473e9759b62bc4c93f348983535ef5d1f3042 Mon Sep 17 00:00:00 2001 From: david-brink-talogy Date: Thu, 5 Dec 2024 09:05:16 -0500 Subject: [PATCH 2/3] Increase delay to ensure sufficient time for idle --- tests/StackExchange.Redis.Tests/StreamTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 26fe7bace..628b8c5c2 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -1119,7 +1119,7 @@ public async Task StreamConsumerGroupViewPendingMessageWithMinIdle() var preDelayPendingMessages = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); - await Task.Delay(minIdleTimeInMs).ForAwait(); + await Task.Delay(minIdleTimeInMs * 2).ForAwait(); var postDelayPendingMessages = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); From c905d7f21cae2f8261d40ed26d08aa67e111e70a Mon Sep 17 00:00:00 2001 From: david-brink-talogy Date: Thu, 5 Dec 2024 11:38:40 -0500 Subject: [PATCH 3/3] PR feedback - Add release notes - Add overload to StreamPendingMessages/StreamPendingMessagesAsync to preserve back compat --- docs/ReleaseNotes.md | 4 +++- src/StackExchange.Redis/Interfaces/IDatabase.cs | 17 ++++++++++++++++- .../Interfaces/IDatabaseAsync.cs | 3 +++ .../KeyspaceIsolation/KeyPrefixed.cs | 3 +++ .../KeyspaceIsolation/KeyPrefixedDatabase.cs | 3 +++ .../PublicAPI/PublicAPI.Shipped.txt | 2 ++ src/StackExchange.Redis/RedisDatabase.cs | 6 ++++++ 7 files changed, 36 insertions(+), 2 deletions(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index b1d002d53..cb53897b1 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -7,7 +7,9 @@ Current package versions: | [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) | ## Unreleased -No pending unreleased changes. + +- Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822)) + ## 2.8.22 diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index beab61b69..91b22aa46 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2649,6 +2649,21 @@ IEnumerable SortedSetScan( /// StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + /// + /// View information about each pending message. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The maximum number of pending messages to return. + /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. + /// The minimum ID from which to read the stream of pending messages. Pass null to read from the beginning of the stream. + /// The maximum ID to read to within the stream of pending messages. Pass null to read to the end of the stream. + /// The flags to use for this operation. + /// An instance of for each pending message. + /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// + StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags); + /// /// View information about each pending message. /// @@ -2661,7 +2676,7 @@ IEnumerable SortedSetScan( /// The minimum idle time threshold for pending messages to be claimed. /// The flags to use for this operation. /// An instance of for each pending message. - /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// Equivalent of calling XPENDING key group IDLE min-idle-time start-id end-id count consumer-name. /// StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 666be5c64..e6458d957 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -645,6 +645,9 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + /// + Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags); + /// Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index 7fd309c13..012b70474 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -615,6 +615,9 @@ public Task StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN public Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPendingAsync(ToInner(key), groupName, flags); + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) => + Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 2529043f9..7be727ef5 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -597,6 +597,9 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPending(ToInner(key), groupName, flags); + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) => + Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index eb9ae302b..629376b85 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -734,6 +734,7 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo +StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamPendingMessageInfo[]! StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]! StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! @@ -971,6 +972,7 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index daf1b14fe..56a7d5e12 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2803,6 +2803,9 @@ public Task StreamPendingAsync(RedisKey key, RedisValue group return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo); } + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + StreamPendingMessages(key, groupName, count, consumerName, minId, maxId, null, flags); + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage( @@ -2818,6 +2821,9 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty()); } + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + StreamPendingMessagesAsync(key, groupName, count, consumerName, minId, maxId, null, flags); + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage(