Skip to content

Commit

Permalink
Added buffer factory and async data capture to allow for TCP streams …
Browse files Browse the repository at this point in the history
…to be captured successfully even under high network load. Modified FFXIV monitor to use socket filtering by IP address by default.
  • Loading branch information
ravahn committed Jan 30, 2018
2 parents efcb875 + ddd508d commit 9238e37
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 101 deletions.
7 changes: 7 additions & 0 deletions Machina.FFXIV/FFXIVBundleDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ public unsafe void StoreData(byte[] buffer)
}
int messageBufferSize;
byte[] message = DecompressFFXIVMessage(ref header, _bundleBuffer, offset, out messageBufferSize);

// Handle error condition from decompression - stream may be invalid.
if (message == null)
{
_allocated = 0;
return;
}

offset += header.length;
if (offset == _allocated)
Expand Down
9 changes: 8 additions & 1 deletion Machina.FFXIV/FFXIVNetworkMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public uint ProcessID
public string LocalIP
{ get; set; } = "";

/// <summary>
/// Specifies whether to use Winsock/WinPcap server IP filtering instead of filtering in code
/// This has a small chance of losing data when new TCP sockets connect, but significantly reduces data processing overhead.
/// </summary>
public Boolean UseSocketFilter
{ get; set; } = false;

#region Message Delegates section
public delegate void MessageReceivedDelegate(string connection, long epoch, byte[] message);

Expand Down Expand Up @@ -97,7 +104,7 @@ public void Start()
_monitor.WindowName = "FINAL FANTASY XIV";
_monitor.MonitorType = MonitorType;
_monitor.LocalIP = LocalIP;
_monitor.UseOneSocketPerRemoteIP = true;
_monitor.UseOneSocketPerRemoteIP = UseSocketFilter;

_monitor.DataSent = (string connection, byte[] data) => ProcessSentMessage(connection, data);
_monitor.DataReceived = (string connection, byte[] data) => ProcessReceivedMessage(connection, data);
Expand Down
2 changes: 2 additions & 0 deletions Machina.FFXIV/Machina.FFXIV.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
Expand All @@ -31,6 +32,7 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
Expand Down
2 changes: 1 addition & 1 deletion Machina.Tests/RawPCapTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void RawPCap_GetDataTwiceTest()
System.Net.IPAddress address = System.Net.IPAddress.Parse(ip);

var sut = new RawPCap();
sut.Create((uint)address.Address);

// start an async download
System.Net.WebClient client = new System.Net.WebClient();
Expand All @@ -55,7 +56,6 @@ public void RawPCap_GetDataTwiceTest()

