Skip to content

Commit

Permalink
test(DtmSample): add tests for workflow return values and fix null ha…
Browse files Browse the repository at this point in the history
…ndling

- Add new unit test and sample for different workflow return
- Fix null value handling for the twice execute
- Update workflow execution to handle null and empty byte arrays consistently
  • Loading branch information
wooln committed Dec 27, 2024
1 parent 9926030 commit 563015e
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 7 deletions.
48 changes: 48 additions & 0 deletions samples/DtmSample/Controllers/WfTestController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Exception = System.Exception;

namespace DtmSample.Controllers
{
Expand Down Expand Up @@ -65,6 +67,52 @@ public async Task<IActionResult> Simple(CancellationToken cancellationToken)
}
}

[HttpPost("wf-twice")]
public async Task<IActionResult> SimpleTwice(CancellationToken cancellationToken)
{
try
{
string wfNameReturnNormal = $"wfNameReturnNormal-{Guid.NewGuid().ToString("N")[..8]}";
_globalTransaction.Register(wfNameReturnNormal, async (wf, data) => await Task.FromResult(Encoding.UTF8.GetBytes("my result")));
string wfNameReturnEmpty = $"wfNameReturnEmpty-{Guid.NewGuid().ToString("N")[..8]}";
_globalTransaction.Register(wfNameReturnEmpty, async (wf, data) => await Task.FromResult(Encoding.UTF8.GetBytes("")));
string wfNameReturnNull = $"wfNameReturnNull-{Guid.NewGuid().ToString("N")[..8]}";
_globalTransaction.Register(wfNameReturnNull, (wf, data) => Task.FromResult<byte[]>(null));

string req = JsonSerializer.Serialize(new TransRequest("1", -30));

string gid;
byte[] result1, result2;
string resultStr1, resultStr2;
gid = wfNameReturnNormal + " " + Guid.NewGuid().ToString("N");
result1 = await _globalTransaction.Execute(wfNameReturnNormal, gid, Encoding.UTF8.GetBytes(req), true);
result2 = await _globalTransaction.Execute(wfNameReturnNormal, gid, Encoding.UTF8.GetBytes(req), true);
resultStr1 = Encoding.UTF8.GetString(result1);
resultStr2 = Encoding.UTF8.GetString(result2);
if ("my result" != resultStr1) throw new Exception("\"my result\" != resultStr1");
if (resultStr1 != resultStr2) throw new Exception("resultStr1 != resultStr2");

gid = wfNameReturnEmpty + " " + Guid.NewGuid().ToString("N");
result1 = await _globalTransaction.Execute(wfNameReturnEmpty, gid, Encoding.UTF8.GetBytes(req), true);
result2 = await _globalTransaction.Execute(wfNameReturnEmpty, gid, Encoding.UTF8.GetBytes(req), true);
if (null != result1) throw new Exception("String.Empty != resultStr1");
if (result1 != result2) throw new Exception("resultStr1 != resultStr2");

gid = wfNameReturnNull + " " + Guid.NewGuid().ToString("N");
result1 = await _globalTransaction.Execute(wfNameReturnNull, gid, Encoding.UTF8.GetBytes(req), true);
result2 = await _globalTransaction.Execute(wfNameReturnNull, gid, Encoding.UTF8.GetBytes(req), true);
if (null != result1) throw new Exception("String.Empty != resultStr1");
if (result1 != result2) throw new Exception("resultStr1 != resultStr2");

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Workflow Error");
return Ok(TransResponse.BuildFailureResponse());
}
}

[HttpPost("wf-saga")]
public async Task<IActionResult> Saga(CancellationToken cancellationToken)
{
Expand Down
14 changes: 10 additions & 4 deletions src/Dtmworkflow/Workflow.Imp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
var status = reply.Transaction.Status;
if (status == DtmCommon.Constant.StatusSucceed)
{
var sRes = Convert.FromBase64String(reply.Transaction.Result);
var sRes = reply.Transaction.Result != null
? Convert.FromBase64String(reply.Transaction.Result)
: null;
return sRes;
}
else if (status == DtmCommon.Constant.StatusFailed)
Expand Down Expand Up @@ -75,11 +77,15 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
}

if (err == null || err is DtmCommon.DtmFailureException)
{
{
// The DTM server does not distinguish between byte[0] and null,
// when execute the second time (eg. executeByQs), the previous query result returned is null.
await this.Submit(res, err, default);
}

return res;

// Before the DTM server side (v1.18.0) distinguish between byte[0] and null,
// in order to be compatible with the same return values for twice call, byte[] has been uniformly changed to return null.
return (res != null && res.Length == 0) ? null : res;
}

private async Task SaveResult(string branchId, string op, StepResult sr)
Expand Down
112 changes: 109 additions & 3 deletions tests/Dtmworkflow.Tests/WorkflowHttpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,114 @@ public async void Commit_Should_Be_Executed()
rollBackFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Never);
commitFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Once);
}

[Fact]
public async Task Execute_Result_Should_Be_WfFunc2()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusPrepared, null);
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Result_Should_Be_WfFunc2);
var gid = Guid.NewGuid().ToString("N");

wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));

var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);

Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Result_Should_Be_Previous()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, "return value from previous");
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Result_Should_Be_Previous);
var gid = Guid.NewGuid().ToString("N");

wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));

var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);

Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Again_Result_Should_Be_Previous()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient1 = new Mock<IDtmClient>();
var httpClient2 = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

// first
SetupPrepareWorkflow(httpClient1, DtmCommon.Constant.StatusPrepared, null);
var wf = SetupWorkFlow(httpClient1, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfName = nameof(Execute_Again_Result_Should_Be_Previous);
var gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));

// again
SetupPrepareWorkflow(httpClient2, DtmCommon.Constant.StatusSucceed, "return value from previous");
wf = SetupWorkFlow(httpClient2, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Again_Result_StringEmpty()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

// again
SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, null);
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfName = nameof(Execute_Again_Result_StringEmpty);
var gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Null(res);
}

private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string result, List<DtmProgressDto> progressDtos = null)
private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string? result, List<DtmProgressDto> progressDtos = null)

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

Cannot convert null literal to non-nullable reference type.
{
var httpResp = new HttpResponseMessage(HttpStatusCode.OK);
httpResp.Content = new StringContent(JsonSerializer.Serialize(
Expand All @@ -265,9 +371,9 @@ private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, st
Transaction = new DtmTransactionDto
{
Status = status,
Result = Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
Result = result == null ? null : Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
},
Progresses = progressDtos
Progresses = progressDtos ?? []
}));
httpClient.Setup(x => x.PrepareWorkflow(It.IsAny<DtmCommon.TransBase>(), It.IsAny<CancellationToken>())).Returns(Task.FromResult(httpResp));
}
Expand Down

0 comments on commit 563015e

Please sign in to comment.