Feature/arrow dont deserialise (#57)
* Expose ArrowBatch from RequestExecutor and use that rather than ArrowVisitor to decode batches

* Convert more collect calls to collect-as-Arrow

* Disable Spark 4 build
GoEddie authored Jan 8, 2025
1 parent d66248a commit 079d1b4
Showing 8 changed files with 614 additions and 115 deletions.
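
Taken together, the changes let a caller opt out of the slow Arrow-to-Row conversion and receive raw Arrow RecordBatches instead. A minimal sketch of the new usage — the session/plan setup and the RunQuery call are assumptions for illustration; only the constructor argument and GetArrowBatches come from this commit:

    using System;
    using Apache.Arrow;
    using Spark.Connect.Dotnet.Grpc;

    // Ask the executor to keep raw Arrow buffers instead of converting to Rows.
    var executor = new RequestExecutor(session, plan, ArrowHandling.ArrowBuffers);
    await executor.RunQuery(); // hypothetical execute call; not shown in this diff

    foreach (RecordBatch batch in executor.GetArrowBatches())
    {
        Console.WriteLine($"Batch: {batch.Length} rows, {batch.ColumnCount} columns");
    }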
30 changes: 15 additions & 15 deletions .github/workflows/dotnet-desktop.yml
@@ -78,25 +78,25 @@ jobs:
path: ./test_results3.xml


- uses: vemonet/setup-spark@v1
with:
spark-url: https://archive.apache.org/dist/spark/spark-4.0.0-preview2/spark-4.0.0-preview2-bin-hadoop3.tgz
spark-version: '4.0.0-preview2'
hadoop-version: '3'
# - uses: vemonet/setup-spark@v1
# with:
# spark-url: https://archive.apache.org/dist/spark/spark-4.0.0-preview2/spark-4.0.0-preview2-bin-hadoop3.tgz
# spark-version: '4.0.0-preview2'
# hadoop-version: '3'

- run: $SPARK_HOME/sbin/stop-connect-server.sh --force
# - run: $SPARK_HOME/sbin/stop-connect-server.sh --force

- run: $SPARK_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.13:4.0.0-preview2
# - run: $SPARK_HOME/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.13:4.0.0-preview2

- name: Execute unit tests Spark 4.0
working-directory: ./src/test/Spark.Connect.Dotnet.Tests/
run: dotnet test -l:"console;verbosity=detailed" --logger "trx;LogFileName=./test_results4.xml" --filter "SparkMinVersion=4"
# - name: Execute unit tests Spark 4.0
# working-directory: ./src/test/Spark.Connect.Dotnet.Tests/
# run: dotnet test -l:"console;verbosity=detailed" --logger "trx;LogFileName=./test_results4.xml" --filter "SparkMinVersion=4"

- name: Upload test results
uses: actions/upload-artifact@v4
with:
name: Test Results
path: ./test_results4.xml
# - name: Upload test results
# uses: actions/upload-artifact@v4
# with:
# name: Test Results
# path: ./test_results4.xml



@@ -1,11 +1,18 @@
using Apache.Arrow;
using Apache.Arrow.Ipc;
using Google.Protobuf.Collections;
using Grpc.Core;
using Spark.Connect.Dotnet.Sql;

namespace Spark.Connect.Dotnet.Grpc;



public enum ArrowHandling
{
None = 0,
SlowConvertToDotNet = 1,
ArrowBuffers = 2
}

/// <summary>
/// When we connect to remote Spark clusters, including Databricks, we can find our TCP connections are closed
@@ -26,6 +33,7 @@ public class RequestExecutor : IDisposable
{
private readonly SparkSession _session;
private readonly Plan _plan;
private readonly ArrowHandling _arrowHandling;
private readonly GrpcLogger _logger;

private string _operationId = string.Empty;
@@ -37,6 +45,7 @@ public class RequestExecutor : IDisposable
private Relation? _relation;
private DataType? _schema;
private readonly List<Row> _rows = new ();
private readonly List<RecordBatch> _recordBatches = new();
private StreamingQueryInstanceId? _streamingQueryId;
private StreamingQueryCommandResult.Types.StatusResult? _streamingResultStatus;
private string? _streamingQueryName;
@@ -51,17 +60,23 @@ private enum RetryableState
}

private RetryableState _retryableState = RetryableState.Processing;

/// <summary>
/// Create the Executor
/// </summary>
/// <param name="session"></param>
/// <param name="plan"></param>
public RequestExecutor(SparkSession session, Plan plan)
/// <param name="arrowHandling"></param>
public RequestExecutor(SparkSession session, Plan plan, ArrowHandling arrowHandling = ArrowHandling.SlowConvertToDotNet)
{
_logger = GetLogger(session);
_session = session;
_plan = plan;
_arrowHandling = arrowHandling;
if (_session.Conf.IsTrue(SparkDotnetKnownConfigKeys.DontDecodeArrow))
{
_arrowHandling = ArrowHandling.None;
}

_relation = plan.Root;
}
@@ -121,8 +136,8 @@ private async Task<bool> ProcessRequest()
var response = GetResponse();
await response.ResponseStream.MoveNext();
_retryableState = RetryableState.Processing;

while (response.ResponseStream.Current != null)
while (response.ResponseStream is { Current: not null })
{
var current = response.ResponseStream.Current;

@@ -151,22 +166,15 @@ private async Task<bool> ProcessRequest()
if (current.ArrowBatch != null)
{
_logger.Log(GrpcLoggingLevel.Verbose, "Have Arrow Batch");
var wrapper = new ArrowWrapper();


if (_schema == null)
{
_logger.Log(GrpcLoggingLevel.Verbose, "Cannot decode arrow batch as schema is null");
}
else
{
if (!_session.Conf.IsTrue(SparkDotnetKnownConfigKeys.DontDecodeArrow))
{
_rows.AddRange(await wrapper.ArrowBatchToRows(current.ArrowBatch, _schema));
}
else
{
_logger.Log(GrpcLoggingLevel.Verbose, "Not decoding Arrow as DontDecodeArrow is true");
}
await HandleArrowResponse(current.ArrowBatch);
}
}

@@ -263,7 +271,29 @@ private async Task<bool> ProcessRequest()

return true;
}


private async Task HandleArrowResponse(ExecutePlanResponse.Types.ArrowBatch arrowBatch)
{
if (_arrowHandling == ArrowHandling.None)
{
_logger.Log(GrpcLoggingLevel.Verbose, "Not decoding Arrow as ArrowHandling is None");
}

if (_arrowHandling == ArrowHandling.SlowConvertToDotNet)
{
var wrapper = new ArrowWrapper();
_rows.AddRange(await wrapper.ArrowBatchToRows(arrowBatch, _schema));
}

if (_arrowHandling == ArrowHandling.ArrowBuffers)
{
var reader = new ArrowStreamReader(new ReadOnlyMemory<byte>(arrowBatch.Data.ToByteArray()));
var recordBatch = await reader.ReadNextRecordBatchAsync();

_recordBatches.Add(recordBatch);
}
}

private AsyncServerStreamingCall<ExecutePlanResponse> GetResponse()
{
if (_operationId == string.Empty)
@@ -390,6 +420,16 @@ public void Dispose()
/// <returns></returns>
public IList<Row> GetData() => _rows;

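/// <summary>
/// The Arrow record batches collected for the last request. Only populated when the
/// executor was constructed with ArrowHandling.ArrowBuffers.
/// </summary>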
public IList<RecordBatch> GetArrowBatches()
{
if (_arrowHandling != ArrowHandling.ArrowBuffers)
{
throw new Exception($"Arrow batches are not available: pass ArrowHandling.ArrowBuffers to the RequestExecutor constructor (current value: '{_arrowHandling}')");
}

return _recordBatches;
}

/// <summary>
/// The schema of the last request (if available)
/// </summary>
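The ArrowBuffers branch in HandleArrowResponse leans on Apache.Arrow's IPC stream reader. A standalone sketch of that decode step, under the assumption that a payload may carry more than one record batch (the commit itself reads a single batch per response; the helper name is illustrative):

    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Apache.Arrow;
    using Apache.Arrow.Ipc;

    static async Task<List<RecordBatch>> ReadBatches(byte[] ipcBytes)
    {
        var batches = new List<RecordBatch>();
        var reader = new ArrowStreamReader(new ReadOnlyMemory<byte>(ipcBytes));

        // ReadNextRecordBatchAsync returns null once the stream is exhausted.
        RecordBatch? batch;
        while ((batch = await reader.ReadNextRecordBatchAsync()) != null)
        {
            batches.Add(batch);
        }

        return batches;
    }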
