Skip to content

Commit

Permalink
Improved samples
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Jan 5, 2025
1 parent ef998d4 commit 836e5bf
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Cleipnir.Flows.Sample.MicrosoftOpen.Clients;
public interface ILogisticsClient
{
Task<TrackAndTrace> ShipProducts(Guid customerId, IEnumerable<Guid> productIds);
Task CancelShipment(TrackAndTrace trackAndTrace);
}

public record TrackAndTrace(string Value);
Expand All @@ -18,4 +19,12 @@ public Task<TrackAndTrace> ShipProducts(Guid customerId, IEnumerable<Guid> produ
return new TrackAndTrace(Guid.NewGuid().ToString());
}
);

public Task CancelShipment(TrackAndTrace trackAndTrace)
=> Task.Delay(ClientSettings.Delay).ContinueWith(_ =>
{
Log.Logger.ForContext<ILogisticsClient>().Information("LOGISTICS_SERVER: Products shipment cancelled");
return new TrackAndTrace(Guid.NewGuid().ToString());
}
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public interface IPaymentProviderClient
Task Reserve(Guid transactionId, Guid customerId, decimal amount);
Task Capture(Guid transactionId);
Task CancelReservation(Guid transactionId);
Task Reverse(Guid transactionId);
bool IsServiceDown();
}

Expand All @@ -26,5 +27,10 @@ public Task CancelReservation(Guid transactionId)
Log.Logger.ForContext<IPaymentProviderClient>().Information("PAYMENT_PROVIDER: Reservation cancelled")
);

public Task Reverse(Guid transactionId)
=> Task.Delay(ClientSettings.Delay).ContinueWith(_ =>
Log.Logger.ForContext<IPaymentProviderClient>().Information("PAYMENT_PROVIDER: Reservation reversed")
);

public bool IsServiceDown() => false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Cleipnir.ResilientFunctions.Helpers;

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.BankTransfer;

public interface IBankCentralClient
{
Task PostTransaction(Guid transactionId, string account, decimal amount);
Task<decimal> GetAvailableFunds(string account);
}

