Skip to content

Commit

Permalink
sample: sample for workflow ExecuteByQS(http)&& correct typo in Workf…
Browse files Browse the repository at this point in the history
…lowGlobalTransaction class name (#92)

* sample: sample for workflow ExecuteByQS

- Implement wf-crash endpoint to simulate workflow crash
- Implement wf-resume endpoint for workflow http callback
- Modify WorlflowGlobalTransaction.cs to add Exists method for debug only.

* refactor(Dtmworkflow): correct typo in WorkflowGlobalTransaction class name

- Rename WorlflowGlobalTransaction to WorkflowGlobalTransaction
- Update references in ServiceCollectionExtensions, WfTestController, and tests

* test: add unit tests for WorkflowGlobalTransaction.Exists method
  • Loading branch information
wooln authored Dec 27, 2024
1 parent 9926030 commit 2150dcc
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 16 deletions.
85 changes: 83 additions & 2 deletions samples/DtmSample/Controllers/WfTestController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -21,10 +23,10 @@ public class WfTestController : ControllerBase
{

private readonly ILogger<WfTestController> _logger;
private readonly WorlflowGlobalTransaction _globalTransaction;
private readonly WorkflowGlobalTransaction _globalTransaction;
private readonly AppSettings _settings;

public WfTestController(ILogger<WfTestController> logger, IOptions<AppSettings> optionsAccs, WorlflowGlobalTransaction transaction)
public WfTestController(ILogger<WfTestController> logger, IOptions<AppSettings> optionsAccs, WorkflowGlobalTransaction transaction)
{
_logger = logger;
_settings = optionsAccs.Value;
Expand Down Expand Up @@ -253,5 +255,84 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
return Ok(TransResponse.BuildFailureResponse());
}
}


private static readonly string wfNameForResume = "wfNameForResume";

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("wf-crash")]
public async Task<IActionResult> Crash(CancellationToken cancellationToken)
{
if (!_globalTransaction.Exists(wfNameForResume))
{
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
// manual stop application
Environment.Exit(0);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

var req = JsonSerializer.Serialize(new TransRequest("1", -30));
await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true);

return Ok(TransResponse.BuildSucceedResponse());
}

[HttpPost("wf-resume")]
public async Task<IActionResult> WfResume(CancellationToken cancellationToken)
{
try
{
if (!_globalTransaction.Exists(wfNameForResume))
{
// register again after manual crash by Environment.Exit(0);
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

// prepared call ExecuteByQS
using var bodyMemoryStream = new MemoryStream();
await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken);
byte[] bytes = bodyMemoryStream.ToArray();
string body = Encoding.UTF8.GetString(bytes);
_logger.LogDebug($"body: {body}");

await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray());

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Workflow Error");
return Ok(TransResponse.BuildFailureResponse());
}
}
}
}
2 changes: 1 addition & 1 deletion samples/DtmSample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void ConfigureServices(IServiceCollection services)
dtm.SqlDbType = Configuration.GetValue<string>("AppSettings:SqlDbType");
dtm.BarrierSqlTableName = Configuration.GetValue<string>("AppSettings:BarrierSqlTableName");
dtm.DtmGrpcUrl = Configuration.GetValue<string>("AppSettings:DtmGrpcUrl");
dtm.HttpCallback = "";
dtm.HttpCallback = $"{Configuration.GetValue<string>("AppSettings:BusiUrl")}/wf-resume";
dtm.GrpcCallback = "";
});

Expand Down
4 changes: 2 additions & 2 deletions src/Dtmworkflow/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
services.AddDtmGrpc(setupAction);

services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorlflowGlobalTransaction>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

return services;
}
Expand All @@ -32,7 +32,7 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
services.AddDtmGrpc(configuration, sectionName);

services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorlflowGlobalTransaction>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

return services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