try
{
sut.Create((uint)address.Address);
t.Wait();

byte[] buffer;
Expand Down
9 changes: 6 additions & 3 deletions Machina.Tests/TCPNetworkMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ public void TCPNetworkMonitor_RawSocket_SendAndReceiveData()
monitor.MonitorType = TCPNetworkMonitor.NetworkMonitorType.RawSocket;
monitor.DataReceived += (string connection, byte[] data) => DataReceived(connection, data);
monitor.DataSent += (string connection, byte[] data) => DataSent(connection, data);
monitor.UseOneSocketPerRemoteIP = false;


// start an async download
monitor.Start();
// start a dummy async download
System.Net.WebClient client = new System.Net.WebClient();
Task t = client.DownloadStringTaskAsync("http://www.google.com");
monitor.Start();
t.Wait();

t = client.DownloadStringTaskAsync("http://www.google.com");
t.Wait();

for (int i=0;i<100;i++)
Expand Down
37 changes: 36 additions & 1 deletion Machina/IRawSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,49 @@

namespace Machina
{
/// <summary>
/// Defines the common public routines for raw socket capturea
/// </summary>
interface IRawSocket
{
/// <summary>
/// Returns the local IP address for which the raw socket is configured.
/// </summary>
uint LocalIP
{ get; }

/// <summary>
/// Returns the remote IP address for which the raw socket is configured, or 0 if none is configured.
/// </summary>
uint RemoteIP
{ get; }

/// <summary>
/// Initializes the raw socket and starts the capture process
/// Note that remoteAddress can be used to significantly improve the reliability of capture of active connections by
/// offloading filtering to the winsock/winpcap layer, however if the goal is to capture all packets for a single
/// TCP connection, the remote address must be known and the socket created before the connection is initiated.
/// This is frequently impossible for monitoring third-party applications.
/// </summary>
/// <param name="localAddress">local IP address of the interface initiating the packets of interest</param>
/// <param name="remoteAddress">remote IP address of the host, or 0 to capture all packets on the local interface.</param>
void Create(uint localAddress, uint remoteAddress = 0);
int Receive(out byte[] buffer);

/// <summary>
/// Stops raw socket capture and cleans up any resources.
/// </summary>
void Destroy();

/// <summary>
/// Returns both a reference to a byte array buffer, and the amount of bytes in that array containing payload data.
/// return value of 0 indicates that there is no data available.
/// </summary>
int Receive(out byte[] buffer);

/// <summary>
/// Stores the buffer after it is processed for future reads. This allows for fewer .Net garbage collection calls,
/// but is not strictly necessary for functioning.
/// </summary>
void FreeBuffer(ref byte[] buffer);
}
}
2 changes: 2 additions & 0 deletions Machina/Machina.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
<Optimize>true</Optimize>
<PlatformTarget>AnyCPU</PlatformTarget>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.CSharp" />
<Reference Include="System" />
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="NetworkBufferFactory.cs" />
<Compile Include="IPDecoder.cs" />
<Compile Include="Headers\IPv4Header.cs" />
<Compile Include="Headers\IPv6Header.cs" />
Expand Down
103 changes: 103 additions & 0 deletions Machina/NetworkBufferFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Machina
{
/// <summary>
/// This class manages a number of byte arrays in order to optimize memory usage for raw network socket reading.
/// By recycling the arrays when the data is done, it reduces the amount of garbage collection
/// which reduces the interruptions in network data capture, and thus reduces the frequency of lost packets.
/// </summary>
public class NetworkBufferFactory
{
/// <summary>
/// Helper class to encapsulate each array
/// </summary>
public class Buffer
{
public byte[] Data;
public int AllocatedSize;
}

// Internal queues used to store the two types of buffers - allocated an free.
private ConcurrentQueue<Buffer> _freeBufferQueue = new ConcurrentQueue<Buffer>();
private ConcurrentQueue<Buffer> _allocatedBufferQueue = new ConcurrentQueue<Buffer>();

// default buffer size
private int _bufferSize = 1024 * 64+1; // maximum TCP packet size is just under 64kb, and winsock seems to limit receive calls to this size.

/// <summary>
/// constructor for the buffer factory
/// </summary>
/// <param name="initialBufferCount">Initial number of buffers to pre-allocate</param>
/// <param name="bufferSize">size of each buffer, in case the default size is insufficient</param>
public NetworkBufferFactory(int initialBufferCount, int bufferSize)
{
if (bufferSize > 0)
_bufferSize = bufferSize;

if (initialBufferCount > 0)
for (int i = 0; i < initialBufferCount; i++)
_freeBufferQueue.Enqueue(CreateNewBuffer());
}

/// <summary>
/// Returns the next unused buffer from the queue. If none are available, returns a new buffer.
/// </summary>
public Buffer GetNextFreeBuffer()
{
// attempt to pull from free buffer queue
if (!_freeBufferQueue.IsEmpty)
if (_freeBufferQueue.TryDequeue(out Buffer result))
return result;

return CreateNewBuffer();
}

/// <summary>
/// Returns the next used buffer from the queue. If none are available, returns NULL
/// </summary>
public Buffer GetNextAllocatedBuffer()
{
// attempt to pull from allocated buffer queue
if (!_allocatedBufferQueue.IsEmpty)
if (_allocatedBufferQueue.TryDequeue(out Buffer buffer))
return buffer;

return null;
}

/// <summary>
/// Adds a buffer back to the free queue, so it can be reused.
/// </summary>
public void AddFreeBuffer(Buffer buffer)
{
if (buffer != null)
{
buffer.AllocatedSize = 0;
_freeBufferQueue.Enqueue(buffer);
}
}

/// <summary>
/// Adds a buffer containing data to the allocated queue.
/// </summary>
public void AddAllocatedBuffer(Buffer buffer)
{
if (buffer != null)
_allocatedBufferQueue.Enqueue(buffer);
}


/// <summary>
/// Creates a new buffer, including allocating the array.
/// </summary>
private Buffer CreateNewBuffer()
{
return new Buffer { Data = new byte[_bufferSize], AllocatedSize = 0 };
}
}
}
Loading

0 comments on commit 9238e37

Please sign in to comment.