public class BankCentralClient : IBankCentralClient
{
public Task PostTransaction(Guid transactionId, string account, decimal amount)
{
Console.WriteLine($"POSTING: {amount} to {account} account");
return Task.Delay(1_000).ContinueWith(_ => true);
}

public Task<decimal> GetAvailableFunds(string account) => 100M.ToTask();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.BankTransfer;

public record Transfer(
Guid TransactionId,
string FromAccount,
string ToAccount,
decimal Amount
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Cleipnir.ResilientFunctions.Domain;

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.BankTransfer;

[GenerateFlows]
public class TransferFlow(IBankCentralClient bankCentralClient) : Flow<Transfer>
{
public override async Task Run(Transfer transfer)
{
var availableFunds = await bankCentralClient.GetAvailableFunds(transfer.FromAccount);
if (availableFunds <= transfer.Amount)
throw new InvalidOperationException("Insufficient funds on from account");

await bankCentralClient.PostTransaction(
transfer.TransactionId,
transfer.FromAccount,
-transfer.Amount
);

await bankCentralClient.PostTransaction(
transfer.TransactionId,
transfer.ToAccount,
transfer.Amount
);
}

private DistributedSemaphore DistributedLock(string account)
=> Workflow.Semaphores.Create("BankTransfer", account, maximumCount: 1);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.Diagnostics;
using Cleipnir.Flows.Sample.MicrosoftOpen.Clients;
using Cleipnir.ResilientFunctions;
using Cleipnir.ResilientFunctions.Domain;

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.Batch;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.MessageDriven.Other;

public class Bus(Solution.MessageDrivenOrderFlows flows)
public class Bus(MessageDrivenOrderFlows flows)
{
private readonly List<Func<EventsAndCommands, Task>> _subscribers = new();
private readonly object _lock = new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public static void AddInMemoryBus(this IServiceCollection services)
{
services.AddSingleton<Bus>(p =>
{
var orderFlows = p.GetRequiredService<Solution.MessageDrivenOrderFlows>();
var orderFlows = p.GetRequiredService<MessageDrivenOrderFlows>();
var bus = new Bus(orderFlows);

var emailService = new EmailServiceStub(bus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.MessageDriven.Solution;

[GenerateFlows]
public class MessageDrivenOrderFlow(Bus bus) : Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = await Effect.Capture(Guid.NewGuid);
var transactionId = await Capture(Guid.NewGuid);

await ReserveFunds(order, transactionId);
var reservation = await Message<FundsReserved, FundsReservationFailed>(TimeSpan.FromSeconds(10));
Expand All @@ -34,17 +33,17 @@ private async Task CleanUp(FailedAt failedAt, Order order, Guid transactionId)
switch (failedAt)
{
case FailedAt.FundsReserved:
await CancelFundsReservation(order, transactionId);
break;
case FailedAt.ProductsShipped:
await CancelFundsReservation(order, transactionId);
await CancelProductsShipment(order);
break;
case FailedAt.FundsCaptured:
await CancelFundsReservation(order, transactionId);
await CancelProductsShipment(order);
break;
case FailedAt.OrderConfirmationEmailSent:
//we accept this failure without cleaning up
await ReversePayment(order, transactionId);
await CancelProductsShipment(order);
break;
default:
throw new ArgumentOutOfRangeException(nameof(failedAt), failedAt, null);
Expand Down Expand Up @@ -73,4 +72,6 @@ private Task CancelProductsShipment(Order order)
=> Capture(() => bus.Send(new CancelProductsShipment(order.OrderId)));
private Task CancelFundsReservation(Order order, Guid transactionId)
=> Capture(() => bus.Send(new CancelFundsReservation(order.OrderId, transactionId)));
private Task ReversePayment(Order order, Guid transactionId)
=> Capture(() => bus.Send(new ReverseTransaction(order.OrderId, transactionId)));
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cleipnir.Flows.Sample.MicrosoftOpen.Clients;
using Cleipnir.Flows.Sample.MicrosoftOpen.Flows.MessageDriven;
using Polly;
using Polly.Retry;

Expand All @@ -20,7 +21,7 @@ public override async Task Run(Order order)
await paymentProviderClient.Capture(transactionId);
await emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace, order.ProductIds);
}

#region Polly

private ResiliencePipeline Pipeline { get; } = new ResiliencePipelineBuilder()
Expand All @@ -36,4 +37,41 @@ public override async Task Run(Order order)
).Build();

#endregion

#region CleanUp

private async Task CleanUp(FailedAt failedAt, Guid transactionId, TrackAndTrace? trackAndTrace)
{
switch (failedAt)
{
case FailedAt.FundsReserved:
break;
case FailedAt.ProductsShipped:
await paymentProviderClient.CancelReservation(transactionId);
break;
case FailedAt.FundsCaptured:
await paymentProviderClient.Reverse(transactionId);
await logisticsClient.CancelShipment(trackAndTrace!);
break;
case FailedAt.OrderConfirmationEmailSent:
//we accept this failure without cleaning up
break;
default:
throw new ArgumentOutOfRangeException(nameof(failedAt), failedAt, null);
}

throw new OrderProcessingException($"Order processing failed at: '{failedAt}'");
}

private record StepAndCleanUp(Func<Task> Work, Func<Task> CleanUp);

private enum FailedAt
{
FundsReserved,
ProductsShipped,
FundsCaptured,
OrderConfirmationEmailSent,
}

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.Rpc.Solution;

[GenerateFlows]
public class OrderFlow(
IPaymentProviderClient paymentProviderClient,
IEmailClient emailClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Cleipnir.Flows.Sample.MicrosoftOpen.Clients;

namespace Cleipnir.Flows.Sample.MicrosoftOpen.Flows.Rpc.Solution;

public class OrderFlowWithCleanUp(
IPaymentProviderClient paymentProviderClient,
IEmailClient emailClient,
ILogisticsClient logisticsClient)
: Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = Guid.NewGuid();

TrackAndTrace? trackAndTrace = null;
var steps = new StepAndCleanUp[]
{
new(
Work: () => paymentProviderClient.Reserve(transactionId, order.CustomerId, order.TotalPrice),
CleanUp: () => CleanUp(FailedAt.ProductsShipped, transactionId, trackAndTrace: null)
),
new(
Work: async () =>
{
trackAndTrace = await Capture(async () =>
await logisticsClient.ShipProducts(order.CustomerId, order.ProductIds)
);
},
CleanUp: () => CleanUp(FailedAt.ProductsShipped, transactionId, trackAndTrace: null)
),
new(
Work: () => paymentProviderClient.Capture(transactionId),
CleanUp: () => CleanUp(FailedAt.FundsCaptured, transactionId, trackAndTrace)
),
new (
Work: () => emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace!, order.ProductIds),
CleanUp: () => CleanUp(FailedAt.OrderConfirmationEmailSent, transactionId, trackAndTrace)
)
};

foreach (var step in steps)
try
{
await step.Work();
}
catch (Exception)
{
await step.CleanUp();
throw;
}
}

private async Task CleanUp(FailedAt failedAt, Guid transactionId, TrackAndTrace? trackAndTrace)
{
switch (failedAt)
{
case FailedAt.FundsReserved:
break;
case FailedAt.ProductsShipped:
await paymentProviderClient.CancelReservation(transactionId);
break;
case FailedAt.FundsCaptured:
await paymentProviderClient.Reverse(transactionId);
await logisticsClient.CancelShipment(trackAndTrace!);
break;
case FailedAt.OrderConfirmationEmailSent:
//we accept this failure without cleaning up
break;
default:
throw new ArgumentOutOfRangeException(nameof(failedAt), failedAt, null);
}
}

private record StepAndCleanUp(Func<Task> Work, Func<Task> CleanUp);

private enum FailedAt
{
FundsReserved,
ProductsShipped,
FundsCaptured,
OrderConfirmationEmailSent,
}
}

0 comments on commit 836e5bf

Please sign in to comment.