Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 7, 2024
1 parent 0125145 commit 067ffbd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 44 deletions.
2 changes: 1 addition & 1 deletion samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ module Cosmos =
// - In hot-warm scenarios, the Archive Container will frequently be within the same account and hence can share a CosmosClient
// For these typical purposes, CosmosStoreClient.Connect should be used to establish the Client, not custom wiring as we have here
let createConnector (a: Arguments) connectionString =
CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Timeout, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode)
CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode, timeout = a.Timeout)
let connect (log: ILogger) (a: Arguments) =
let primaryConnector, primaryDatabase, primaryContainer as primary = createConnector a a.Connection, a.Database, a.Container
logContainer log "Primary" (a.Mode, primaryConnector.Endpoint, primaryDatabase, primaryContainer)
Expand Down
83 changes: 42 additions & 41 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ type EventBody = JsonElement

/// A single Domain Event from the array held in a Batch
[<NoEquality; NoComparison>]
type Event = // TODO for STJ v5: All fields required unless explicitly optional
type Event =
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: DateTimeOffset // ISO 8601
t: DateTimeOffset // Will be saved ISO 8601 formatted

/// The Case (Event Type); used to drive deserialization
c: string // required
/// The Case (Event Type); used to drive deserialization (Required)
c: string

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB
d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work
d: EventBody // Can be Json Null for Nullary cases

/// Optional metadata, as UTF-8 encoded json, ready to emit directly
m: EventBody // TODO for STJ v5: Optional, not serialized if missing
m: EventBody

/// Optional correlationId
correlationId: string // TODO for STJ v5: Optional, not serialized if missing
correlationId: string

/// Optional causationId
causationId: string } // TODO for STJ v5: Optional, not serialized if missing
causationId: string }
interface IEventData<EventBody> with
member x.EventType = x.c
member x.Data = x.d
Expand All @@ -42,20 +42,20 @@ type Event = // TODO for STJ v5: All fields required unless explicitly optional

/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
[<NoEquality; NoComparison>]
type Batch = // TODO for STJ v5: All fields required unless explicitly optional
type Batch =
{ /// CosmosDB-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
/// Technically not required if running in single partition mode, but being over simplistic would be more confusing
p: string // "{streamName}"

/// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string
/// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it
/// NB Tip uses a well known value here while it's actively 'open'
/// There's no way to usefully ORDER BY on it; hence we have i shadowing it and use that instead
/// NB Tip uses a well known value ("-1") for the `id`; that document lives for the life of the stream
id: string // "{index}"

/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
_etag: string

/// base 'i' value for the Events held herein
i: int64 // {index}
Expand Down Expand Up @@ -90,25 +90,25 @@ type Unfold =

/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
m: EventBody } // TODO for STJ v5: Optional, not serialized if missing
m: EventBody }
// Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering
static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |]

/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
/// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc
[<NoEquality; NoComparison>]
type Tip = // TODO for STJ v5: All fields required unless explicitly optional
type Tip =
{ /// Partition key, as per Batch
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
p: string // "{streamName}"

/// Document Id within partition, as per Batch
id: string // "{-1}" - Well known IdConstant used while this remains the pending batch
id: string // "{-1}" - Well known Id Constant used for the tail document (the only one that get mutated)

/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
_etag: string

/// base 'i' value for the Events held herein
i: int64
Expand Down Expand Up @@ -1162,18 +1162,17 @@ type DiscoveryMode =
| DiscoveryMode.AccountUriAndKey (u, _k) -> u
| DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container
type CosmosClientFactory(options) =
static member CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan, serializerOptions) =
[<Obsolete "Will be removed in V5; please use the overload that includes `serializerOptions`">]
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
UseSystemTextJsonSerializerWithOptions = serializerOptions)
[<Obsolete "Will be removed in V5; please use the overload that includes `serializerOptions`">]
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, JsonSerializerOptions.Default)
o.RequestTimeout <- requestTimeout
o
RequestTimeout = requestTimeout,
UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions.Default)
/// Default when rendering/parsing Batch/Tip/Event/Unfold - omitting null values
static member val DefaultJsonSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = Serialization.JsonIgnoreCondition.WhenWritingNull)
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
Expand Down Expand Up @@ -1205,8 +1204,9 @@ type Discovery =
| Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
( // CosmosDB endpoint/credentials specification.
type CosmosStoreConnector(discovery: Discovery, factory: CosmosClientFactory) =
let discoveryMode = discovery.ToDiscoveryMode()
new(// CosmosDB endpoint/credentials specification.
discovery: Discovery,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
Expand All @@ -1215,30 +1215,31 @@ type CosmosStoreConnector
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// System.Text.Json SerializerOptions (Defaults to default options)
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s
// NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default
[<O; D null>] ?timeout: TimeSpan,
// System.Text.Json SerializerOptions used for rendering Batches, Events and Unfolds internally
// NOTE as Events and/or Unfolds are serialized to `JsonElement`, there should rarely be a need to control the options at this level
[<O; D null>] ?serializerOptions: System.Text.Json.JsonSerializerOptions,
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s
// NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default
[<O; D null>] ?requestTimeout: TimeSpan,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let serializerOptions = serializerOptions |> Option.defaultValue System.Text.Json.JsonSerializerOptions.Default
let factory =
let o = CosmosClientFactory.CreateDefaultOptions(maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, serializerOptions)
let o =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
UseSystemTextJsonSerializerWithOptions = defaultArg serializerOptions CosmosClientFactory.DefaultJsonSerializerOptions)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
requestTimeout |> Option.iter (fun x -> o.RequestTimeout <- x)
timeout |> Option.iter (fun x -> o.RequestTimeout <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o
CosmosStoreConnector(discovery, CosmosClientFactory o)

[<Obsolete "For backcompat only; will be removed in V5">]
new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests,
?mode, ?defaultConsistencyLevel, ?customize) =
CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?defaultConsistencyLevel = defaultConsistencyLevel, requestTimeout = requestTimeout, ?customize = customize,
?serializerOptions = None)
?defaultConsistencyLevel = defaultConsistencyLevel, timeout = requestTimeout, ?customize = customize)

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options
Expand Down
3 changes: 1 addition & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ let discoverConnection () =

let createConnector (log: Serilog.ILogger) =
let name, discovery = discoverConnection ()
let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3.,
maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.)
let connector = CosmosStoreConnector(discovery, 9, TimeSpan.FromMinutes 1.)
log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint)
connector

Expand Down

0 comments on commit 067ffbd

Please sign in to comment.