namespace Dtmworkflow
{
public class WorlflowGlobalTransaction
public class WorkflowGlobalTransaction
{
private readonly Dictionary<string, WfItem> _handlers;
private readonly IWorkflowFactory _workflowFactory;
private readonly ILogger _logger;

public WorlflowGlobalTransaction(IWorkflowFactory workflowFactory, ILoggerFactory loggerFactory)
public WorkflowGlobalTransaction(IWorkflowFactory workflowFactory, ILoggerFactory loggerFactory)
{
this._handlers = new Dictionary<string, WfItem>();
this._workflowFactory = workflowFactory;
this._logger = loggerFactory.CreateLogger<WorlflowGlobalTransaction>();
this._logger = loggerFactory.CreateLogger<WorkflowGlobalTransaction>();
}

public async Task<byte[]> Execute(string name, string gid, byte[] data, bool isHttp = true)
Expand Down Expand Up @@ -51,7 +51,7 @@ public void Register(string name, WfFunc2 handler, params Action<Workflow>[] cus
Custom = custom.ToList()
});
}

#if NET5_0_OR_GREATER
public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query, byte[] body)
{
Expand All @@ -61,5 +61,12 @@ public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query,
await Execute(op, gid, body, true);
}
#endif

#if DEBUG // for sample only
public bool Exists(string name)
{
return this._handlers.ContainsKey(name);
}
#endif
}
}
38 changes: 38 additions & 0 deletions tests/Dtmworkflow.Tests/WorkflowGlobalTransactionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Dtmcli;
using Dtmgrpc;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;

namespace Dtmworkflow.Tests;

public class WorkflowGlobalTransactionTest
{
#if DEBUG
[Fact]
public void Exists()
{
var factory = new Mock<IWorkflowFactory>();
var wf = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

Assert.Throws<System.ArgumentNullException>(() => wf.Exists(null));
Assert.False(wf.Exists(string.Empty));
Assert.False(wf.Exists("my-wf1"));
Assert.False(wf.Exists("my-wf2"));
Assert.False(wf.Exists("my-wf3"));

wf.Register("my-wf1", (workflow, data) => null);
wf.Register("my-wf2", (workflow, data) => null);

Assert.Throws<System.ArgumentNullException>(() => wf.Exists(null));
Assert.False(wf.Exists(string.Empty));
Assert.True(wf.Exists("my-wf1"));
Assert.True(wf.Exists("my-wf2"));
Assert.False(wf.Exists("my-wf3"));

var wf2 = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
Assert.False(wf2.Exists("my-wf1"));
Assert.False(wf2.Exists("my-wf2"));
Assert.False(wf2.Exists("my-wf3"));
}
#endif
}
14 changes: 7 additions & 7 deletions tests/Dtmworkflow.Tests/WorkflowHttpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async void Execute_Should_Succeed_When_PWF_Succeed()

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Should_Succeed_When_PWF_Succeed);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -56,7 +56,7 @@ public async void Execute_Should_Throw_DtmFailureException_When_PWF_Failed()

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Should_Throw_DtmFailureException_When_PWF_Failed);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -90,7 +90,7 @@ public async void Execute_Should_Succeed_When_PWF_Submitted_And_Progress_Not_Fai

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Should_Succeed_When_PWF_Submitted_And_Progress_Not_Failed);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -125,7 +125,7 @@ public async void Execute_Should_ThrowException_When_WfFunc2_ThrowException()

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Should_ThrowException_When_WfFunc2_ThrowException);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -159,7 +159,7 @@ public async void Execute_Should_Return_Null_When_WfFunc2_ThrowDtmFailureExcepti

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Should_Return_Null_When_WfFunc2_ThrowDtmFailureException);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -192,7 +192,7 @@ public async void Rollback_Should_Be_Executed()

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Rollback_Should_Be_Executed);
var gid = Guid.NewGuid().ToString("N");
Expand Down Expand Up @@ -232,7 +232,7 @@ public async void Commit_Should_Be_Executed()

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorlflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Commit_Should_Be_Executed);
var gid = Guid.NewGuid().ToString("N");
Expand Down

0 comments on commit 2150dcc

Please sign in to comment.