Skip to content

Commit

Permalink
Fix IPC data handling (#7714)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubo authored Nov 6, 2024
1 parent ed63eea commit bba37d4
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 73 deletions.
53 changes: 30 additions & 23 deletions src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -63,7 +64,6 @@ public async Task Can_handle_very_large_objects()
Assert.That(sent, Is.EqualTo(received));
}

[Test]
[TestCase(1)]
[TestCase(2)]
[TestCase(10)]
Expand All @@ -80,8 +80,13 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
byte[] buffer = new byte[10];
while (true)
{
ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false);
if (result?.EndOfMessage == true)
ReceiveResult? result = await stream.ReceiveAsync(buffer);

// Imitate random delays
if (Stopwatch.GetTimestamp() % 101 == 0)
await Task.Delay(1);

if (result is not null && IsEndOfIpcMessage(result))
{
messages++;
}
Expand All @@ -108,7 +113,7 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
Task<int> sendMessages = Task.Run(async () =>
{
using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false);
await socket.ConnectAsync(ipEndPoint);
using IpcSocketMessageStream stream = new(socket);
using JsonRpcSocketsClient<IpcSocketMessageStream> client = new(
Expand All @@ -123,18 +128,18 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
for (int i = 0; i < messageCount; i++)
{
using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1_000, () => disposeCount++), default);
await client.SendJsonRpcResult(result).ConfigureAwait(false);
await Task.Delay(1).ConfigureAwait(false);
using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(100, () => disposeCount++), default);
await client.SendJsonRpcResult(result);
await Task.Delay(1);
}
disposeCount.Should().Be(messageCount);
await cts.CancelAsync().ConfigureAwait(false);
await cts.CancelAsync();
return messageCount;
});

await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false);
await Task.WhenAll(sendMessages, receiveMessages);
int sent = sendMessages.Result;
int received = receiveMessages.Result;

Expand All @@ -158,17 +163,17 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc
byte[] buffer = new byte[bufferSize];
while (true)
{
ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false);
ReceiveResult? result = await stream.ReceiveAsync(buffer);
if (result is not null)
{
msg.AddRange(buffer.Take(result.Read));
}

if (result?.EndOfMessage == true)
{
messages++;
receivedMessages.Add(msg.ToArray());
msg = [];
if (IsEndOfIpcMessage(result))
{
messages++;
receivedMessages.Add(msg.ToArray());
msg = [];
}
}

if (result is null || result.Closed)
Expand All @@ -190,14 +195,14 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc

Task<int> receiveMessages = OneShotServer(
ipEndPoint,
async socket => await ReadMessages(socket, receivedMessages, cts.Token).ConfigureAwait(false)
async socket => await ReadMessages(socket, receivedMessages, cts.Token)
);

Task<int> sendMessages = Task.Run(async () =>
{
int messageCount = 0;
using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false);
await socket.ConnectAsync(ipEndPoint);
using IpcSocketMessageStream stream = new(socket);
using JsonRpcSocketsClient<IpcSocketMessageStream> client = new(
Expand All @@ -216,20 +221,20 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc
messageCount++;
var msg = Enumerable.Range(11, i).Select(x => (byte)x).ToArray();
sentMessages.Add(msg);
await stream.WriteAsync(msg).ConfigureAwait(false);
await stream.WriteEndOfMessageAsync().ConfigureAwait(false);
await stream.WriteAsync(msg.Append((byte)'\n').ToArray());
if (i % 10 == 0)
{
await Task.Delay(1).ConfigureAwait(false);
await Task.Delay(1);
}
}
stream.Close();
await cts.CancelAsync().ConfigureAwait(false);
await cts.CancelAsync();
return messageCount;
});

await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false);
await Task.WhenAll(sendMessages, receiveMessages);
int sent = sendMessages.Result;
int received = receiveMessages.Result;

Expand Down Expand Up @@ -551,4 +556,6 @@ private static string RandomString(int length)
}
return new string(stringChars);
}

