Skip to content

Commit

Permalink
Update doc to refer to Propulsion.Tool facilities
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 2, 2019
1 parent 8e57077 commit 7728239
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 93 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added
### Changed

- Updated README.md to refer to `propulsion init` and `propulsion project` (formerly `eqx` `initAux` and `project`) (jet/propulsion#17)[https://github.com/jet/propulsion/pull/17]
- `eqx project` now uses environment variables `PROPULSION_KAFKA_`* instead of `EQUINOX_`* [#143](https://github.com/jet/equinox/pull/143)

### Removed

- `eqx project` - `ChangeFeedProcessor` and Kafka support - All projection management logic now lives in the `Propulsion` libraries [#138](https://github.com/jet/equinox/pull/138)
- `eqx initAux` - now `propulsion init` (jet/propulsion#17)[https://github.com/jet/propulsion/pull/17]

### Fixed

Expand Down
45 changes: 0 additions & 45 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,51 +734,6 @@ match res with
| c -> failwithf "conflict %A" c
```

# Projectors

See [this medium post regarding some patterns used at Jet in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8) for a broad overview of the reasons one might consider employing a projection system.

# `Equinox.Cosmos` Projection facility

An integral part of the `Equinox.Cosmos` featureset is the ability to project events based on the [Azure DocumentDb ChangeFeed mechanism](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed). Key elements involved in realizing this are:
- the [storage model needs to be designed in such a way that the aforementioned processor can do its job efficiently](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#cosmos-storage-model)
- there needs to be an active ChangeFeed Processor per collection which monitors events being written, tracking the position of the most recently propagated events

In CosmosDb, every document lives within a [logical partition, which is then hosted by a variable number of processor instances entitled _physical partitions_](https://docs.microsoft.com/en-gb/azure/cosmos-db/partition-data) (`Equinox.Cosmos` documents pertaining to an individual stream bear the same partition key in order to ensure correct ordering guarantees for the purposes of projection). Each front end processor has responsibility for a particular subset range of the partition key space.

The ChangeFeed’s real world manifestation is as a long running Processor per frontend processor that repeatedly tails a query across the set of documents being managed by a given partition host (subject to topology changes - new processors can come and go, with the assigned ranges shuffling to balance the load per processor). e.g. if you allocate 30K RU/s to a collection and/or store >20GB of data, it will have at least 3 processors, each handling 1/3 of the partition key space, and running a change feed from that is a matter of maintaining 3 continuous queries, with a continuation token each being held/leased/controlled by a given Change Feed Processor.

## Effect of ChangeFeed on Request Charges

It should be noted that the ChangeFeed is not special-cased by CosmosDb itself in any meaningful way - something somewhere is going to be calling a DocumentDb API queries, paying Request Charges for the privilege (even a tail request based on a continuation token yielding zero documents incurs a charge). Its important to consider that every event written by `Equinox.Cosmos` into the CosmosDb collection will induce an approximately equivalent cost due to the fact that a freshly inserted document will be included in the next batch propagated by the Processor (each update of a document also ‘moves’ that document from it’s present position in the change order past the the notional tail of the ChangeFeed). Thus each insert/update also induces an (unavoidable) request charge based on the fact that the document will be included aggregate set of touched documents being surfaced per batch transferred from the ChangeFeed (charging is per KiB or part thereof). _The effect of this cost is multipled by the number of ChangeFeedProcessor instances one is running._

## Change Feed Processors

The CosmosDb ChangeFeed’s real world manifestation is a continuous query per DocumentDb Physical Partition node processor.

For .NET, this is wrapped in a set of APIs presented within the standard `Microsoft.Azure.DocumentDb[.Core]` APIset (for example, the [`Sagan` library](https://github.com/jet/sagan) is built based on this, _but there be dragons; implementing a correct one you can trust, with tests, reliability and good performance is no trivial undertaking_).

A ChangeFeed _Processor_ consists of (per CosmosDb processor/range)
- a host process running somewhere that will run the query and then do something with the results before marking off progress
- a continuous query across the set of documents that fall within the partition key range hosted by a given physical partition host

The implementation in this repo uses [Microsoft’s .NET `ChangeFeedProcessor` implementation](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet), which is a proven component used for diverse purposes including as the underlying substrate for various Azure Functions wiring (_though NOT bug free at the present time_).

See the [PR that added the initial support for CosmosDb Projections](https://github.com/jet/equinox/pull/87) and [the QuickStart](README.md#quickstart) for instructions.

# Feeding to Kafka

While [Kafka is not for Event Sourcing](https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c), if you have the scale to run automate the care and feeding of Kafka infrastructure, it can a great toof for the job of Replicating events and/or Rich Events in a scalable manner.

The [Apache Kafka intro docs](https://kafka.apache.org/intro) provide a clear terse overview of the design and attendant benefits this brings to bear.

As noted in the [Effect of ChangeFeed on Request Charges](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#effect-of-changefeed-on-request-charges) section, it can make sense to replicate a subset of the ChangeFeed to a Kafka topic (both for projections being consumed within a Bounded Context and for cases where you are generating a Pubished Notification Event) purely from the point of view of optimising request charges (and not needing to consider projections when considering how to scale up provisioning for load). Other benefits are mechanical sympathy based - Kafka can be the right tool for the job in scaling out traversal of events for a variety of use cases given one has sufficient traffic to warrant the complexity.

See the [PR that added the initial support for CosmosDb Projections](https://github.com/jet/equinox/pull/87) and [the QuickStart](README.md#quickstart) for instructions.

- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
- https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf

# Roadmap

# Things that are incomplete and/or require work
Expand Down
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
# run as a single-node cluster to allow connection logic to use cluster mode as for a commercial cluster
/usr/local/bin/eventstored --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778
```
3. generate sample app with EventStore wiring from template and start
Expand Down Expand Up @@ -237,19 +236,19 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
dotnet run -p Web
```
4. (**`2.0.0-rc*`**) Use `eqx` tool to run a CosmosDb ChangeFeedProcessor
4. Use `propulsion` tool to run a CosmosDb ChangeFeedProcessor
```powershell
# TEMP: need to uninstall and use --version flag while this is in RC
dotnet tool uninstall Equinox.Tool -g
dotnet tool install Equinox.Tool -g --version 2.0.0-rc*
dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g --version 1.0.1-rc*
eqx initAux -ru 400 cosmos # generates a -aux collection for the ChangeFeedProcessor to maintain consumer group progress within
propulsion init -ru 400 cosmos # generates a -aux collection for the ChangeFeedProcessor to maintain consumer group progress within
# -v for verbose ChangeFeedProcessor logging
# `projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently
# stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# cosmos specifies source overrides (using defaults in step 1 in this instance)
eqx -v project projector1 stats cosmos
propulsion -v project projector1 stats cosmos
```
5. Generate a CosmosDb ChangeFeedProcessor sample `.fsproj` (without Kafka producer/consumer), using `Propulsion.Cosmos`
Expand All @@ -266,7 +265,20 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour
dotnet run -- projector2 cosmos
```
6. Generate CosmosDb [Kafka Projector and Consumer](DOCUMENTATION.md#feeding-to-kafka) `.fsproj`ects (using `Propulsion.Kafka`)
6. Use `propulsion` tool to Run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic
```powershell
$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
# `-v` for verbose logging
# `projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -v project projector3 -l 5 kafka temp-topic cosmos
```
7. Generate CosmosDb [Kafka Projector and Consumer](DOCUMENTATION.md#feeding-to-kafka) `.fsproj`ects (using `Propulsion.Kafka`)
```powershell
cat readme.md # more complete instructions regarding the code
Expand Down
5 changes: 0 additions & 5 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ param(
[Alias("cd")][string] $cosmosDatabase=$env:EQUINOX_COSMOS_DATABASE,
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("ca")][switch][bool] $cosmosProvisionAux,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs="-t:Build"
)
Expand Down Expand Up @@ -40,10 +39,6 @@ if ($skipCosmos) {
# -P: inhibit creation of stored proc (everything in the repo should work without it due to auto-provisioning)
cliCosmos @("init", "-ru", "400", "-P")
$deprovisionCosmos=$true
if ($cosmosProvisionAux) {
warn "Provisioning cosmos aux collection for projector..."
cliCosmos @("initAux", "-ru", "400")
}
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos

Expand Down
11 changes: 0 additions & 11 deletions tools/Equinox.Tool/Infrastructure/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,20 @@ module Equinox.Tool.Infrastructure.Prelude
open System
open System.Diagnostics
open System.Text
open System.Threading

type Exception with
// https://github.com/fsharp/fslang-suggestions/issues/660
member this.Reraise () =
(System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw ()
Unchecked.defaultof<_>

#nowarn "21" // re AwaitKeyboardInterrupt
#nowarn "40" // re AwaitKeyboardInterrupt
type Async with
/// <summary>
/// Raises an exception using Async's continuation mechanism directly.
/// </summary>
/// <param name="exn">Exception to be raised.</param>
static member Raise (exn : #exn) = Async.FromContinuations(fun (_,ec,_) -> ec exn)

/// Asynchronously awaits the next keyboard interrupt event
static member AwaitKeyboardInterrupt () : Async<unit> =
Async.FromContinuations(fun (sc,_,_) ->
let isDisposed = ref 0
let rec callback _ = Tasks.Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
and d : IDisposable = System.Console.CancelKeyPress.Subscribe callback
in ())

/// <summary>
/// Gets the result of given task so that in the event of exception
/// the actual user exception is raised as opposed to being wrapped
Expand Down
26 changes: 1 addition & 25 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Arguments =
| [<AltCommandLine("-l")>] LogFile of string
| [<CliPrefix(CliPrefix.None); Last; Unique>] Run of ParseResults<TestArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Init of ParseResults<InitArguments>
| [<CliPrefix(CliPrefix.None); Last; Unique>] InitAux of ParseResults<InitAuxArguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
Expand All @@ -29,7 +28,6 @@ type Arguments =
| LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)."
| Run _ -> "Run a load test"
| Init _ -> "Initialize Store/Collection (presently only relevant for `cosmos`; also handles adjusting RU/s provisioning adjustment)."
| InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the Projector)."
and [<NoComparison>]InitArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<AltCommandLine("-D")>] Shared
Expand All @@ -49,16 +47,6 @@ and [<NoComparison>]InitDbArguments =
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Database."
| SkipStoredProc -> "Inhibit creation of stored procedure in cited Collection."
| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]InitAuxArguments =
| [<AltCommandLine("-ru"); Mandatory>] Rus of int
| [<AltCommandLine("-s")>] Suffix of string
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Storage.Cosmos.Arguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Aux Collection."
| Suffix _ -> "Specify Collection Name suffix (default: `-aux`)."

| Cosmos _ -> "Cosmos Connection parameters."
and [<NoComparison>]WebArguments =
| [<AltCommandLine("-u")>] Endpoint of string
Expand Down Expand Up @@ -237,17 +225,6 @@ module CosmosInit =
let! conn = connector.Connect("equinox-tool", discovery)
return! init log conn.Client (dbName,collName) mode skipStoredProc
| _ -> failwith "please specify a `cosmos` endpoint" }
let aux (log: ILogger, verboseConsole, maybeSeq) (iargs: ParseResults<InitAuxArguments>) = async {
match iargs.TryGetSubCommand() with
| Some (InitAuxArguments.Cosmos sargs) ->
let storeLog = createStoreLog (sargs.Contains Storage.Cosmos.Arguments.VerboseStore) verboseConsole maybeSeq
let discovery, dbName, baseCollName, connector = Storage.Cosmos.connection (log,storeLog) (Storage.Cosmos.Info sargs)
let auxCollName = let collSuffix = iargs.GetResult(InitAuxArguments.Suffix,"-aux") in baseCollName + collSuffix
let rus = iargs.GetResult(InitAuxArguments.Rus)
log.Information("Provisioning Lease/`aux` Collection {collName} for {rus:n0} RU/s", auxCollName, rus)
let! conn = connector.Connect("equinox-tool", discovery)
return! initAux conn.Client (dbName,auxCollName) rus
| _ -> failwith "please specify a `cosmos` endpoint" }

[<EntryPoint>]
let main argv =
Expand All @@ -261,11 +238,10 @@ let main argv =
let log = createDomainLog verbose verboseConsole maybeSeq
match args.GetSubCommand() with
| Init iargs -> CosmosInit.containerAndOrDb (log, verboseConsole, maybeSeq) iargs |> Async.RunSynchronously
| InitAux iargs -> CosmosInit.aux (log, verboseConsole, maybeSeq) iargs |> Async.RunSynchronously
| Run rargs ->
let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName
LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs
| _ -> failwith "Please specify a valid subcommand :- init, initAux, project or run"
| _ -> failwith "Please specify a valid subcommand :- init or run"
0
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
| Storage.MissingArg msg -> eprintfn "%s" msg; 1
Expand Down

0 comments on commit 7728239

Please sign in to comment.