feat(Cosmos): Use Azure.Cosmos STJ impl #467

Open: wants to merge 4 commits into base: master
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

 ### Added

+- `Equinox.CosmosStore`: Use `Microsoft.Azure.Cosmos` integrated `System.Text.Json` support; added ability to specify `serializerOptions` [#467](https://github.com/jet/equinox/pull/467)
 - `Equinox.CosmosStore`: Group metrics by Container Name [#449](https://github.com/jet/equinox/pull/449)
 - `Equinox.CosmosStore`: Group metrics by Category; split out `Tip` activity [#453](https://github.com/jet/equinox/pull/453)
 - `Equinox.CosmosStore`: Support Ingesting unfolds [#460](https://github.com/jet/equinox/pull/460)
@@ -21,6 +22,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

 - `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448)
 - `Equinox.CosmosStore`: Update `System.Text.Json` dep to `6.0.10` per [CVE-2024-43485](https://github.com/advisories/GHSA-8g4q-xg66-9fp4) [#470](https://github.com/jet/equinox/pull/470)
+- `Equinox.CosmosStore`: Minimum `Microsoft.Azure.Cosmos` requirement updated to `3.43.0` to avail of integrated `System.Text.Json` support [#467](https://github.com/jet/equinox/pull/467)
+- `Equinox.CosmosStore.CosmosStoreConnector`: Removed mandatory `requestTimeout` argument [#467](https://github.com/jet/equinox/pull/467)
 - `Equinox.MessageDb`: Up min `Npgsql` to v `7.0.7` as `7.0.0` is on CVE blacklist

 ### Removed
2 changes: 1 addition & 1 deletion README.md
@@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

 - `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
 - `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
-- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.35.4`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
+- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.0`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
 - `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
 - `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`)
 - `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`)
2 changes: 1 addition & 1 deletion samples/Infrastructure/Store.fs
@@ -101,7 +101,7 @@ module Cosmos =
     // - In hot-warm scenarios, the Archive Container will frequently be within the same account and hence can share a CosmosClient
     // For these typical purposes, CosmosStoreClient.Connect should be used to establish the Client, not custom wiring as we have here
     let createConnector (a: Arguments) connectionString =
-        CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Timeout, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode)
+        CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode, timeout = a.Timeout)
     let connect (log: ILogger) (a: Arguments) =
         let primaryConnector, primaryDatabase, primaryContainer as primary = createConnector a a.Connection, a.Database, a.Container
         logContainer log "Primary" (a.Mode, primaryConnector.Endpoint, primaryDatabase, primaryContainer)
75 changes: 46 additions & 29 deletions src/Equinox.CosmosStore/CosmosStore.fs
@@ -13,24 +13,24 @@ type EventBody = JsonElement

 /// A single Domain Event from the array held in a Batch
 [<NoEquality; NoComparison>]
-type Event = // TODO for STJ v5: All fields required unless explicitly optional
+type Event =
     {   /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
-        t: DateTimeOffset // ISO 8601
+        t: DateTimeOffset // Will be saved ISO 8601 formatted

-        /// The Case (Event Type); used to drive deserialization
-        c: string // required
+        /// The Case (Event Type); used to drive deserialization (Required)
+        c: string

         /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB
-        d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work
+        d: EventBody // Can be Json Null for Nullary cases

         /// Optional metadata, as UTF-8 encoded json, ready to emit directly
-        m: EventBody // TODO for STJ v5: Optional, not serialized if missing
+        m: EventBody

         /// Optional correlationId
-        correlationId: string // TODO for STJ v5: Optional, not serialized if missing
+        correlationId: string

         /// Optional causationId
-        causationId: string } // TODO for STJ v5: Optional, not serialized if missing
+        causationId: string }
     interface IEventData<EventBody> with
         member x.EventType = x.c
         member x.Data = x.d
@@ -42,20 +42,20 @@ type Event = // TODO for STJ v5: All fields required unless explicitly optional

 /// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
 [<NoEquality; NoComparison>]
-type Batch = // TODO for STJ v5: All fields required unless explicitly optional
+type Batch =
     {   /// CosmosDB-mandated Partition Key, must be maintained within the document
-        /// Not actually required if running in single partition mode, but for simplicity, we always write it
-        p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
+        /// Technically not required if running in single partition mode, but being over simplistic would be more confusing
+        p: string // "{streamName}"

         /// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string
-        /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it
-        /// NB Tip uses a well known value here while it's actively 'open'
+        /// There's no way to usefully ORDER BY on it; hence we have i shadowing it and use that instead
+        /// NB Tip uses a well known value ("-1") for the `id`; that document lives for the life of the stream
         id: string // "{index}"

         /// When we read, we need to capture the value so we can retain it for caching purposes
         /// NB this is not relevant to fill in when we pass it to the writing stored procedure
         /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
-        _etag: string // TODO for STJ v5: Optional, not serialized if missing
+        _etag: string

         /// base 'i' value for the Events held herein
         i: int64 // {index}
@@ -90,25 +90,25 @@ type Unfold =

         /// Optional metadata, same encoding as `d` (can be null; not written if missing)
         [<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
-        m: EventBody } // TODO for STJ v5: Optional, not serialized if missing
+        m: EventBody }
     // Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering
     static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |]

 /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
 /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
 /// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc
 [<NoEquality; NoComparison>]
-type Tip = // TODO for STJ v5: All fields required unless explicitly optional
+type Tip =
     {   /// Partition key, as per Batch
-        p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
+        p: string // "{streamName}"

         /// Document Id within partition, as per Batch
-        id: string // "{-1}" - Well known IdConstant used while this remains the pending batch
+        id: string // "{-1}" - Well known Id Constant used for the tail document (the only one that gets mutated)

         /// When we read, we need to capture the value so we can retain it for caching purposes
         /// NB this is not relevant to fill in when we pass it to the writing stored procedure
         /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
-        _etag: string // TODO for STJ v5: Optional, not serialized if missing
+        _etag: string

         /// base 'i' value for the Events held herein
         i: int64
@@ -1161,14 +1161,17 @@ type DiscoveryMode =
         | DiscoveryMode.AccountUriAndKey (u, _k) -> u
         | DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

-/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
+/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container
 type CosmosClientFactory(options) =
+    [<Obsolete "Will be removed in V5; please use the overload that includes `serializerOptions`">]
     static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
         CosmosClientOptions(
             MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
             MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
             RequestTimeout = requestTimeout,
-            Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
+            UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions())
+    /// Default when rendering/parsing Batch/Tip/Event/Unfold - omitting null values
+    static member val DefaultJsonSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = Serialization.JsonIgnoreCondition.WhenWritingNull)
     /// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
     member val Options = options
     /// Creates an instance of CosmosClient without actually validating or establishing the connection
@@ -1200,28 +1203,42 @@ type Discovery =
         | Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

 /// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
-type CosmosStoreConnector
-    (   // CosmosDB endpoint/credentials specification.
+type CosmosStoreConnector(discovery: Discovery, factory: CosmosClientFactory) =
+    let discoveryMode = discovery.ToDiscoveryMode()
+    new(// CosmosDB endpoint/credentials specification.
         discovery: Discovery,
-        // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
-        requestTimeout: TimeSpan,
         // Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
         maxRetryAttemptsOnRateLimitedRequests: int,
         // Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
         maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
         // Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
         // NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
         [<O; D null>] ?mode: ConnectionMode,
+        // Timeout to apply to individual read/write round-trips going to CosmosDB. CosmosDB Default: 6s
+        // NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default
+        [<O; D null>] ?timeout: TimeSpan,
+        // System.Text.Json SerializerOptions used for rendering Batches, Events and Unfolds internally
+        // NOTE as Events and/or Unfolds are serialized to `JsonElement`, there should rarely be a need to control the options at this level
+        [<O; D null>] ?serializerOptions: System.Text.Json.JsonSerializerOptions,
         // consistency mode (default: use configuration specified for Database)
         [<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
         [<O; D null>] ?customize: Action<CosmosClientOptions>) =
-    let discoveryMode = discovery.ToDiscoveryMode()
-    let factory =
-        let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
+        let o =
+            CosmosClientOptions(
+                MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
+                MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
+                UseSystemTextJsonSerializerWithOptions = defaultArg serializerOptions CosmosClientFactory.DefaultJsonSerializerOptions)
         mode |> Option.iter (fun x -> o.ConnectionMode <- x)
+        timeout |> Option.iter (fun x -> o.RequestTimeout <- x)
         defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
         customize |> Option.iter (fun c -> c.Invoke o)
-        CosmosClientFactory o
+        CosmosStoreConnector(discovery, CosmosClientFactory o)
+
+    [<Obsolete "For backcompat only; will be removed in V5">]
+    new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests,
+        ?mode, ?defaultConsistencyLevel, ?customize) =
+        CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
+                             ?defaultConsistencyLevel = defaultConsistencyLevel, timeout = requestTimeout, ?customize = customize)

     /// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
     member _.Options = factory.Options
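
Reviewer note: the `DefaultJsonSerializerOptions` introduced in this file set `DefaultIgnoreCondition` to `WhenWritingNull`, which appears to be what replaces the removed "Optional, not serialized if missing" TODO comments. The following is a minimal stand-alone sketch of that behaviour using only `System.Text.Json`; the `SampleEvent` record is hypothetical and is not an Equinox type, and the sketch assumes a runtime where `System.Text.Json` can serialize F# records (e.g. .NET 6 or later).

```fsharp
open System.Text.Json
open System.Text.Json.Serialization

// Hypothetical record, loosely modelled on the optional string fields of `Event` in the diff
type SampleEvent = { c: string; correlationId: string; causationId: string }

// Same setting as DefaultJsonSerializerOptions in the diff: skip properties whose value is null
let omitNulls = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)

let e = { c = "Added"; correlationId = null; causationId = null }

// With the ignore condition set, the null fields should be left out of the rendered JSON
printfn "%s" (JsonSerializer.Serialize(e, omitNulls))

// With stock options, the null fields are rendered explicitly as JSON null
printfn "%s" (JsonSerializer.Serialize(e, JsonSerializerOptions()))
```

Passing a custom `serializerOptions` to the new constructor replaces this default wholesale, so callers who want null omission to continue would presumably need to set the ignore condition themselves.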
1 change: 1 addition & 0 deletions src/Equinox.CosmosStore/CosmosStoreSerialization.fs
@@ -31,6 +31,7 @@ module JsonElement =
         if value.ValueKind = JsonValueKind.Null then value
         else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement

+[<System.Obsolete "Unused internal type; will be removed in V5">]
 type CosmosJsonSerializer(options : JsonSerializerOptions) =
     inherit Microsoft.Azure.Cosmos.CosmosSerializer()

2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
@@ -27,7 +27,7 @@
     <PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

     <PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />
-    <PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
+    <PackageReference Include="Microsoft.Azure.Cosmos" Version="3.43.1" />

Review comment (Collaborator, Author): update all to max 3.43 ver

     <PackageReference Include="System.Text.Json" Version="6.0.10" />
   </ItemGroup>

3 changes: 1 addition & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
@@ -88,8 +88,7 @@ let discoverConnection () =

 let createConnector (log: Serilog.ILogger) =
     let name, discovery = discoverConnection ()
-    let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3.,
-                                         maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.)
+    let connector = CosmosStoreConnector(discovery, 9, TimeSpan.FromMinutes 1.)
     log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint)
     connector