private static bool IsEndOfIpcMessage(ReceiveResult result) => result.EndOfMessage && (!result.Closed || result.Read != 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@
"Docker": {
"commandName": "Docker",
"commandLineArgs": "-c holesky --data-dir .data /data --jsonrpc-enginehost 0.0.0.0 --jsonrpc-engineport 8551 --jsonrpc-host 0.0.0.0"
},
"WSL": {
"commandName": "WSL",
"commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"distributionName": ""
}
}
}
93 changes: 43 additions & 50 deletions src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,76 +11,69 @@ namespace Nethermind.Sockets;

public class IpcSocketMessageStream(Socket socket) : NetworkStream(socket), IMessageBorderPreservingStream
{
private static readonly byte Delimiter = Convert.ToByte('\n');
private const byte Delimiter = (byte)'\n';

public byte[] bufferedData = [];
public int bufferedDataLength = 0;
private byte[] _bufferedData = [];
private int _bufferedDataLength = 0;

public async Task<ReceiveResult?> ReceiveAsync(ArraySegment<byte> buffer)
{
ReceiveResult? result = null;
if (Socket.Connected)
if (!Socket.Connected)
return null;

if (_bufferedDataLength > 0)
{
if (bufferedDataLength > 0)
if (_bufferedDataLength > buffer.Count)
throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one");

try
{
if (bufferedDataLength > buffer.Count)
{
throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one");
}
try
{
Buffer.BlockCopy(bufferedData, 0, buffer.Array!, buffer.Offset, bufferedDataLength);
}
catch (Exception)
{

}
Buffer.BlockCopy(_bufferedData, 0, buffer.Array!, buffer.Offset, _bufferedDataLength);
}
catch { }
}

int read = bufferedDataLength + await Socket.ReceiveAsync(buffer[bufferedDataLength..], SocketFlags.None);
int read = _bufferedDataLength + await Socket.ReceiveAsync(buffer[_bufferedDataLength..], SocketFlags.None);

int delimiter = ((IList<byte>)buffer[..read]).IndexOf(Delimiter);
int delimiter = ((IList<byte>)buffer[..read]).IndexOf(Delimiter);
bool endOfMessage;

bool endOfMessage;
if (delimiter != -1 && (delimiter + 1) < read)
{
bufferedDataLength = read - delimiter - 1;

if (bufferedData.Length < buffer.Count)
{
if (bufferedData.Length != 0)
{
ArrayPool<byte>.Shared.Return(bufferedData);
}
bufferedData = ArrayPool<byte>.Shared.Rent(buffer.Count);
}
endOfMessage = true;
buffer[(delimiter + 1)..read].CopyTo(bufferedData);
read = delimiter + 1;
}
else
if (delimiter != -1 && (delimiter + 1) < read)
{
_bufferedDataLength = read - delimiter - 1;

if (_bufferedData.Length < buffer.Count)
{
endOfMessage = delimiter != -1;
bufferedDataLength = 0;
if (_bufferedData.Length != 0)
ArrayPool<byte>.Shared.Return(_bufferedData);

_bufferedData = ArrayPool<byte>.Shared.Rent(buffer.Count);
}

result = new ReceiveResult()
{
Closed = read == 0,
Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read,
EndOfMessage = endOfMessage,
CloseStatusDescription = null
};
endOfMessage = true;
buffer[(delimiter + 1)..read].CopyTo(_bufferedData);
read = delimiter + 1;
}
else
{
endOfMessage = delimiter != -1 || Socket.Available == 0;
_bufferedDataLength = 0;
}

return result;
return new()
{
Closed = read == 0,
Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read,
EndOfMessage = endOfMessage,
CloseStatusDescription = null
};
}

protected override void Dispose(bool disposing)
{
if (disposing && bufferedData.Length != 0)
if (disposing && _bufferedData.Length != 0)
{
ArrayPool<byte>.Shared.Return(bufferedData);
ArrayPool<byte>.Shared.Return(_bufferedData);
}
base.Dispose(disposing);
}
Expand Down

0 comments on commit bba37d4

Please sign in to comment.