Skip to content

Commit

Permalink
LogStore work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Nov 9, 2024
1 parent a324462 commit 3c33df3
Show file tree
Hide file tree
Showing 18 changed files with 644 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore
public ICorrelationStore CorrelationStore => _inner.CorrelationStore;
public Utilities Utilities => _inner.Utilities;
public IMigrator Migrator => _inner.Migrator;
public ILogStore LogStore => _inner.LogStore;
public Task Initialize() => _inner.Initialize();

public Task<bool> CreateFunction(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;

[TestClass]
public class LogStoreTests : TestTemplates.LogStoreTests
{
[TestMethod]
public override Task SunshineScenarioTest()
=> SunshineScenarioTest(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Shouldly;

namespace Cleipnir.ResilientFunctions.Tests.TestTemplates;

public abstract class LogStoreTests
{
public abstract Task SunshineScenarioTest();
protected async Task SunshineScenarioTest(Task<IFunctionStore> storeTask)
{
var logStore = (await storeTask).LogStore;
var storedId = TestStoredId.Create();

var entries = await logStore.GetEntries(storedId);
entries.ShouldBeEmpty();

var owner1 = new Owner(1);
var msg1 = "hallo world".ToUtf8Bytes();
var position1 = await logStore.Append(storedId, msg1, owner1);
var msg2 = "hallo again".ToUtf8Bytes();
var position2 = await logStore.Append(storedId, msg2, owner1);
var owner2 = new Owner(2);
var msg3 = "hallo from owner2".ToUtf8Bytes();
var position3 = await logStore.Append(storedId, msg3, owner2);

entries = await logStore.GetEntries(storedId);
entries.Count.ShouldBe(3);

entries[0].Position.ShouldBe(position1);
entries[0].Owner.ShouldBe(owner1);
entries[0].Content.ShouldBe(msg1);

entries[1].Position.ShouldBe(position2);
entries[1].Owner.ShouldBe(owner1);
entries[1].Content.ShouldBe(msg2);

entries[2].Position.ShouldBe(position3);
entries[2].Owner.ShouldBe(owner2);
entries[2].Content.ShouldBe(msg3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class CrashableFunctionStore : IFunctionStore
public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore;
public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities;
public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator;
public ILogStore LogStore => _crashed ? throw new TimeoutException() : _inner.LogStore;

public CrashableFunctionStore(IFunctionStore inner)
{
Expand Down
6 changes: 6 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/Helpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Cleipnir.ResilientFunctions.Storage;

internal static class Helpers
{
public static int ToInt(this string value) => int.Parse(value);
}
1 change: 1 addition & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface IFunctionStore
public ICorrelationStore CorrelationStore { get; }
public Utilities Utilities { get; }
public IMigrator Migrator { get; }
public ILogStore LogStore { get; }
public Task Initialize();

Task<bool> CreateFunction(
Expand Down
22 changes: 22 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/ILogStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.Storage;

public interface ILogStore
{
public Task<Position> Update(StoredId id, Position position, byte[] content, Owner owner);
public Task Delete(StoredId id, Position position);
public Task<Position> Append(StoredId id, byte[] content, Owner owner);
public Task<IReadOnlyList<Position>> Append(StoredId id, IReadOnlyList<Tuple<Owner, Content>> contents);
public Task<IReadOnlyList<StoredLogEntry>> GetEntries(StoredId id);
public Task<IReadOnlyList<StoredLogEntry>> GetEntries(StoredId id, Position offset);
public Task<MaxPositionAndEntries?> GetEntries(StoredId id, Position offset, Owner owner);
}

public record MaxPositionAndEntries(Position MaxPosition, IReadOnlyList<StoredLogEntry> Entries);
public record StoredLogEntry(Owner Owner, Position Position, byte[] Content);
public record Owner(int Value);
public record Position(string Value);
public record Content(byte[] Value);
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore
public Utilities Utilities { get; }

public IMigrator Migrator { get; } = new InMemoryMigrator();
public ILogStore LogStore { get; } = new InMemoryLogStore();

public Task Initialize() => Task.CompletedTask;

Expand Down
103 changes: 103 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryLogStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;

namespace Cleipnir.ResilientFunctions.Storage;

public class InMemoryLogStore : ILogStore
{
private record LogState(Owner Owner, byte[] Content);

private readonly Dictionary<StoredId, Dictionary<Position, LogState>> _logStates = new();
private readonly object _sync = new();

public Task<Position> Update(StoredId id, Position position, byte[] content, Owner owner)
{
lock (_sync)
GetDictionary(id)[position] = new LogState(owner, content);

return position.ToTask();
}

public Task Delete(StoredId id, Position position)
{
lock (_sync)
GetDictionary(id).Remove(position);

return Task.CompletedTask;
}

public Task<Position> Append(StoredId id, byte[] content, Owner owner)
{
lock (_sync)
{
var dict = GetDictionary(id);
var position = dict.Count == 0
? new Position("0")
: new Position((dict.Keys.Select(k => int.Parse(k.Value)).Max() + 1).ToString());

dict[position] = new LogState(owner, content);
return position.ToTask();
}
}

public Task<IReadOnlyList<Position>> Append(StoredId id, IReadOnlyList<Tuple<Owner, Content>> contents)
{
return contents
.Select(tuple => Append(id, tuple.Item2.Value, tuple.Item1).Result)
.ToList()
.CastTo<IReadOnlyList<Position>>()
.ToTask();
}

public Task<IReadOnlyList<StoredLogEntry>> GetEntries(StoredId id)
{
lock (_sync)
return GetDictionary(id)
.Select(kv => new { Position = kv.Key, Content = kv.Value.Content, Owner = kv.Value.Owner })
.Select(a => new StoredLogEntry(a.Owner, a.Position, a.Content))
.ToList()
.CastTo<IReadOnlyList<StoredLogEntry>>()
.ToTask();
}

public Task<IReadOnlyList<StoredLogEntry>> GetEntries(StoredId id, Position offset)
{
return GetEntries(id)
.Result
.Where(e => int.Parse(e.Position.Value) > int.Parse(offset.Value))
.ToList()
.CastTo<IReadOnlyList<StoredLogEntry>>()
.ToTask();
}

public Task<MaxPositionAndEntries?> GetEntries(StoredId id, Position offset, Owner owner)
{
lock (_sync)
{
var allEntries = GetEntries(id).Result;
if (allEntries.Count == 0)
return default(MaxPositionAndEntries).ToTask();

var entries = allEntries
.Where(e => e.Owner == owner)
.Where(e => int.Parse(e.Position.Value) > int.Parse(offset.Value))
.ToList()
.CastTo<IReadOnlyList<StoredLogEntry>>();

var maxPosition = entries.Max(e => e.Position);
return new MaxPositionAndEntries(maxPosition!, entries).CastTo<MaxPositionAndEntries?>().ToTask();
}
}

private Dictionary<Position, LogState> GetDictionary(StoredId id)
{
lock (_sync)
if (!_logStates.TryGetValue(id, out var logState))
return _logStates[id] = new Dictionary<Position, LogState>();
else
return logState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@
[assembly: InternalsVisibleTo("Cleipnir.Flows.Sample.Presentation.AspNet")]
[assembly: InternalsVisibleTo("Cleipnir.Flows.Sample.Presentation")]

[assembly: InternalsVisibleTo("Cleipnir.ResilientFunctions.MySQL")]
[assembly: InternalsVisibleTo("Cleipnir.ResilientFunctions.MySQL")]
[assembly: InternalsVisibleTo("Cleipnir.ResilientFunctions.PostgreSQL")]
1 change: 1 addition & 0 deletions Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class CrashableFunctionStore : IFunctionStore
public ICorrelationStore CorrelationStore => _inner.CorrelationStore;
public Utilities Utilities => _inner.Utilities;
public IMigrator Migrator => _inner.Migrator;
public ILogStore LogStore => _inner.LogStore;

public CrashableFunctionStore(IFunctionStore inner) => _inner = inner;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class MySqlFunctionStore : IFunctionStore
public ICorrelationStore CorrelationStore => _correlationStore;

public IMigrator Migrator => _migrator;
public ILogStore LogStore => throw new NotImplementedException();
private readonly MySqlMigrator _migrator;

public Utilities Utilities { get; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests;

[TestClass]
public class LogStoreTests : ResilientFunctions.Tests.TestTemplates.LogStoreTests
{
[TestMethod]
public override Task SunshineScenarioTest()
=> SunshineScenarioTest(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class PostgreSqlFunctionStore : IFunctionStore

private readonly ICorrelationStore _correlationStore;
public ICorrelationStore CorrelationStore => _correlationStore;

private readonly PostgresSqlLogStore _logStore;
public ILogStore LogStore => _logStore;

public Utilities Utilities { get; }
public IMigrator Migrator => _migrator;
private readonly PostgreSqlMigrator _migrator;
Expand All @@ -45,6 +49,7 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "")
_effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName);
_timeoutStore = new PostgreSqlTimeoutStore(connectionString, _tableName);
_correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName);
_logStore = new PostgresSqlLogStore(connectionString, _tableName);
_typeStore = new PostgreSqlTypeStore(connectionString, _tableName);
_postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName);
_migrator = new PostgreSqlMigrator(connectionString, _tableName);
Expand All @@ -70,6 +75,7 @@ public async Task Initialize()
await _effectsStore.Initialize();
await _timeoutStore.Initialize();
await _correlationStore.Initialize();
await _logStore.Initialize();
await _typeStore.Initialize();
await using var conn = await CreateConnection();
_initializeSql ??= $@"
Expand Down Expand Up @@ -110,6 +116,7 @@ public async Task TruncateTables()
await _effectsStore.Truncate();
await _correlationStore.Truncate();
await _typeStore.Truncate();
await _logStore.Truncate();

await using var conn = await CreateConnection();
_truncateTableSql ??= $"TRUNCATE TABLE {_tableName}";
Expand Down
Loading

0 comments on commit 3c33df3

Please sign in to comment.