Skip to content

Commit

Permalink
Fixed Multi-thread issue in ConnectionMultiplexer.Subscription & Adde…
Browse files Browse the repository at this point in the history
…d unit test for it (#2763)
  • Loading branch information
Chuck-EP committed Aug 1, 2024
1 parent 1e40509 commit 173dfa7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/StackExchange.Redis/RedisSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ internal enum SubscriptionAction
internal sealed class Subscription
{
private Action<RedisChannel, RedisValue>? _handlers;
private readonly object _handlersLock = new object();
private ChannelMessageQueue? _queues;
private ServerEndPoint? CurrentServer;
public CommandFlags Flags { get; }
Expand Down Expand Up @@ -206,7 +207,10 @@ public void Add(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueue?
{
if (handler != null)
{
_handlers += handler;
lock(_handlersLock)
{
_handlers += handler;
}
}
if (queue != null)
{
Expand All @@ -218,7 +222,10 @@ public bool Remove(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueu
{
if (handler != null)
{
_handlers -= handler;
lock (_handlersLock)
{
_handlers -= handler;
}
}
if (queue != null)
{
Expand All @@ -236,7 +243,10 @@ public bool Remove(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueu

internal void MarkCompleted()
{
_handlers = null;
lock(_handlersLock)
{
_handlers = null;
}
ChannelMessageQueue.MarkAllCompleted(ref _queues);
}

Expand Down
46 changes: 46 additions & 0 deletions tests/StackExchange.Redis.Tests/Issues/Issue2763Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests.Issues
{
public class Issue2763Tests : TestBase
{
public Issue2763Tests(ITestOutputHelper output) : base(output) { }

[Fact]
public void Execute()
{
using var conn = Create();
var subscriber = conn.GetSubscriber();

void Handler(RedisChannel c, RedisValue v) { }

const int COUNT = 1000;
RedisChannel channel = RedisChannel.Literal("CHANNEL:TEST");

List<Action> subscribes = new List<Action>(COUNT);
for (int i = 0; i < COUNT; i++)
subscribes.Add(() => subscriber.Subscribe(channel, Handler));
Parallel.ForEach(subscribes, action => action());

Assert.Equal(COUNT, CountSubscriptionsForChannel(subscriber, channel));

List<Action> unsubscribes = new List<Action>(COUNT);
for (int i = 0; i < COUNT; i++)
unsubscribes.Add(() => subscriber.Unsubscribe(channel, Handler));
Parallel.ForEach(unsubscribes, action => action());

Assert.Equal(0, CountSubscriptionsForChannel(subscriber, channel));
}

private int CountSubscriptionsForChannel(ISubscriber subscriber, RedisChannel channel)
{
ConnectionMultiplexer connMultiplexer = (ConnectionMultiplexer)subscriber.Multiplexer;
connMultiplexer.GetSubscriberCounts(channel, out int handlers, out int queues);
return handlers;
}
}
}

0 comments on commit 173dfa7

Please sign in to comment.