Skip to content

Commit

Permalink
Feature: Add HeartbeatConsistencyChecks (#2656)
Browse files Browse the repository at this point in the history
This is a new feature for allowing keepalive commands to be sent every heartbeat regardless of if they're needed for actual keepalives. I've gotten a ping about a network drop that went undetected for ~8 minutes because of usage only in strings. We see this use case not uncommonly, and because of that string type being the only thing used, we don't detect a protocol fault from subsequent string commands.

The symptoms here are partial/unexpected string payloads but ultimately the wrong answers to the wrong query. Given we don't know at what layer this drop happens (this appears to be extremely rare, 1 in quadrillions as far as we know), the best we can currently do is react to it ASAP.

There may be a better way to accomplish what this is after - discussing with Marc soon as we're both online but prepping this PR in case it's best path.
  • Loading branch information
NickCraver authored Feb 26, 2024
1 parent 6392871 commit 54a633f
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ Additional code-only options:
- The thread pool to use for scheduling work to and from the socket connected to Redis, one of...
- `SocketManager.Shared`: Use a shared dedicated thread pool for _all_ multiplexers (defaults to 10 threads) - best balance for most scenarios.
- `SocketManager.ThreadPool`: Use the build-in .NET thread pool for scheduling. This can perform better for very small numbers of cores or with large apps on large machines that need to use more than 10 threads (total, across all multiplexers) under load. **Important**: this option isn't the default because it's subject to thread pool growth/starvation and if for example synchronous calls are waiting on a redis command to come back to unblock other threads, stalls/hangs can result. Use with caution, especially if you have sync-over-async work in play.
- HeartbeatConsistencyChecks - Default: `false`
- Allows _always_ sending keepalive checks even if a connection isn't idle. This trades extra commands (per `HeartbeatInterval` - default 1 second) to check the network stream for consistency. If any data was lost, the result won't be as expected and the connection will be terminated ASAP. This is a check to react to any data loss at the network layer as soon as possible.
- HeartbeatInterval - Default: `1000ms`
- Allows running the heartbeat more often which importantly includes timeout evaluation for async commands. For example if you have a 50ms async command timeout, we're only actually checking it during the heartbeat (once per second by default), so it's possible 50-1050ms pass _before we notice it timed out_. If you want more fidelity in that check and to observe that a server failed faster, you can lower this to run the heartbeat more often to achieve that.
- **Note: heartbeats are not free and that's why the default is 1 second. There is additional overhead to running this more often simply because it does some work each time it fires.**
Expand Down
3 changes: 2 additions & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current package versions:
## Unreleased

- Fix [#2653](https://github.com/StackExchange/StackExchange.Redis/issues/2653): Client library metadata should validate contents ([#2654](https://github.com/StackExchange/StackExchange.Redis/pull/2654) by mgravell)
- Add `HeartbeatConsistencyChecks` option (opt-in) to enabled per-heartbeat (defaults to once per second) checks to be sent to ensure no network stream corruption has occured ([#2656 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2656))

## 2.7.20

Expand All @@ -25,7 +26,7 @@ Current package versions:

- Fix [#2593](https://github.com/StackExchange/StackExchange.Redis/issues/2593): `EXPIRETIME` and `PEXPIRETIME` miscategorized as `PrimaryOnly` commands causing them to fail when issued against a read-only replica ([#2593 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2593))
- Fix [#2591](https://github.com/StackExchange/StackExchange.Redis/issues/2591): Add `HELLO` to Sentinel connections so they can support RESP3 ([#2601 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2601))
- Fix [#2595](https://github.com/StackExchange/StackExchange.Redis/issues/2595): Add detection handling for dead sockets that the OS says are okay, seen especially in Linux environments (https://github.com/StackExchange/StackExchange.Redis/pull/2610)
- Fix [#2595](https://github.com/StackExchange/StackExchange.Redis/issues/2595): Add detection handling for dead sockets that the OS says are okay, seen especially in Linux environments ([#2610 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2610))

## 2.7.4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public static DefaultOptionsProvider GetProvider(EndPoint endpoint)
/// <remarks>Be aware setting this very low incurs additional overhead of evaluating the above more often.</remarks>
public virtual TimeSpan HeartbeatInterval => TimeSpan.FromSeconds(1);

/// <summary>
/// Whether to enable ECHO checks on every heartbeat to ensure network stream consistency.
/// This is a rare measure to react to any potential network traffic drops ASAP, terminating the connection.
/// </summary>
public virtual bool HeartbeatConsistencyChecks => false;

/// <summary>
/// Should exceptions include identifiable details? (key names, additional .Data annotations)
/// </summary>
Expand Down
12 changes: 11 additions & 1 deletion src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static string TryNormalize(string value)

private DefaultOptionsProvider? defaultOptions;

private bool? allowAdmin, abortOnConnectFail, resolveDns, ssl, checkCertificateRevocation,
private bool? allowAdmin, abortOnConnectFail, resolveDns, ssl, checkCertificateRevocation, heartbeatConsistencyChecks,
includeDetailInExceptions, includePerformanceCountersInExceptions, setClientLibrary;

private string? tieBreaker, sslHost, configChannel, user, password;
Expand Down Expand Up @@ -402,6 +402,16 @@ public Version DefaultVersion
/// </remarks>
public EndPointCollection EndPoints { get; init; } = new EndPointCollection();

/// <summary>
/// Whether to enable ECHO checks on every heartbeat to ensure network stream consistency.
/// This is a rare measure to react to any potential network traffic drops ASAP, terminating the connection.
/// </summary>
public bool HeartbeatConsistencyChecks
{
get => heartbeatConsistencyChecks ?? Defaults.HeartbeatConsistencyChecks;
set => heartbeatConsistencyChecks = value;
}

/// <summary>
/// Controls how often the connection heartbeats. A heartbeat includes:
/// - Evaluating if any messages have timed out
Expand Down
25 changes: 19 additions & 6 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,13 @@ internal void IncrementOpCount()
Interlocked.Increment(ref operationCount);
}

internal void KeepAlive()
/// <summary>
/// Sends a keepalive message (ECHO or PING) to keep connections alive and check validity of response.
/// </summary>
/// <param name="forceRun">Whether to run even then the connection isn't idle.</param>
internal void KeepAlive(bool forceRun = false)
{
if (!(physical?.IsIdle() ?? false)) return; // don't pile on if already doing something
if (!forceRun && !(physical?.IsIdle() ?? false)) return; // don't pile on if already doing something

var commandMap = Multiplexer.CommandMap;
Message? msg = null;
Expand Down Expand Up @@ -596,6 +600,15 @@ internal void OnHeartbeat(bool ifConnectedOnly)
checkConfigSeconds = ServerEndPoint.ConfigCheckSeconds;

if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive
&& tmp.BridgeCouldBeNull?.Multiplexer.RawConfig.HeartbeatConsistencyChecks == true)
{
// If HeartbeatConsistencyChecks are enabled, we're sending a PING (expecting PONG) or ECHO (expecting UniqueID back) every single
// heartbeat as an opt-in measure to react to any network stream drop ASAP to terminate the connection as faulted.
// If we don't get the expected response to that command, then the connection is terminated.
// This is to prevent the case of things like 100% string command usage where a protocol error isn't otherwise encountered.
KeepAlive(forceRun: true);
}
else if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive
&& checkConfigSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds
&& ServerEndPoint.CheckInfoReplication())
{
Expand All @@ -614,13 +627,13 @@ internal void OnHeartbeat(bool ifConnectedOnly)
tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely
}
}
else if (writeEverySeconds <= 0 && tmp.IsIdle()
else if (writeEverySeconds <= 0
&& tmp.IsIdle()
&& tmp.LastWriteSecondsAgo > 2
&& tmp.GetSentAwaitingResponseCount() != 0)
{
// there's a chance this is a dead socket; sending data will shake that
// up a bit, so if we have an empty unsent queue and a non-empty sent
// queue, test the socket
// There's a chance this is a dead socket; sending data will shake that up a bit,
// so if we have an empty unsent queue and a non-empty sent queue, test the socket.
KeepAlive();
}
else if (timedOutThisHeartbeat > 0
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ StackExchange.Redis.ConfigurationOptions.DefaultVersion.get -> System.Version!
StackExchange.Redis.ConfigurationOptions.DefaultVersion.set -> void
StackExchange.Redis.ConfigurationOptions.EndPoints.get -> StackExchange.Redis.EndPointCollection!
StackExchange.Redis.ConfigurationOptions.EndPoints.init -> void
StackExchange.Redis.ConfigurationOptions.HeartbeatConsistencyChecks.get -> bool
StackExchange.Redis.ConfigurationOptions.HeartbeatConsistencyChecks.set -> void
StackExchange.Redis.ConfigurationOptions.HeartbeatInterval.get -> System.TimeSpan
StackExchange.Redis.ConfigurationOptions.HeartbeatInterval.set -> void
StackExchange.Redis.ConfigurationOptions.HighPrioritySocketThreads.get -> bool
Expand Down Expand Up @@ -1797,6 +1799,7 @@ virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.DefaultVersion.
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.GetDefaultClientName() -> string!
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.GetDefaultSsl(StackExchange.Redis.EndPointCollection! endPoints) -> bool
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.GetSslHostFromEndpoints(StackExchange.Redis.EndPointCollection! endPoints) -> string?
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.HeartbeatConsistencyChecks.get -> bool
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.HeartbeatInterval.get -> System.TimeSpan
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.IncludeDetailInExceptions.get -> bool
virtual StackExchange.Redis.Configuration.DefaultOptionsProvider.IncludePerformanceCountersInExceptions.get -> bool
Expand Down
6 changes: 3 additions & 3 deletions src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ internal string GetProfile()

internal string? GetStormLog(Message message) => GetBridge(message)?.GetStormLog();

internal Message GetTracerMessage(bool assertIdentity)
internal Message GetTracerMessage(bool checkResponse)
{
// Different configurations block certain commands, as can ad-hoc local configurations, so
// we'll do the best with what we have available.
Expand All @@ -576,7 +576,7 @@ internal Message GetTracerMessage(bool assertIdentity)
var map = Multiplexer.CommandMap;
Message msg;
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget;
if (assertIdentity && map.IsAvailable(RedisCommand.ECHO))
if (checkResponse && map.IsAvailable(RedisCommand.ECHO))
{
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)Multiplexer.UniqueId);
}
Expand All @@ -588,7 +588,7 @@ internal Message GetTracerMessage(bool assertIdentity)
{
msg = Message.Create(-1, flags, RedisCommand.TIME);
}
else if (!assertIdentity && map.IsAvailable(RedisCommand.ECHO))
else if (!checkResponse && map.IsAvailable(RedisCommand.ECHO))
{
// We'll use echo as a PING substitute if it is all we have (in preference to EXISTS)
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)Multiplexer.UniqueId);
Expand Down
38 changes: 37 additions & 1 deletion tests/StackExchange.Redis.Tests/ConnectCustomConfigTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Xunit;
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests;
Expand Down Expand Up @@ -89,4 +91,38 @@ public void TiebreakerIncorrectType()
var ex = Assert.Throws<RedisServerException>(() => db.StringGet(tiebreakerKey));
Assert.Contains("WRONGTYPE", ex.Message);
}

[Theory]
[InlineData(true, 5, 15)]
[InlineData(false, 0, 0)]
public async Task HeartbeatConsistencyCheckPingsAsync(bool enableConsistencyChecks, int minExpected, int maxExpected)
{
var options = new ConfigurationOptions()
{
HeartbeatConsistencyChecks = enableConsistencyChecks,
HeartbeatInterval = TimeSpan.FromMilliseconds(100),
};
options.EndPoints.Add(TestConfig.Current.PrimaryServerAndPort);

using var conn = await ConnectionMultiplexer.ConnectAsync(options, Writer);

var db = conn.GetDatabase();
db.Ping();
Assert.True(db.IsConnected(default));

var preCount = conn.OperationCount;
Log("OperationCount (pre-delay): " + preCount);

// Allow several heartbeats to happen, but don't need to be strict here
// e.g. allow thread pool starvation flex with the test suite's load (just check for a few)
await Task.Delay(TimeSpan.FromSeconds(1));

var postCount = conn.OperationCount;
Log("OperationCount (post-delay): " + postCount);

var opCount = postCount - preCount;
Log("OperationCount (diff): " + opCount);

Assert.True(minExpected <= opCount && opCount >= minExpected, $"Expected opcount ({opCount}) between {minExpected}-{maxExpected}");
}
}
5 changes: 5 additions & 0 deletions tests/StackExchange.Redis.Tests/DefaultOptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class TestOptionsProvider : DefaultOptionsProvider
public override int ConnectRetry => 123;
public override Version DefaultVersion => new Version(1, 2, 3, 4);
protected override string GetDefaultClientName() => "TestPrefix-" + base.GetDefaultClientName();
public override bool HeartbeatConsistencyChecks => true;
public override TimeSpan HeartbeatInterval => TimeSpan.FromMilliseconds(500);
public override bool IsMatch(EndPoint endpoint) => endpoint is DnsEndPoint dnsep && dnsep.Host.EndsWith(_domainSuffix);
public override TimeSpan KeepAliveInterval => TimeSpan.FromSeconds(125);
public override ILoggerFactory? LoggerFactory => NullLoggerFactory.Instance;
Expand Down Expand Up @@ -99,6 +101,9 @@ private static void AssertAllOverrides(ConfigurationOptions options)
Assert.Equal(123, options.ConnectRetry);
Assert.Equal(new Version(1, 2, 3, 4), options.DefaultVersion);

Assert.True(options.HeartbeatConsistencyChecks);
Assert.Equal(TimeSpan.FromMilliseconds(500), options.HeartbeatInterval);

Assert.Equal(TimeSpan.FromSeconds(125), TimeSpan.FromSeconds(options.KeepAlive));
Assert.Equal(NullLoggerFactory.Instance, options.LoggerFactory);
Assert.Equal(Proxy.Twemproxy, options.Proxy);
Expand Down

0 comments on commit 54a633f

Please sign in to comment.