diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index b1d002d53..cb53897b1 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -7,7 +7,9 @@ Current package versions: | [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) | ## Unreleased -No pending unreleased changes. + +- Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822)) + ## 2.8.22 diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 207c03326..91b22aa46 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2649,6 +2649,21 @@ IEnumerable SortedSetScan( /// StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); + /// + /// View information about each pending message. + /// + /// The key of the stream. + /// The name of the consumer group. + /// The maximum number of pending messages to return. + /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. + /// The minimum ID from which to read the stream of pending messages. Pass null to read from the beginning of the stream. + /// The maximum ID to read to within the stream of pending messages. Pass null to read to the end of the stream. + /// The flags to use for this operation. + /// An instance of for each pending message. + /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// + StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags); + /// /// View information about each pending message. /// @@ -2658,11 +2673,12 @@ IEnumerable SortedSetScan( /// The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers. /// The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream. /// The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream. + /// The minimum idle time threshold for pending messages to be claimed. /// The flags to use for this operation. /// An instance of for each pending message. - /// Equivalent of calling XPENDING key group start-id end-id count consumer-name. + /// Equivalent of calling XPENDING key group IDLE min-idle-time start-id end-id count consumer-name. /// - StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); /// /// Read a stream using the given range of IDs. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 9852c131c..e6458d957 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -646,7 +646,10 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None); /// - Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None); + Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags); + + /// + Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None); /// Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index b97bba73b..012b70474 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -615,9 +615,12 @@ public Task StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN public Task StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPendingAsync(ToInner(key), groupName, flags); - public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) => Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); + public Task StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) => Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 75d93d0f9..7be727ef5 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -597,9 +597,12 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) => Inner.StreamPending(ToInner(key), groupName, flags); - public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) => Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags); + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags); + public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) => Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index a24333c8e..629376b85 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -734,7 +734,8 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo -StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]! +StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamPendingMessageInfo[]! +StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]! StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]! StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]! @@ -971,7 +972,8 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 7468bdb64..56a7d5e12 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2803,7 +2803,10 @@ public Task StreamPendingAsync(RedisKey key, RedisValue group return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo); } - public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + StreamPendingMessages(key, groupName, count, consumerName, minId, maxId, null, flags); + + public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage( key, @@ -2812,12 +2815,16 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue maxId, count, consumerName, + minIdleTimeInMs, flags); return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty()); } - public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) => + StreamPendingMessagesAsync(key, groupName, count, consumerName, minId, maxId, null, flags); + + public Task StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) { var msg = GetStreamPendingMessagesMessage( key, @@ -2826,6 +2833,7 @@ public Task StreamPendingMessagesAsync(RedisKey key, maxId, count, consumerName, + minIdleTimeInMs, flags); return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty()); @@ -4300,9 +4308,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro /// Gets a message for . /// /// - private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags) + private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags) { - // > XPENDING mystream mygroup - + 10 [consumer name] + // > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name] // 1) 1) 1526569498055 - 0 // 2) "Bob" // 3) (integer)74170458 @@ -4316,16 +4324,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); } - var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5]; + var valuesLength = 4; + if (consumerName != RedisValue.Null) + { + valuesLength++; + } - values[0] = groupName; - values[1] = minId ?? StreamConstants.ReadMinValue; - values[2] = maxId ?? StreamConstants.ReadMaxValue; - values[3] = count; + if (minIdleTimeInMs is not null) + { + valuesLength += 2; + } + var values = new RedisValue[valuesLength]; + + var offset = 0; + + values[offset++] = groupName; + if (minIdleTimeInMs is not null) + { + values[offset++] = "IDLE"; + values[offset++] = minIdleTimeInMs; + } + values[offset++] = minId ?? StreamConstants.ReadMinValue; + values[offset++] = maxId ?? StreamConstants.ReadMaxValue; + values[offset++] = count; if (consumerName != RedisValue.Null) { - values[4] = consumerName; + values[offset++] = consumerName; } return Message.Create( diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index f467aca24..bc18f71ac 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1154,8 +1154,8 @@ public void StreamPendingInfoGet() [Fact] public void StreamPendingMessageInfoGet() { - prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); - mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); + prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); + mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index aebc8fc6d..e3facde07 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -1070,8 +1070,8 @@ public async Task StreamPendingInfoGetAsync() [Fact] public async Task StreamPendingMessageInfoGetAsync() { - await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); - await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); + await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); + await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 0ea744848..628b8c5c2 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -1098,6 +1098,41 @@ public async Task StreamConsumerGroupViewPendingMessageInfo() Assert.Equal(id1, pendingMessageInfoList[0].MessageId); } + [Fact] + public async Task StreamConsumerGroupViewPendingMessageWithMinIdle() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + const string groupName = "test_group", + consumer1 = "test_consumer_1"; + const int minIdleTimeInMs = 100; + + var id1 = db.StreamAdd(key, "field1", "value1"); + + db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); + + // Read a single message into the first consumer. + db.StreamReadGroup(key, groupName, consumer1, count: 1); + + var preDelayPendingMessages = + db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); + + await Task.Delay(minIdleTimeInMs * 2).ForAwait(); + + var postDelayPendingMessages = + db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs); + + Assert.NotNull(preDelayPendingMessages); + Assert.Empty(preDelayPendingMessages); + Assert.NotNull(postDelayPendingMessages); + Assert.Single(postDelayPendingMessages); + Assert.Equal(1, postDelayPendingMessages[0].DeliveryCount); + Assert.True((int)postDelayPendingMessages[0].IdleTimeInMilliseconds > minIdleTimeInMs); + Assert.Equal(id1, postDelayPendingMessages[0].MessageId); + } + [Fact] public void StreamConsumerGroupViewPendingMessageInfoForConsumer() {