From 77282399fc78b34802830b18e7e598c98a0c45af Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 2 Aug 2019 04:19:02 +0100
Subject: [PATCH] Update doc to refer to Propulsion.Tool facilities

---
 CHANGELOG.md                          |  2 +
 DOCUMENTATION.md                      | 45 -------------------
 README.md                             | 26 ++++++++---
 build.ps1                             |  5 ---
 .../Infrastructure/Infrastructure.fs  | 11 -----
 tools/Equinox.Tool/Program.fs         | 26 +----------
 6 files changed, 22 insertions(+), 93 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 629f66af6..5fdd8c646 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,11 +11,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

### Changed
+- Updated README.md to refer to `propulsion init` and `propulsion project` (formerly `eqx initAux` and `eqx project`) [jet/propulsion#17](https://github.com/jet/propulsion/pull/17)
- `eqx project` now uses environment variables `PROPULSION_KAFKA_`* instead of `EQUINOX_`* [#143](https://github.com/jet/equinox/pull/143)

### Removed

- `eqx project` - `ChangeFeedProcessor` and Kafka support - All projection management logic now lives in the `Propulsion` libraries [#138](https://github.com/jet/equinox/pull/138)
+- `eqx initAux` - now `propulsion init` [jet/propulsion#17](https://github.com/jet/propulsion/pull/17)

### Fixed

diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index 0f199144b..e9d0eca80 100644
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -734,51 +734,6 @@ match res with
| c -> failwithf "conflict %A" c
```

-# Projectors
-
-See [this medium post regarding some patterns used at Jet in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8) for a broad overview of the reasons one might consider employing a projection system.
-
-# `Equinox.Cosmos` Projection facility
-
- An integral part of the `Equinox.Cosmos` featureset is the ability to project events based on the [Azure DocumentDb ChangeFeed mechanism](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed). Key elements involved in realizing this are:
-- the [storage model needs to be designed in such a way that the aforementioned processor can do its job efficiently](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#cosmos-storage-model)
-- there needs to be an active ChangeFeed Processor per collection which monitors events being written, tracking the position of the most recently propagated events
-
-In CosmosDb, every document lives within a [logical partition, which is then hosted by a variable number of processor instances entitled _physical partitions_](https://docs.microsoft.com/en-gb/azure/cosmos-db/partition-data) (`Equinox.Cosmos` documents pertaining to an individual stream bear the same partition key in order to ensure correct ordering guarantees for the purposes of projection). Each front end processor has responsibility for a particular subset range of the partition key space.
-
-The ChangeFeed’s real world manifestation is as a long running Processor per frontend processor that repeatedly tails a query across the set of documents being managed by a given partition host (subject to topology changes - new processors can come and go, with the assigned ranges shuffling to balance the load per processor). e.g.
if you allocate 30K RU/s to a collection and/or store >20GB of data, it will have at least 3 processors, each handling 1/3 of the partition key space, and running a change feed from that is a matter of maintaining 3 continuous queries, with a continuation token each being held/leased/controlled by a given Change Feed Processor. - -## Effect of ChangeFeed on Request Charges - -It should be noted that the ChangeFeed is not special-cased by CosmosDb itself in any meaningful way - something somewhere is going to be calling a DocumentDb API queries, paying Request Charges for the privilege (even a tail request based on a continuation token yielding zero documents incurs a charge). Its important to consider that every event written by `Equinox.Cosmos` into the CosmosDb collection will induce an approximately equivalent cost due to the fact that a freshly inserted document will be included in the next batch propagated by the Processor (each update of a document also ‘moves’ that document from it’s present position in the change order past the the notional tail of the ChangeFeed). Thus each insert/update also induces an (unavoidable) request charge based on the fact that the document will be included aggregate set of touched documents being surfaced per batch transferred from the ChangeFeed (charging is per KiB or part thereof). _The effect of this cost is multipled by the number of ChangeFeedProcessor instances one is running._ - -## Change Feed Processors - -The CosmosDb ChangeFeed’s real world manifestation is a continuous query per DocumentDb Physical Partition node processor. - -For .NET, this is wrapped in a set of APIs presented within the standard `Microsoft.Azure.DocumentDb[.Core]` APIset (for example, the [`Sagan` library](https://github.com/jet/sagan) is built based on this, _but there be dragons; implementing a correct one you can trust, with tests, reliability and good performance is no trivial undertaking_). - -A ChangeFeed _Processor_ consists of (per CosmosDb processor/range) -- a host process running somewhere that will run the query and then do something with the results before marking off progress -- a continuous query across the set of documents that fall within the partition key range hosted by a given physical partition host - -The implementation in this repo uses [Microsoft’s .NET `ChangeFeedProcessor` implementation](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet), which is a proven component used for diverse purposes including as the underlying substrate for various Azure Functions wiring (_though NOT bug free at the present time_). - -See the [PR that added the initial support for CosmosDb Projections](https://github.com/jet/equinox/pull/87) and [the QuickStart](README.md#quickstart) for instructions. - -# Feeding to Kafka - -While [Kafka is not for Event Sourcing](https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c), if you have the scale to run automate the care and feeding of Kafka infrastructure, it can a great toof for the job of Replicating events and/or Rich Events in a scalable manner. - -The [Apache Kafka intro docs](https://kafka.apache.org/intro) provide a clear terse overview of the design and attendant benefits this brings to bear. 
- -As noted in the [Effect of ChangeFeed on Request Charges](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#effect-of-changefeed-on-request-charges) section, it can make sense to replicate a subset of the ChangeFeed to a Kafka topic (both for projections being consumed within a Bounded Context and for cases where you are generating a Pubished Notification Event) purely from the point of view of optimising request charges (and not needing to consider projections when considering how to scale up provisioning for load). Other benefits are mechanical sympathy based - Kafka can be the right tool for the job in scaling out traversal of events for a variety of use cases given one has sufficient traffic to warrant the complexity. - -See the [PR that added the initial support for CosmosDb Projections](https://github.com/jet/equinox/pull/87) and [the QuickStart](README.md#quickstart) for instructions. - -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md -- https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf - # Roadmap # Things that are incomplete and/or require work diff --git a/README.md b/README.md index a2ae97751..f3e28f1db 100644 --- a/README.md +++ b/README.md @@ -204,7 +204,6 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour # run as a single-node cluster to allow connection logic to use cluster mode as for a commercial cluster /usr/local/bin/eventstored --gossip-on-single-node --discover-via-dns 0 --ext-http-port=30778 ``` - 3. generate sample app with EventStore wiring from template and start @@ -237,19 +236,19 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour dotnet run -p Web ``` -4. (**`2.0.0-rc*`**) Use `eqx` tool to run a CosmosDb ChangeFeedProcessor +4. Use `propulsion` tool to run a CosmosDb ChangeFeedProcessor ```powershell # TEMP: need to uninstall and use --version flag while this is in RC - dotnet tool uninstall Equinox.Tool -g - dotnet tool install Equinox.Tool -g --version 2.0.0-rc* + dotnet tool uninstall Propulsion.Tool -g + dotnet tool install Propulsion.Tool -g --version 1.0.1-rc* - eqx initAux -ru 400 cosmos # generates a -aux collection for the ChangeFeedProcessor to maintain consumer group progress within + propulsion init -ru 400 cosmos # generates a -aux collection for the ChangeFeedProcessor to maintain consumer group progress within # -v for verbose ChangeFeedProcessor logging # `projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently # stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka) # cosmos specifies source overrides (using defaults in step 1 in this instance) - eqx -v project projector1 stats cosmos + propulsion -v project projector1 stats cosmos ``` 5. Generate a CosmosDb ChangeFeedProcessor sample `.fsproj` (without Kafka producer/consumer), using `Propulsion.Cosmos` @@ -266,7 +265,20 @@ While Equinox is implemented in F#, and F# is a great fit for writing event-sour dotnet run -- projector2 cosmos ``` -6. Generate CosmosDb [Kafka Projector and Consumer](DOCUMENTATION.md#feeding-to-kafka) `.fsproj`ects (using `Propulsion.Kafka`) +6. 
Use `propulsion` tool to Run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic + + ```powershell + $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b + # `-v` for verbose logging + # `projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently + # `-l 5` to report ChangeFeed lags every 5 minutes + # `kafka` specifies one wants to emit to Kafka + # `temp-topic` is the topic to emit to + # `cosmos` specifies source overrides (using defaults in step 1 in this instance) + propulsion -v project projector3 -l 5 kafka temp-topic cosmos + ``` + + 7. Generate CosmosDb [Kafka Projector and Consumer](DOCUMENTATION.md#feeding-to-kafka) `.fsproj`ects (using `Propulsion.Kafka`) ```powershell cat readme.md # more complete instructions regarding the code diff --git a/build.ps1 b/build.ps1 index f2a058682..be5ff210f 100644 --- a/build.ps1 +++ b/build.ps1 @@ -7,7 +7,6 @@ param( [Alias("cd")][string] $cosmosDatabase=$env:EQUINOX_COSMOS_DATABASE, [Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION, [Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection, - [Alias("ca")][switch][bool] $cosmosProvisionAux, [Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos, [string] $additionalMsBuildArgs="-t:Build" ) @@ -40,10 +39,6 @@ if ($skipCosmos) { # -P: inhibit creation of stored proc (everything in the repo should work without it due to auto-provisioning) cliCosmos @("init", "-ru", "400", "-P") $deprovisionCosmos=$true - if ($cosmosProvisionAux) { - warn "Provisioning cosmos aux collection for projector..." - cliCosmos @("initAux", "-ru", "400") - } } $env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos diff --git a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs index db8d0d050..c47b700c2 100644 --- a/tools/Equinox.Tool/Infrastructure/Infrastructure.fs +++ b/tools/Equinox.Tool/Infrastructure/Infrastructure.fs @@ -4,7 +4,6 @@ module Equinox.Tool.Infrastructure.Prelude open System open System.Diagnostics open System.Text -open System.Threading type Exception with // https://github.com/fsharp/fslang-suggestions/issues/660 @@ -12,8 +11,6 @@ type Exception with (System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture this).Throw () Unchecked.defaultof<_> -#nowarn "21" // re AwaitKeyboardInterrupt -#nowarn "40" // re AwaitKeyboardInterrupt type Async with /// /// Raises an exception using Async's continuation mechanism directly. @@ -21,14 +18,6 @@ type Async with /// Exception to be raised. 
static member Raise (exn : #exn) = Async.FromContinuations(fun (_,ec,_) -> ec exn) - /// Asynchronously awaits the next keyboard interrupt event - static member AwaitKeyboardInterrupt () : Async = - Async.FromContinuations(fun (sc,_,_) -> - let isDisposed = ref 0 - let rec callback _ = Tasks.Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore - and d : IDisposable = System.Console.CancelKeyPress.Subscribe callback - in ()) - /// /// Gets the result of given task so that in the event of exception /// the actual user exception is raised as opposed to being wrapped diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 3f8b6190b..20241dbbe 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -20,7 +20,6 @@ type Arguments = | [] LogFile of string | [] Run of ParseResults | [] Init of ParseResults - | [] InitAux of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -29,7 +28,6 @@ type Arguments = | LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)." | Run _ -> "Run a load test" | Init _ -> "Initialize Store/Collection (presently only relevant for `cosmos`; also handles adjusting RU/s provisioning adjustment)." - | InitAux _ -> "Initialize auxilliary store (presently only relevant for `cosmos`, when you intend to run the Projector)." and []InitArguments = | [] Rus of int | [] Shared @@ -49,16 +47,6 @@ and []InitDbArguments = member a.Usage = a |> function | Rus _ -> "Specify RU/s level to provision for the Database." | SkipStoredProc -> "Inhibit creation of stored procedure in cited Collection." - | Cosmos _ -> "Cosmos Connection parameters." -and []InitAuxArguments = - | [] Rus of int - | [] Suffix of string - | [] Cosmos of ParseResults - interface IArgParserTemplate with - member a.Usage = a |> function - | Rus _ -> "Specify RU/s level to provision for the Aux Collection." - | Suffix _ -> "Specify Collection Name suffix (default: `-aux`)." - | Cosmos _ -> "Cosmos Connection parameters." and []WebArguments = | [] Endpoint of string @@ -237,17 +225,6 @@ module CosmosInit = let! conn = connector.Connect("equinox-tool", discovery) return! init log conn.Client (dbName,collName) mode skipStoredProc | _ -> failwith "please specify a `cosmos` endpoint" } - let aux (log: ILogger, verboseConsole, maybeSeq) (iargs: ParseResults) = async { - match iargs.TryGetSubCommand() with - | Some (InitAuxArguments.Cosmos sargs) -> - let storeLog = createStoreLog (sargs.Contains Storage.Cosmos.Arguments.VerboseStore) verboseConsole maybeSeq - let discovery, dbName, baseCollName, connector = Storage.Cosmos.connection (log,storeLog) (Storage.Cosmos.Info sargs) - let auxCollName = let collSuffix = iargs.GetResult(InitAuxArguments.Suffix,"-aux") in baseCollName + collSuffix - let rus = iargs.GetResult(InitAuxArguments.Rus) - log.Information("Provisioning Lease/`aux` Collection {collName} for {rus:n0} RU/s", auxCollName, rus) - let! conn = connector.Connect("equinox-tool", discovery) - return! 
initAux conn.Client (dbName,auxCollName) rus - | _ -> failwith "please specify a `cosmos` endpoint" } [] let main argv = @@ -261,11 +238,10 @@ let main argv = let log = createDomainLog verbose verboseConsole maybeSeq match args.GetSubCommand() with | Init iargs -> CosmosInit.containerAndOrDb (log, verboseConsole, maybeSeq) iargs |> Async.RunSynchronously - | InitAux iargs -> CosmosInit.aux (log, verboseConsole, maybeSeq) iargs |> Async.RunSynchronously | Run rargs -> let reportFilename = args.GetResult(LogFile,programName+".log") |> fun n -> System.IO.FileInfo(n).FullName LoadTest.run log (verbose,verboseConsole,maybeSeq) reportFilename rargs - | _ -> failwith "Please specify a valid subcommand :- init, initAux, project or run" + | _ -> failwith "Please specify a valid subcommand :- init or run" 0 with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | Storage.MissingArg msg -> eprintfn "%s" msg; 1