From e3c45d81f4729a2613cb9cc53be39c6abbfc89dc Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Tue, 29 Sep 2020 16:14:38 +0100
Subject: [PATCH] Cherry-pick V3 stored proc enhancements (#242)

---
 CHANGELOG.md                                  |  2 +-
 src/Equinox.Cosmos/Cosmos.fs                  | 91 +++++++++++--------
 .../CosmosCoreIntegration.fs                  | 18 ++--
 .../CosmosIntegration.fs                      |  6 +-
 .../Equinox.Cosmos.Integration.fsproj         |  2 +
 5 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f83929669..b4d368034 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ The `Unreleased` section name is replaced by the expected version of next release
 ### Changed

 - `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
+- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242)

 ### Removed
 ### Fixed
@@ -27,7 +28,6 @@ The `Unreleased` section name is replaced by the expected version of next release

 - `Cosmos`: Tweaked log messages

-
 ## [2.3.0-rc1] - 2020-08-31

diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index d80a3b626..b387ad766 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -404,23 +404,27 @@ module Sync =
     // NB don't nest in a private module, or serialization will fail miserably ;)
     [<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
     type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
-    let [<Literal>] private sprocName = "EquinoxRollingUnfolds3" // NB need to rename/number for any breaking change
+    let [<Literal>] private sprocName = "EquinoxRollingUnfolds4" // NB need to rename/number for any breaking change
     let [<Literal>] private sprocBody = """
-// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
-// 1 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
-// 2a Verify no current Tip; if so - incoming req.e and defines the `n`ext position / unfolds
-// 2b If we already have a tip, move position forward, replace unfolds
-// 3 insert a new document containing the events as part of the same batch of work
-// 3a in some cases, there are only changes to the `u`nfolds and no `e`vents, in which case no write should happen
+// Manages the merging of the supplied Request Batch into the stream
+
+// 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
+
+// High level end-states:
+
+// 1a if there is a Tip, but there are only changes to the `u`nfolds (and no `e`vents) -> update Tip only
+// 1b if there is a Tip, but the incoming request includes an event -> generate a batch document + create empty Tip
+
+// 2a if the stream is empty, but the incoming request includes an event -> generate a batch document + create empty Tip
+// 2b if there is no current Tip, and no events being written -> the incoming `req` becomes the Tip batch
+
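+// For illustration only (field shapes are indicative, not authoritative): a request such as
+//   req = { p: "Favorites-alice", id: "-1", e: [{ c: "Added", d: {...} }], u: [{ i: 0, c: "State", d: {...} }] }
+// submitted with expIndex=2 expresses "append iff the stream currently holds exactly 2 events";
+// expIndex=-1 appends unconditionally, while expIndex=-2 gates the write on expEtag matching the Tip's _etag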
 function sync(req, expIndex, expEtag) {
     if (!req) throw new Error("Missing req argument");
-    const collection = getContext().getCollection();
-    const collectionLink = collection.getSelfLink();
+    const collectionLink = __.getSelfLink();
     const response = getContext().getResponse();
-    // Locate the Tip (-1) batch for this stream (which may not exist)
-    const tipDocId = collection.getAltLink() + "/docs/" + req.id;
-    const isAccepted = collection.readDocument(tipDocId, {}, function (err, current) {
+    const tipDocId = __.getAltLink() + "/docs/" + req.id;
+    const isAccepted = __.readDocument(tipDocId, {}, function (err, current) {
         // Verify we dont have a conflicting write
         if (expIndex === -1) {
             // For Any mode, we always do an append operation
@@ -429,7 +433,10 @@ function sync(req, expIndex, expEtag) {
             // If there is no Tip page, the writer has no possible reason for writing at an index other than zero, and an etag exp must be fulfilled
             response.setBody({ etag: null, n: 0, conflicts: [] });
         } else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) {
-            // if we're working based on etags, the `u`nfolds very likely to bear relevant info as state-bearing unfolds
+            // Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle;
+            // yielding [] triggers the client to go loading the events itself
+
+            // if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds
             // if there are no `u`nfolds, we need to be careful not to yield `conflicts: null`, as that signals a successful write (see below)
             response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [] });
         } else {
@@ -438,34 +445,46 @@ function sync(req, expIndex, expEtag) {
         }
     });
     if (!isAccepted) throw new Error("readDocument not Accepted");

-    function executeUpsert(current) {
+    function executeUpsert(tip) {
         function callback(err, doc) {
             if (err) throw err;
             response.setBody({ etag: doc._etag, n: doc.n, conflicts: null });
         }
-        var tip;
-        if (!current) {
-            tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u };
-            const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback);
-            if (!tipAccepted) throw new Error("Unable to create Tip.");
-        } else {
-            // TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches
-            const n = current.n + req.e.length;
-            tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u };
+        if (tip) {
+            Array.prototype.push.apply(tip.e, req.e);
+            tip.n = tip.i + tip.e.length;
+            // If there are events, calve them to their own batch (this behavior is to simplify CFP consumer impl)
+            if (tip.e.length > 0) {
+                const batch = { id: tip.i.toString(), p: tip.p, i: tip.i, n: tip.n, e: tip.e }
+                const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
+                if (!batchAccepted) throw new Error("Unable to remove Tip markings.");
+
+                tip.i = tip.n;
+                tip.e = [];
+            }
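+            // Illustrative only: given a Tip { id: "-1", i: 3, n: 5, e: [e3, e4] } at this point, the calved
+            // batch is { id: "3", p: <stream>, i: 3, n: 5, e: [e3, e4] }, and the Tip is left as
+            // { id: "-1", i: 5, n: 5, e: [] } before its unfolds are replaced and it is written back below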
Error("Unable to insert Batch."); + // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches + + // Replace all the unfolds // TODO: should remove only unfolds being superseded + tip.u = req.u; + // As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place + const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback); + if (!isAccepted) throw new Error("Unable to replace Tip batch."); + } else { + // NOTE we write the batch first (more consistent RU cost than writing tip first) + if (req.e.length > 0) { + const batch = { id: "0", p: req.p, i: 0, n: req.e.length, e: req.e }; + const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true }); + if (!batchAccepted) throw new Error("Unable to create Batch 0."); + + req.i = batch.n; + req.e = []; + } else { + req.i = 0; + } + req.n = req.i + req.e.length; + const isAccepted = __.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback); + if (!isAccepted) throw new Error("Unable to create Tip batch."); } } }""" diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 877f29541..dbc3d2b6d 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -46,7 +46,7 @@ type Tests(testOutputHelper) = let! res = Events.append ctx streamName index <| TestEvents.Create(0,1) test <@ AppendResult.Ok 1L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 32 // 31.22 // WAS 10 + verifyRequestChargesMax 33 // 32.27 // WAS 10 // Clear the counters capture.Clear() @@ -54,7 +54,7 @@ type Tests(testOutputHelper) = test <@ AppendResult.Ok 6L = res @> test <@ [EqxAct.Append] = capture.ExternalCalls @> // We didnt request small batches or splitting so it's not dramatically more expensive to write N events - verifyRequestChargesMax 39 // 38.21 // was 11 + verifyRequestChargesMax 39 // 38.74 // was 11 } // It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms @@ -121,7 +121,7 @@ type Tests(testOutputHelper) = test <@ [EqxAct.Append] = capture.ExternalCalls @> pos <- pos + int64 appendBatchSize pos =! res - verifyRequestChargesMax 42 // 41.12 // 46 // 44.07 observed + verifyRequestChargesMax 46 // 45.16 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -134,7 +134,7 @@ type Tests(testOutputHelper) = pos <- pos + 42L pos =! res test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 48 // 47.02 // WAS 20 + verifyRequestChargesMax 46 // 45.42 // 47.02 // WAS 20 capture.Clear() let! res = Events.getNextIndex ctx streamName @@ -149,12 +149,12 @@ type Tests(testOutputHelper) = let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount)) test <@ [EqxAct.Append] = capture.ExternalCalls @> - verifyRequestChargesMax 465 // 463.01 observed + verifyRequestChargesMax 149 // 148.11 // 463.01 observed capture.Clear() let! 
-       verifyRequestChargesMax 465 // 463.01 observed
+       verifyRequestChargesMax 149 // 148.11 // 463.01 observed
        capture.Clear()

        let! pos = ctx.Sync(stream,?position=None)
        test <@ [EqxAct.Tip] = capture.ExternalCalls @>
-       verifyRequestChargesMax 45 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
+       verifyRequestChargesMax 5 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
        capture.Clear()

        let! _pos = ctx.Sync(stream,pos)
@@ -174,7 +174,7 @@
        test <@ [EqxAct.Resync] = capture.ExternalCalls @>
        // The response aligns with a normal conflict in that it passes the entire set of conflicting events ()
        test <@ AppendResult.Conflict (0L,[||]) = res @>
-       verifyRequestChargesMax 7 // 6.6 // WAS 5
+       verifyRequestChargesMax 6 // 5.5 // WAS 5
        capture.Clear()

        // Now write at the correct position
@@ -182,7 +182,7 @@
        let! res = Events.append ctx streamName 0L expected
        test <@ AppendResult.Ok 1L = res @>
        test <@ [EqxAct.Append] = capture.ExternalCalls @>
-       verifyRequestChargesMax 32 // 30.42 WAS 11 // 10.33
+       verifyRequestChargesMax 33 // 32.05 WAS 11 // 10.33
        capture.Clear()

        // Try overwriting it (a competing consumer would see the same)
@@ -200,7 +200,7 @@
 #else
        test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
 #endif
-       verifyRequestChargesMax 7 // 6.64
+       verifyRequestChargesMax 6 // 5.63 // 6.64
        capture.Clear()
    }

diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
index 4cc15eaa2..dd750df1f 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
@@ -93,7 +93,7 @@
            // Extra roundtrip required after maxItemsPerRequest is exceeded
            let expectedBatchesOfItems = max 1 ((i-1) / maxItemsPerRequest)
            test <@ i = i && List.replicate expectedBatchesOfItems EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @>
-           verifyRequestChargesMax 61 // 57.09 [5.24 + 54.78] // 5.5 observed for read
+           verifyRequestChargesMax 60 // 59.27 [3.28; 55.99] // 5.5 observed for read
            capture.Clear()

        // Validate basic operation; Key side effect: Log entries will be emitted to `capture`
@@ -103,7 +103,7 @@
        let expectedResponses = transactions/maxItemsPerRequest + 1
        test <@ List.replicate expectedResponses EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @>
-       verifyRequestChargesMax 11 // 10.01
+       verifyRequestChargesMax 8 // 7.74 // 10.01
    }

 []
@@ -375,4 +375,4 @@
        capture.Clear()
        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1
        test <@ [EqxAct.Append] = capture.ExternalCalls @>
-    }
\ No newline at end of file
+    }

diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj
index ce2a6041f..180a475d2 100644
--- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj
+++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj
@@ -26,6 +26,8 @@
+
+
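For orientation, this minimal F# sketch shows roughly how a client drives the stored procedure above. It is illustrative only: the `sync` wrapper, its `sprocLink`/`streamName` parameters, and the `req: obj` payload approximate the surrounding Equinox plumbing, while `ExecuteStoredProcedureAsync` is the actual .NET DocumentDB SDK call and `(req, expIndex, expEtag)` is the sproc's parameter order:

    open Microsoft.Azure.Documents
    open Microsoft.Azure.Documents.Client

    // expIndex semantics per the sproc: -1 = always append; -2 = gate on expEtag; n = expect stream length n
    let sync (client: DocumentClient) (sprocLink: string) (streamName: string)
             (req: obj) (expIndex: int64) (expEtag: string) : Async<SyncResponse> = async {
        // Sprocs execute within a single logical partition; the stream name is the partition key
        let opts = RequestOptions(PartitionKey = PartitionKey(streamName))
        let! res =
            client.ExecuteStoredProcedureAsync<SyncResponse>(sprocLink, opts, [| req; box expIndex; box expEtag |])
            |> Async.AwaitTask
        // conflicts = null signals success; [] or a set of unfolds signals a conflict (see the sproc's setBody calls)
        return res.Response }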