From 1de8dac8ee593108ea5562683cc797e3ca0b2973 Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Mon, 24 Jun 2024 16:17:38 +0100 Subject: [PATCH] Implement high integrity mode for commands (#2741) * initial implementation of #2706 * build: net8 compat * Docs: add HighIntegrity initial docs * PR fixups * Options fixes * Update tests/StackExchange.Redis.Tests/TestBase.cs * add tests that result boxes / continuations work correctly for high-integrity-mode * - switch to counter rather than entropy - add transaction work to basic tests, to ensure handled * naming is hard * - add explicit connection failure type - burn the connection on failure - add initial metrics * benchmark impact of high performance mode * be more flexible in HeartbeatConsistencyCheckPingsAsync * add config tests --------- Co-authored-by: Nick Craver Co-authored-by: Nick Craver --- docs/Configuration.md | 5 + .../Configuration/DefaultOptionsProvider.cs | 11 ++ .../ConfigurationOptions.cs | 28 +++- .../Enums/ConnectionFailureType.cs | 6 +- src/StackExchange.Redis/Message.cs | 32 +++++ src/StackExchange.Redis/PhysicalBridge.cs | 42 ++++++ src/StackExchange.Redis/PhysicalConnection.cs | 122 +++++++++++++++--- .../PublicAPI/PublicAPI.Shipped.txt | 4 + .../PublicAPI/PublicAPI.Unshipped.txt | 2 +- tests/BasicTest/BasicTest.csproj | 2 +- .../BasicTestBaseline.csproj | 2 +- tests/ConsoleTest/Program.cs | 89 +++++++++++-- .../ConsoleTestBaseline.csproj | 1 + .../StackExchange.Redis.Tests/BasicOpTests.cs | 48 ++++++- .../StackExchange.Redis.Tests/ConfigTests.cs | 23 +++- .../ConnectCustomConfigTests.cs | 2 +- .../ResultBoxTests.cs | 95 ++++++++++++++ tests/StackExchange.Redis.Tests/TestBase.cs | 17 ++- 18 files changed, 485 insertions(+), 46 deletions(-) create mode 100644 tests/StackExchange.Redis.Tests/ResultBoxTests.cs diff --git a/docs/Configuration.md b/docs/Configuration.md index 323d89984..96e4b5bae 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -99,6 +99,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a | tunnel={string} | `Tunnel` | `null` | Tunnel for connections (use `http:{proxy url}` for "connect"-based proxy server) | | setlib={bool} | `SetClientLibrary` | `true` | Whether to attempt to use `CLIENT SETINFO` to set the library name/version on the connection | | protocol={string} | `Protocol` | `null` | Redis protocol to use; see section below | +| highIntegrity={bool} | `HighIntegrity` | `false` | High integrity (incurs overhead) sequence checking on every command; see section below | Additional code-only options: - LoggerFactory (`ILoggerFactory`) - Default: `null` @@ -115,6 +116,10 @@ Additional code-only options: - The thread pool to use for scheduling work to and from the socket connected to Redis, one of... - `SocketManager.Shared`: Use a shared dedicated thread pool for _all_ multiplexers (defaults to 10 threads) - best balance for most scenarios. - `SocketManager.ThreadPool`: Use the build-in .NET thread pool for scheduling. This can perform better for very small numbers of cores or with large apps on large machines that need to use more than 10 threads (total, across all multiplexers) under load. **Important**: this option isn't the default because it's subject to thread pool growth/starvation and if for example synchronous calls are waiting on a redis command to come back to unblock other threads, stalls/hangs can result. Use with caution, especially if you have sync-over-async work in play. +- HighIntegrity - Default: `false` + - This enables sending a sequence check command after _every single command_ sent to Redis. This is an opt-in option that incurs overhead to add this integrity check which isn't in the Redis protocol (RESP2/3) itself. The impact on this for a given workload depends on the number of commands, size of payloads, etc. as to how proportionately impactful it will be - you should test with your workloads to assess this. + - This is especially relevant if your primary use case is all strings (e.g. key/value caching) where the protocol would otherwise not error. + - Intended for cases where network drops (e.g. bytes from the Redis stream, not packet loss) are suspected and integrity of responses is critical. - HeartbeatConsistencyChecks - Default: `false` - Allows _always_ sending keepalive checks even if a connection isn't idle. This trades extra commands (per `HeartbeatInterval` - default 1 second) to check the network stream for consistency. If any data was lost, the result won't be as expected and the connection will be terminated ASAP. This is a check to react to any data loss at the network layer as soon as possible. - HeartbeatInterval - Default: `1000ms` diff --git a/src/StackExchange.Redis/Configuration/DefaultOptionsProvider.cs b/src/StackExchange.Redis/Configuration/DefaultOptionsProvider.cs index 67bbc72ac..359b5f5f6 100644 --- a/src/StackExchange.Redis/Configuration/DefaultOptionsProvider.cs +++ b/src/StackExchange.Redis/Configuration/DefaultOptionsProvider.cs @@ -104,6 +104,17 @@ public static DefaultOptionsProvider GetProvider(EndPoint endpoint) /// public virtual bool CheckCertificateRevocation => true; + /// + /// A Boolean value that specifies whether to use per-command validation of strict protocol validity. + /// This sends an additional command after EVERY command which incurs measurable overhead. + /// + /// + /// The regular RESP protocol does not include correlation identifiers between requests and responses; in exceptional + /// scenarios, protocol desynchronization can occur, which may not be noticed immediately; this option adds additional data + /// to ensure that this cannot occur, at the cost of some (small) additional bandwidth usage. + /// + public virtual bool HighIntegrity => false; + /// /// The number of times to repeat the initial connect cycle if no servers respond promptly. /// diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index 3d0a3aae8..4f3ff1287 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -110,7 +110,8 @@ internal const string CheckCertificateRevocation = "checkCertificateRevocation", Tunnel = "tunnel", SetClientLibrary = "setlib", - Protocol = "protocol"; + Protocol = "protocol", + HighIntegrity = "highIntegrity"; private static readonly Dictionary normalizedOptions = new[] { @@ -141,6 +142,7 @@ internal const string WriteBuffer, CheckCertificateRevocation, Protocol, + HighIntegrity, }.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase); public static string TryNormalize(string value) @@ -156,7 +158,7 @@ public static string TryNormalize(string value) private DefaultOptionsProvider? defaultOptions; private bool? allowAdmin, abortOnConnectFail, resolveDns, ssl, checkCertificateRevocation, heartbeatConsistencyChecks, - includeDetailInExceptions, includePerformanceCountersInExceptions, setClientLibrary; + includeDetailInExceptions, includePerformanceCountersInExceptions, setClientLibrary, highIntegrity; private string? tieBreaker, sslHost, configChannel, user, password; @@ -279,6 +281,21 @@ public bool CheckCertificateRevocation set => checkCertificateRevocation = value; } + /// + /// A Boolean value that specifies whether to use per-command validation of strict protocol validity. + /// This sends an additional command after EVERY command which incurs measurable overhead. + /// + /// + /// The regular RESP protocol does not include correlation identifiers between requests and responses; in exceptional + /// scenarios, protocol desynchronization can occur, which may not be noticed immediately; this option adds additional data + /// to ensure that this cannot occur, at the cost of some (small) additional bandwidth usage. + /// + public bool HighIntegrity + { + get => highIntegrity ?? Defaults.HighIntegrity; + set => highIntegrity = value; + } + /// /// Create a certificate validation check that checks against the supplied issuer even when not known by the machine. /// @@ -769,6 +786,7 @@ public static ConfigurationOptions Parse(string configuration, bool ignoreUnknow Protocol = Protocol, heartbeatInterval = heartbeatInterval, heartbeatConsistencyChecks = heartbeatConsistencyChecks, + highIntegrity = highIntegrity, }; /// @@ -849,6 +867,7 @@ public string ToString(bool includePassword) Append(sb, OptionKeys.ResponseTimeout, responseTimeout); Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase); Append(sb, OptionKeys.SetClientLibrary, setClientLibrary); + Append(sb, OptionKeys.HighIntegrity, highIntegrity); Append(sb, OptionKeys.Protocol, FormatProtocol(Protocol)); if (Tunnel is { IsInbuilt: true } tunnel) { @@ -894,7 +913,7 @@ private void Clear() { ClientName = ServiceName = user = password = tieBreaker = sslHost = configChannel = null; keepAlive = syncTimeout = asyncTimeout = connectTimeout = connectRetry = configCheckSeconds = DefaultDatabase = null; - allowAdmin = abortOnConnectFail = resolveDns = ssl = setClientLibrary = null; + allowAdmin = abortOnConnectFail = resolveDns = ssl = setClientLibrary = highIntegrity = null; SslProtocols = null; defaultVersion = null; EndPoints.Clear(); @@ -1013,6 +1032,9 @@ private ConfigurationOptions DoParse(string configuration, bool ignoreUnknown) case OptionKeys.SetClientLibrary: SetClientLibrary = OptionKeys.ParseBoolean(key, value); break; + case OptionKeys.HighIntegrity: + HighIntegrity = OptionKeys.ParseBoolean(key, value); + break; case OptionKeys.Tunnel: if (value.IsNullOrWhiteSpace()) { diff --git a/src/StackExchange.Redis/Enums/ConnectionFailureType.cs b/src/StackExchange.Redis/Enums/ConnectionFailureType.cs index 57958ecbc..d9407b69e 100644 --- a/src/StackExchange.Redis/Enums/ConnectionFailureType.cs +++ b/src/StackExchange.Redis/Enums/ConnectionFailureType.cs @@ -44,6 +44,10 @@ public enum ConnectionFailureType /// /// It has not been possible to create an initial connection to the redis server(s). /// - UnableToConnect + UnableToConnect, + /// + /// High-integrity mode was enabled, and a failure was detected + /// + ResponseIntegrityFailure, } } diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 9acf41fb1..3cdb4c997 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using StackExchange.Redis.Profiling; using System; +using System.Buffers.Binary; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -52,6 +53,8 @@ internal abstract class Message : ICompletable { public readonly int Db; + private uint _highIntegrityToken; + internal const CommandFlags InternalCallFlag = (CommandFlags)128; protected RedisCommand command; @@ -198,6 +201,13 @@ public bool IsAdmin public bool IsAsking => (Flags & AskingFlag) != 0; + public bool IsHighIntegrity => _highIntegrityToken != 0; + + public uint HighIntegrityToken => _highIntegrityToken; + + internal void WithHighIntegrity(uint value) + => _highIntegrityToken = value; + internal bool IsScriptUnavailable => (Flags & ScriptUnavailableFlag) != 0; internal void SetScriptUnavailable() => Flags |= ScriptUnavailableFlag; @@ -710,6 +720,28 @@ internal void WriteTo(PhysicalConnection physical) } } + private static ReadOnlySpan ChecksumTemplate => "$4\r\nXXXX\r\n"u8; + + internal void WriteHighIntegrityChecksumRequest(PhysicalConnection physical) + { + Debug.Assert(IsHighIntegrity, "should only be used for high-integrity"); + try + { + physical.WriteHeader(RedisCommand.ECHO, 1); // use WriteHeader to allow command-rewrite + + Span chk = stackalloc byte[10]; + Debug.Assert(ChecksumTemplate.Length == chk.Length, "checksum template length error"); + ChecksumTemplate.CopyTo(chk); + BinaryPrimitives.WriteUInt32LittleEndian(chk.Slice(4, 4), _highIntegrityToken); + physical.WriteRaw(chk); + } + catch (Exception ex) + { + physical?.OnInternalError(ex); + Fail(ConnectionFailureType.InternalFailure, ex, null, physical?.BridgeCouldBeNull?.Multiplexer); + } + } + internal static Message CreateHello(int protocolVersion, string? username, string? password, string? clientName, CommandFlags flags) => new HelloMessage(protocolVersion, username, password, clientName, flags); diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 522e2fcc9..7f9ffa32e 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -70,6 +70,8 @@ internal sealed class PhysicalBridge : IDisposable internal string? PhysicalName => physical?.ToString(); + private uint _nextHighIntegrityToken; // zero means not enabled + public DateTime? ConnectedAt { get; private set; } public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds) @@ -82,6 +84,11 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti #if !NETCOREAPP _singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds); #endif + if (type == ConnectionType.Interactive && Multiplexer.RawConfig.HighIntegrity) + { + // we just need this to be non-zero to enable tracking + _nextHighIntegrityToken = 1; + } } private readonly int TimeoutMilliseconds; @@ -1546,10 +1553,30 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne break; } + if (_nextHighIntegrityToken is not 0 + && !connection.TransactionActive // validated in the UNWATCH/EXEC/DISCARD + && message.Command is not RedisCommand.AUTH or RedisCommand.HELLO // if auth fails, ECHO may also fail; avoid confusion + ) + { + // make sure this value exists early to avoid a race condition + // if the response comes back super quickly + message.WithHighIntegrity(NextHighIntegrityTokenInsideLock()); + Debug.Assert(message.IsHighIntegrity, "message should be high integrity"); + } + else + { + Debug.Assert(!message.IsHighIntegrity, "prior high integrity message found during transaction?"); + } connection.EnqueueInsideWriteLock(message); isQueued = true; message.WriteTo(connection); + if (message.IsHighIntegrity) + { + message.WriteHighIntegrityChecksumRequest(connection); + IncrementOpCount(); + } + message.SetRequestSent(); IncrementOpCount(); @@ -1602,6 +1629,21 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne } } + private uint NextHighIntegrityTokenInsideLock() + { + // inside lock: no concurrency concerns here + switch (_nextHighIntegrityToken) + { + case 0: return 0; // disabled + case uint.MaxValue: + // avoid leaving the value at zero due to wrap-around + _nextHighIntegrityToken = 1; + return ushort.MaxValue; + default: + return _nextHighIntegrityToken++; + } + } + /// /// For testing only /// diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index d13b6af5b..c282121a5 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -19,6 +19,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging; using static StackExchange.Redis.Message; +using System.Buffers.Binary; namespace StackExchange.Redis { @@ -44,6 +45,8 @@ private static readonly Message // things sent to this physical, but not yet received private readonly Queue _writtenAwaitingResponse = new Queue(); + private Message? _awaitingToken; + private readonly string _physicalName; private volatile int currentDatabase = 0; @@ -388,6 +391,8 @@ public void RecordConnectionFailed( Exception? outerException = innerException; IdentifyFailureType(innerException, ref failureType); var bridge = BridgeCouldBeNull; + Message? nextMessage; + if (_ioPipe != null || isInitialConnect) // if *we* didn't burn the pipe: flag it { if (failureType == ConnectionFailureType.InternalFailure && innerException is not null) @@ -419,9 +424,9 @@ public void RecordConnectionFailed( lock (_writtenAwaitingResponse) { // find oldest message awaiting a response - if (_writtenAwaitingResponse.TryPeek(out var next)) + if (_writtenAwaitingResponse.TryPeek(out nextMessage)) { - unansweredWriteTime = next.GetWriteTime(); + unansweredWriteTime = nextMessage.GetWriteTime(); } } @@ -510,23 +515,17 @@ void add(string lk, string sk, string? v) bridge?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count); } - while (TryDequeueLocked(_writtenAwaitingResponse, out var next)) + var ex = innerException is RedisException ? innerException : outerException; + + nextMessage = Interlocked.Exchange(ref _awaitingToken, null); + if (nextMessage is not null) { - if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) - { - // fine, death of a socket is close enough - next.Complete(); - } - else - { - var ex = innerException is RedisException ? innerException : outerException; - if (bridge != null) - { - bridge.Trace("Failing: " + next); - bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); - } - next.SetExceptionAndComplete(ex!, bridge); - } + RecordMessageFailed(nextMessage, ex, origin, bridge); + } + + while (TryDequeueLocked(_writtenAwaitingResponse, out nextMessage)) + { + RecordMessageFailed(nextMessage, ex, origin, bridge); } // burn the socket @@ -541,6 +540,24 @@ static bool TryDequeueLocked(Queue queue, [NotNullWhen(true)] out Messa } } + private void RecordMessageFailed(Message next, Exception? ex, string? origin, PhysicalBridge? bridge) + { + if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) + { + // fine, death of a socket is close enough + next.Complete(); + } + else + { + if (bridge != null) + { + bridge.Trace("Failing: " + next); + bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); + } + next.SetExceptionAndComplete(ex!, bridge); + } + } + internal bool IsIdle() => _writeStatus == WriteStatus.Idle; internal void SetIdle() => _writeStatus = WriteStatus.Idle; internal void SetWriting() => _writeStatus = WriteStatus.Writing; @@ -880,6 +897,8 @@ internal void WriteHeader(RedisCommand command, int arguments, CommandBytes comm writer.Advance(offset); } + internal void WriteRaw(ReadOnlySpan bytes) => _ioPipe?.Output?.Write(bytes); + internal void RecordQuit() // don't blame redis if we fired the first shot => (_ioPipe as SocketConnection)?.TrySetProtocolShutdown(PipeShutdownKind.ProtocolExitClient); @@ -1680,10 +1699,27 @@ private void MatchResult(in RawResult result) // if it didn't look like "[p]message", then we still need to process the pending queue } Trace("Matching result..."); - Message? msg; + + Message? msg = null; + // check whether we're waiting for a high-integrity mode post-response checksum (using cheap null-check first) + if (_awaitingToken is not null && (msg = Interlocked.Exchange(ref _awaitingToken, null)) is not null) + { + _readStatus = ReadStatus.ResponseSequenceCheck; + if (!ProcessHighIntegrityResponseToken(msg, in result, BridgeCouldBeNull)) + { + RecordConnectionFailed(ConnectionFailureType.ResponseIntegrityFailure, origin: nameof(ReadStatus.ResponseSequenceCheck)); + } + return; + } + _readStatus = ReadStatus.DequeueResult; lock (_writtenAwaitingResponse) { + if (msg is not null) + { + _awaitingToken = null; + } + if (!_writtenAwaitingResponse.TryDequeue(out msg)) { throw new InvalidOperationException("Received response with no message waiting: " + result.ToString()); @@ -1696,11 +1732,56 @@ private void MatchResult(in RawResult result) if (msg.ComputeResult(this, result)) { _readStatus = msg.ResultBoxIsAsync ? ReadStatus.CompletePendingMessageAsync : ReadStatus.CompletePendingMessageSync; - msg.Complete(); + if (!msg.IsHighIntegrity) + { + // can't complete yet if needs checksum + msg.Complete(); + } } + if (msg.IsHighIntegrity) + { + // stash this for the next non-OOB response + Volatile.Write(ref _awaitingToken, msg); + } + + _readStatus = ReadStatus.MatchResultComplete; _activeMessage = null; + static bool ProcessHighIntegrityResponseToken(Message message, in RawResult result, PhysicalBridge? bridge) + { + bool isValid = false; + if (result.Resp2TypeBulkString == ResultType.BulkString) + { + var payload = result.Payload; + if (payload.Length == 4) + { + uint interpreted; + if (payload.IsSingleSegment) + { + interpreted = BinaryPrimitives.ReadUInt32LittleEndian(payload.First.Span); + } + else + { + Span span = stackalloc byte[4]; + payload.CopyTo(span); + interpreted = BinaryPrimitives.ReadUInt32LittleEndian(span); + } + isValid = interpreted == message.HighIntegrityToken; + } + } + if (isValid) + { + message.Complete(); + return true; + } + else + { + message.SetExceptionAndComplete(new InvalidOperationException("High-integrity mode detected possible protocol de-sync"), bridge); + return false; + } + } + static bool TryGetPubSubPayload(in RawResult value, out RedisValue parsed, bool allowArraySingleton = true) { if (value.IsNull) @@ -2032,6 +2113,7 @@ internal enum ReadStatus PubSubPMessage, Reconfigure, InvokePubSub, + ResponseSequenceCheck, // high-integrity mode only DequeueResult, ComputeResult, CompletePendingMessageSync, diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 1bcc6c66d..4504423fe 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -236,6 +236,8 @@ StackExchange.Redis.ConfigurationOptions.HeartbeatConsistencyChecks.get -> bool StackExchange.Redis.ConfigurationOptions.HeartbeatConsistencyChecks.set -> void StackExchange.Redis.ConfigurationOptions.HeartbeatInterval.get -> System.TimeSpan StackExchange.Redis.ConfigurationOptions.HeartbeatInterval.set -> void +StackExchange.Redis.ConfigurationOptions.HighIntegrity.get -> bool +StackExchange.Redis.ConfigurationOptions.HighIntegrity.set -> void StackExchange.Redis.ConfigurationOptions.HighPrioritySocketThreads.get -> bool StackExchange.Redis.ConfigurationOptions.HighPrioritySocketThreads.set -> void StackExchange.Redis.ConfigurationOptions.IncludeDetailInExceptions.get -> bool @@ -319,6 +321,7 @@ StackExchange.Redis.ConnectionFailureType.ProtocolFailure = 4 -> StackExchange.R StackExchange.Redis.ConnectionFailureType.SocketClosed = 6 -> StackExchange.Redis.ConnectionFailureType StackExchange.Redis.ConnectionFailureType.SocketFailure = 2 -> StackExchange.Redis.ConnectionFailureType StackExchange.Redis.ConnectionFailureType.UnableToConnect = 9 -> StackExchange.Redis.ConnectionFailureType +StackExchange.Redis.ConnectionFailureType.ResponseIntegrityFailure = 10 -> StackExchange.Redis.ConnectionFailureType StackExchange.Redis.ConnectionFailureType.UnableToResolvePhysicalConnection = 1 -> StackExchange.Redis.ConnectionFailureType StackExchange.Redis.ConnectionMultiplexer StackExchange.Redis.ConnectionMultiplexer.ClientName.get -> string! @@ -1801,6 +1804,7 @@ virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.GetDefaultSsl(S virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.GetSslHostFromEndpoints(StackExchange.Redis.EndPointCollection! endPoints) -> string? virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.HeartbeatConsistencyChecks.get -> bool virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.HeartbeatInterval.get -> System.TimeSpan +virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.HighIntegrity.get -> bool virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.IncludeDetailInExceptions.get -> bool virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.IncludePerformanceCountersInExceptions.get -> bool virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.IsMatch(System.Net.EndPoint! endpoint) -> bool diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 5f282702b..91b0e1a43 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1 +1 @@ - \ No newline at end of file +#nullable enable \ No newline at end of file diff --git a/tests/BasicTest/BasicTest.csproj b/tests/BasicTest/BasicTest.csproj index 0ba04d459..7fcf776ee 100644 --- a/tests/BasicTest/BasicTest.csproj +++ b/tests/BasicTest/BasicTest.csproj @@ -6,7 +6,7 @@ BasicTest Exe BasicTest - win7-x64 + diff --git a/tests/BasicTestBaseline/BasicTestBaseline.csproj b/tests/BasicTestBaseline/BasicTestBaseline.csproj index 43cf8a8b3..f396ae7c1 100644 --- a/tests/BasicTestBaseline/BasicTestBaseline.csproj +++ b/tests/BasicTestBaseline/BasicTestBaseline.csproj @@ -6,7 +6,7 @@ BasicTestBaseline Exe BasicTestBaseline - win7-x64 + $(DefineConstants);TEST_BASELINE diff --git a/tests/ConsoleTest/Program.cs b/tests/ConsoleTest/Program.cs index 0d6fa7e13..39e00701e 100644 --- a/tests/ConsoleTest/Program.cs +++ b/tests/ConsoleTest/Program.cs @@ -5,14 +5,27 @@ Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); -var options = ConfigurationOptions.Parse("localhost"); +var options = ConfigurationOptions.Parse("127.0.0.1"); +#if !SEREDIS_BASELINE +options.HighIntegrity = false; // as needed +Console.WriteLine($"{nameof(options.HighIntegrity)}: {options.HighIntegrity}"); +#endif + //options.SocketManager = SocketManager.ThreadPool; +Console.WriteLine("Connecting..."); var connection = ConnectionMultiplexer.Connect(options); +Console.WriteLine("Connected"); +connection.ConnectionFailed += Connection_ConnectionFailed; + +void Connection_ConnectionFailed(object? sender, ConnectionFailedEventArgs e) +{ + Console.Error.WriteLine($"CONNECTION FAILED: {e.ConnectionType}, {e.FailureType}, {e.Exception}"); +} var startTime = DateTime.UtcNow; var startCpuUsage = Process.GetCurrentProcess().TotalProcessorTime; -var scenario = args?.Length > 0 ? args[0] : "parallel"; +var scenario = args?.Length > 0 ? args[0] : "mass-insert-async"; switch (scenario) { @@ -24,6 +37,10 @@ Console.WriteLine("Mass insert test..."); MassInsert(connection); break; + case "mass-insert-async": + Console.WriteLine("Mass insert (async/pipelined) test..."); + await MassInsertAsync(connection); + break; case "mass-publish": Console.WriteLine("Mass publish test..."); MassPublish(connection); @@ -48,7 +65,8 @@ static void MassInsert(ConnectionMultiplexer connection) { - const int NUM_INSERTIONS = 100000; + const int NUM_INSERTIONS = 100_000; + const int BATCH = 5000; int matchErrors = 0; var database = connection.GetDatabase(0); @@ -61,20 +79,69 @@ static void MassInsert(ConnectionMultiplexer connection) database.StringSet(key, value); var retrievedValue = database.StringGet(key); - if (retrievedValue != value) - { - matchErrors++; - } + if (retrievedValue != value) + { + matchErrors++; + } + + if (i > 0 && i % BATCH == 0) + { + Console.WriteLine(i); + } + } + + Console.WriteLine($"Match errors: {matchErrors}"); +} + +static async Task MassInsertAsync(ConnectionMultiplexer connection) +{ + const int NUM_INSERTIONS = 100_000; + const int BATCH = 5000; + int matchErrors = 0; + + var database = connection.GetDatabase(0); + + var outstanding = new List<(Task, Task, string)>(BATCH); + + for (int i = 0; i < NUM_INSERTIONS; i++) + { + var key = $"StackExchange.Redis.Test.{i}"; + var value = i.ToString(); + + var set = database.StringSetAsync(key, value); + var get = database.StringGetAsync(key); + + outstanding.Add((set, get, value)); + + if (i > 0 && i % BATCH == 0) + { + matchErrors += await ValidateAsync(outstanding); + Console.WriteLine(i); + } - if (i > 0 && i % 5000 == 0) - { - Console.WriteLine(i); - } } + matchErrors += await ValidateAsync(outstanding); Console.WriteLine($"Match errors: {matchErrors}"); + + static async Task ValidateAsync(List<(Task, Task, string)> outstanding) + { + int matchErrors = 0; + foreach (var row in outstanding) + { + var s = await row.Item2; + await row.Item1; + if (s != row.Item3) + { + matchErrors++; + } + } + outstanding.Clear(); + return matchErrors; + } } + static void ParallelTasks(ConnectionMultiplexer connection) { static void ParallelRun(int taskId, ConnectionMultiplexer connection) diff --git a/tests/ConsoleTestBaseline/ConsoleTestBaseline.csproj b/tests/ConsoleTestBaseline/ConsoleTestBaseline.csproj index dc644561d..2c4bac2f5 100644 --- a/tests/ConsoleTestBaseline/ConsoleTestBaseline.csproj +++ b/tests/ConsoleTestBaseline/ConsoleTestBaseline.csproj @@ -5,6 +5,7 @@ Exe enable enable + $(DefineConstants);SEREDIS_BASELINE diff --git a/tests/StackExchange.Redis.Tests/BasicOpTests.cs b/tests/StackExchange.Redis.Tests/BasicOpTests.cs index 83499fed3..58c82dda6 100644 --- a/tests/StackExchange.Redis.Tests/BasicOpTests.cs +++ b/tests/StackExchange.Redis.Tests/BasicOpTests.cs @@ -7,10 +7,18 @@ namespace StackExchange.Redis.Tests; +[Collection(SharedConnectionFixture.Key)] +public class HighIntegrityBasicOpsTests : BasicOpsTests +{ + public HighIntegrityBasicOpsTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { } + + internal override bool HighIntegrity => true; +} + [Collection(SharedConnectionFixture.Key)] public class BasicOpsTests : TestBase { - public BasicOpsTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base (output, fixture) { } + public BasicOpsTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { } [Fact] public async Task PingOnce() @@ -469,4 +477,42 @@ public void WrappedDatabasePrefixIntegration() int count = (int)conn.GetDatabase().StringGet("abc" + key); Assert.Equal(3, count); } + + [Fact] + public void TransactionSync() + { + using var conn = Create(); + var db = conn.GetDatabase(); + + RedisKey key = Me(); + + var tran = db.CreateTransaction(); + _ = db.KeyDeleteAsync(key); + var x = tran.StringIncrementAsync(Me()); + var y = tran.StringIncrementAsync(Me()); + var z = tran.StringIncrementAsync(Me()); + Assert.True(tran.Execute()); + Assert.Equal(1, x.Result); + Assert.Equal(2, y.Result); + Assert.Equal(3, z.Result); + } + + [Fact] + public async Task TransactionAsync() + { + await using var conn = Create(); + var db = conn.GetDatabase(); + + RedisKey key = Me(); + + var tran = db.CreateTransaction(); + _ = db.KeyDeleteAsync(key); + var x = tran.StringIncrementAsync(Me()); + var y = tran.StringIncrementAsync(Me()); + var z = tran.StringIncrementAsync(Me()); + Assert.True(await tran.ExecuteAsync()); + Assert.Equal(1, await x); + Assert.Equal(2, await y); + Assert.Equal(3, await z); + } } diff --git a/tests/StackExchange.Redis.Tests/ConfigTests.cs b/tests/StackExchange.Redis.Tests/ConfigTests.cs index a2329dc04..f93eb176f 100644 --- a/tests/StackExchange.Redis.Tests/ConfigTests.cs +++ b/tests/StackExchange.Redis.Tests/ConfigTests.cs @@ -43,7 +43,7 @@ public void ExpectedFields() "configChannel", "configCheckSeconds", "connectRetry", "connectTimeout", "DefaultDatabase", "defaultOptions", "defaultVersion", "EndPoints", "heartbeatConsistencyChecks", - "heartbeatInterval", "includeDetailInExceptions", "includePerformanceCountersInExceptions", + "heartbeatInterval", "highIntegrity", "includeDetailInExceptions", "includePerformanceCountersInExceptions", "keepAlive", "LibraryName", "loggerFactory", "password", "Protocol", "proxy", "reconnectRetryPolicy", "resolveDns", "responseTimeout", @@ -760,4 +760,25 @@ public void DefaultConfigOptionsForSetLib(string configurationString, bool setli Assert.Equal(setlib, options.SetClientLibrary); Assert.Equal(configurationString, options.ToString()); } + + [Theory] + [InlineData(null, false, "dummy")] + [InlineData(false, false, "dummy,highIntegrity=False")] + [InlineData(true, true, "dummy,highIntegrity=True")] + public void CheckHighIntegrity(bool? assigned, bool expected, string cs) + { + var options = ConfigurationOptions.Parse("dummy"); + if (assigned.HasValue) options.HighIntegrity = assigned.Value; + + Assert.Equal(expected, options.HighIntegrity); + Assert.Equal(cs, options.ToString()); + + var clone = options.Clone(); + Assert.Equal(expected, clone.HighIntegrity); + Assert.Equal(cs, clone.ToString()); + + var parsed = ConfigurationOptions.Parse(cs); + Assert.Equal(expected, options.HighIntegrity); + + } } diff --git a/tests/StackExchange.Redis.Tests/ConnectCustomConfigTests.cs b/tests/StackExchange.Redis.Tests/ConnectCustomConfigTests.cs index 98351d04b..f0f189372 100644 --- a/tests/StackExchange.Redis.Tests/ConnectCustomConfigTests.cs +++ b/tests/StackExchange.Redis.Tests/ConnectCustomConfigTests.cs @@ -93,7 +93,7 @@ public void TiebreakerIncorrectType() } [Theory] - [InlineData(true, 5, 15)] + [InlineData(true, 4, 15)] [InlineData(false, 0, 0)] public async Task HeartbeatConsistencyCheckPingsAsync(bool enableConsistencyChecks, int minExpected, int maxExpected) { diff --git a/tests/StackExchange.Redis.Tests/ResultBoxTests.cs b/tests/StackExchange.Redis.Tests/ResultBoxTests.cs new file mode 100644 index 000000000..adb1b309f --- /dev/null +++ b/tests/StackExchange.Redis.Tests/ResultBoxTests.cs @@ -0,0 +1,95 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace StackExchange.Redis.Tests; + +public class ResultBoxTests +{ + [Fact] + public void SyncResultBox() + { + var msg = Message.Create(-1, CommandFlags.None, RedisCommand.PING); + var box = SimpleResultBox.Get(); + Assert.False(box.IsAsync); + + int activated = 0; + lock (box) + { + Task.Run(() => + { + lock (box) + { + // release the worker to start work + Monitor.PulseAll(box); + + // wait for the completion signal + if (Monitor.Wait(box, TimeSpan.FromSeconds(10))) + { + Interlocked.Increment(ref activated); + } + } + }); + Assert.True(Monitor.Wait(box, TimeSpan.FromSeconds(10)), "failed to handover lock to worker"); + } + + // check that continuation was not already signalled + Thread.Sleep(100); + Assert.Equal(0, Volatile.Read(ref activated)); + + msg.SetSource(ResultProcessor.DemandOK, box); + Assert.True(msg.TrySetResult("abc")); + + // check that TrySetResult did not signal continuation + Thread.Sleep(100); + Assert.Equal(0, Volatile.Read(ref activated)); + + // check that complete signals continuation + msg.Complete(); + Thread.Sleep(100); + Assert.Equal(1, Volatile.Read(ref activated)); + + var s = box.GetResult(out var ex); + Assert.Null(ex); + Assert.NotNull(s); + Assert.Equal("abc", s); + } + + [Fact] + public void TaskResultBox() + { + // TaskResultBox currently uses a stating field for values before activations are + // signalled; High Integrity Mode *demands* this behaviour, so: validate that it + // works correctly + var msg = Message.Create(-1, CommandFlags.None, RedisCommand.PING); + var box = TaskResultBox.Create(out var tcs, null); + Assert.True(box.IsAsync); + + msg.SetSource(ResultProcessor.DemandOK, box); + Assert.True(msg.TrySetResult("abc")); + + // check that continuation was not already signalled + Thread.Sleep(100); + Assert.False(tcs.Task.IsCompleted); + + msg.SetSource(ResultProcessor.DemandOK, box); + Assert.True(msg.TrySetResult("abc")); + + // check that TrySetResult did not signal continuation + Thread.Sleep(100); + Assert.False(tcs.Task.IsCompleted); + + // check that complete signals continuation + msg.Complete(); + Thread.Sleep(100); + Assert.True(tcs.Task.IsCompleted); + + var s = box.GetResult(out var ex); + Assert.Null(ex); + Assert.NotNull(s); + Assert.Equal("abc", s); + + Assert.Equal("abc", tcs.Task.Result); // we already checked IsCompleted + } +} diff --git a/tests/StackExchange.Redis.Tests/TestBase.cs b/tests/StackExchange.Redis.Tests/TestBase.cs index c0dfb028c..7038aaa2f 100644 --- a/tests/StackExchange.Redis.Tests/TestBase.cs +++ b/tests/StackExchange.Redis.Tests/TestBase.cs @@ -31,7 +31,7 @@ public abstract class TestBase : IDisposable private readonly SharedConnectionFixture? _fixture; - protected bool SharedFixtureAvailable => _fixture != null && _fixture.IsEnabled; + protected bool SharedFixtureAvailable => _fixture != null && _fixture.IsEnabled && !HighIntegrity; protected TestBase(ITestOutputHelper output, SharedConnectionFixture? fixture = null) { @@ -236,6 +236,8 @@ protected static IServer GetAnyPrimary(IConnectionMultiplexer muxer) throw new InvalidOperationException("Requires a primary endpoint (found none)"); } + internal virtual bool HighIntegrity => false; + internal virtual IInternalConnectionMultiplexer Create( string? clientName = null, int? syncTimeout = null, @@ -271,9 +273,10 @@ internal virtual IInternalConnectionMultiplexer Create( protocol ??= Context.Test.Protocol; // Share a connection if instructed to and we can - many specifics mean no sharing + bool highIntegrity = HighIntegrity; if (shared && expectedFailCount == 0 && _fixture != null && _fixture.IsEnabled - && CanShare(allowAdmin, password, tieBreaker, fail, disabledCommands, enabledCommands, channelPrefix, proxy, configuration, defaultDatabase, backlogPolicy)) + && CanShare(allowAdmin, password, tieBreaker, fail, disabledCommands, enabledCommands, channelPrefix, proxy, configuration, defaultDatabase, backlogPolicy, highIntegrity)) { configuration = GetConfiguration(); var fixtureConn = _fixture.GetConnection(this, protocol.Value, caller: caller); @@ -296,7 +299,7 @@ internal virtual IInternalConnectionMultiplexer Create( checkConnect, failMessage, channelPrefix, proxy, logTransactionData, defaultDatabase, - backlogPolicy, protocol, + backlogPolicy, protocol, highIntegrity, caller); ThrowIfIncorrectProtocol(conn, protocol); @@ -319,7 +322,8 @@ internal static bool CanShare( Proxy? proxy, string? configuration, int? defaultDatabase, - BacklogPolicy? backlogPolicy + BacklogPolicy? backlogPolicy, + bool highIntegrity ) => enabledCommands == null && disabledCommands == null @@ -331,7 +335,8 @@ internal static bool CanShare( && tieBreaker == null && defaultDatabase == null && (allowAdmin == null || allowAdmin == true) - && backlogPolicy == null; + && backlogPolicy == null + && !highIntegrity; internal void ThrowIfIncorrectProtocol(IInternalConnectionMultiplexer conn, RedisProtocol? requiredProtocol) { @@ -390,6 +395,7 @@ public static ConnectionMultiplexer CreateDefault( int? defaultDatabase = null, BacklogPolicy? backlogPolicy = null, RedisProtocol? protocol = null, + bool highIntegrity = false, [CallerMemberName] string caller = "") { StringWriter? localLog = null; @@ -425,6 +431,7 @@ public static ConnectionMultiplexer CreateDefault( if (defaultDatabase is not null) config.DefaultDatabase = defaultDatabase.Value; if (backlogPolicy is not null) config.BacklogPolicy = backlogPolicy; if (protocol is not null) config.Protocol = protocol; + if (highIntegrity) config.HighIntegrity = highIntegrity; var watch = Stopwatch.StartNew(); var task = ConnectionMultiplexer.ConnectAsync(config, log); if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))