Skip to content

Commit

Permalink
Cherry-pick V3 stored proc enhancements (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Sep 29, 2020
1 parent 6ed1bf1 commit e3c45d8
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242)

### Removed
### Fixed
Expand All @@ -27,7 +28,6 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Cosmos`: Tweaked log messages


<a name="2.3.0-rc1"></a>
## [2.3.0-rc1] - 2020-08-31

Expand Down
91 changes: 55 additions & 36 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -404,23 +404,27 @@ module Sync =
// NB don't nest in a private module, or serialization will fail miserably ;)
[<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
let [<Literal>] private sprocName = "EquinoxRollingUnfolds3" // NB need to rename/number for any breaking change
let [<Literal>] private sprocName = "EquinoxRollingUnfolds4" // NB need to rename/number for any breaking change
let [<Literal>] private sprocBody = """
// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
// 2a Verify no current Tip; if so - incoming req.e and defines the `n`ext position / unfolds
// 2b If we already have a tip, move position forward, replace unfolds
// 3 insert a new document containing the events as part of the same batch of work
// 3a in some cases, there are only changes to the `u`nfolds and no `e`vents, in which case no write should happen
// Manages the merging of the supplied Request Batch into the stream
// 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
// High level end-states:
// 1a if there is a Tip, but are only changes to the `u`nfolds (and no `e`vents) -> update Tip only
// 1b if there is a Tip, but incoming request includes an event -> generate a batch document + create empty Tip
// 2a if stream empty, but incoming request includes an event -> generate a batch document + create empty Tip
// 2b if no current Tip, and no events being written -> the incoming `req` becomes the Tip batch
function sync(req, expIndex, expEtag) {
if (!req) throw new Error("Missing req argument");
const collection = getContext().getCollection();
const collectionLink = collection.getSelfLink();
const collectionLink = __.getSelfLink();
const response = getContext().getResponse();
// Locate the Tip (-1) batch for this stream (which may not exist)
const tipDocId = collection.getAltLink() + "/docs/" + req.id;
const isAccepted = collection.readDocument(tipDocId, {}, function (err, current) {
const tipDocId = __.getAltLink() + "/docs/" + req.id;
const isAccepted = __.readDocument(tipDocId, {}, function (err, current) {
// Verify we dont have a conflicting write
if (expIndex === -1) {
// For Any mode, we always do an append operation
Expand All @@ -429,7 +433,10 @@ function sync(req, expIndex, expEtag) {
// If there is no Tip page, the writer has no possible reason for writing at an index other than zero, and an etag exp must be fulfilled
response.setBody({ etag: null, n: 0, conflicts: [] });
} else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) {
// if we're working based on etags, the `u`nfolds very likely to bear relevant info as state-bearing unfolds
// Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle;
// yielding [] triggers the client to go loading the events itself
// if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds
// if there are no `u`nfolds, we need to be careful not to yield `conflicts: null`, as that signals a successful write (see below)
response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [] });
} else {
Expand All @@ -438,34 +445,46 @@ function sync(req, expIndex, expEtag) {
});
if (!isAccepted) throw new Error("readDocument not Accepted");
function executeUpsert(current) {
function executeUpsert(tip) {
function callback(err, doc) {
if (err) throw err;
response.setBody({ etag: doc._etag, n: doc.n, conflicts: null });
}
var tip;
if (!current) {
tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u };
const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback);
if (!tipAccepted) throw new Error("Unable to create Tip.");
} else {
// TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches
const n = current.n + req.e.length;
tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u };
if (tip) {
Array.prototype.push.apply(tip.e, req.e);
tip.n = tip.i + tip.e.length;
// If there are events, calve them to their own batch (this behavior is to simplify CFP consumer impl)
if (tip.e.length > 0) {
const batch = { id: tip.i.toString(), p: tip.p, i: tip.i, n: tip.n, e: tip.e }
const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to remove Tip markings.");
tip.i = tip.n;
tip.e = [];
}
// as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback);
if (!tipAccepted) throw new Error("Unable to replace Tip.");
}
// if there's only a state update involved, we don't do an event-batch write (if we did, they'd trigger uniqueness violations)
if (req.e.length) {
// For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to
// a) guarantee an item per write (multiple consecutive updates can be 'squashed')
// b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item)
const i = tip.n - req.e.length;
const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e };
const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to insert Batch.");
// TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches
// Replace all the unfolds // TODO: should remove only unfolds being superseded
tip.u = req.u;
// As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback);
if (!isAccepted) throw new Error("Unable to replace Tip batch.");
} else {
// NOTE we write the batch first (more consistent RU cost than writing tip first)
if (req.e.length > 0) {
const batch = { id: "0", p: req.p, i: 0, n: req.e.length, e: req.e };
const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
if (!batchAccepted) throw new Error("Unable to create Batch 0.");
req.i = batch.n;
req.e = [];
} else {
req.i = 0;
}
req.n = req.i + req.e.length;
const isAccepted = __.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback);
if (!isAccepted) throw new Error("Unable to create Tip batch.");
}
}
}"""
Expand Down
18 changes: 9 additions & 9 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName index <| TestEvents.Create(0,1)
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 32 // 31.22 // WAS 10
verifyRequestChargesMax 33 // 32.27 // WAS 10
// Clear the counters
capture.Clear()

let! res = Events.append ctx streamName 1L <| TestEvents.Create(1,5)
test <@ AppendResult.Ok 6L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
// We didnt request small batches or splitting so it's not dramatically more expensive to write N events
verifyRequestChargesMax 39 // 38.21 // was 11
verifyRequestChargesMax 39 // 38.74 // was 11
}

// It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms
Expand Down Expand Up @@ -121,7 +121,7 @@ type Tests(testOutputHelper) =
test <@ [EqxAct.Append] = capture.ExternalCalls @>
pos <- pos + int64 appendBatchSize
pos =! res
verifyRequestChargesMax 42 // 41.12 // 46 // 44.07 observed
verifyRequestChargesMax 46 // 45.16
capture.Clear()

let! res = Events.getNextIndex ctx streamName
Expand All @@ -134,7 +134,7 @@ type Tests(testOutputHelper) =
pos <- pos + 42L
pos =! res
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 48 // 47.02 // WAS 20
verifyRequestChargesMax 46 // 45.42 // 47.02 // WAS 20
capture.Clear()

let! res = Events.getNextIndex ctx streamName
Expand All @@ -149,12 +149,12 @@ type Tests(testOutputHelper) =
let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100
let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount))
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 465 // 463.01 observed
verifyRequestChargesMax 149 // 148.11 // 463.01 observed
capture.Clear()

let! pos = ctx.Sync(stream,?position=None)
test <@ [EqxAct.Tip] = capture.ExternalCalls @>
verifyRequestChargesMax 45 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
verifyRequestChargesMax 5 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
capture.Clear()

let! _pos = ctx.Sync(stream,pos)
Expand All @@ -174,15 +174,15 @@ type Tests(testOutputHelper) =
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
// The response aligns with a normal conflict in that it passes the entire set of conflicting events ()
test <@ AppendResult.Conflict (0L,[||]) = res @>
verifyRequestChargesMax 7 // 6.6 // WAS 5
verifyRequestChargesMax 6 // 5.5 // WAS 5
capture.Clear()

// Now write at the correct position
let expected = TestEvents.Create(1,1)
let! res = Events.append ctx streamName 0L expected
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 32 // 30.42 WAS 11 // 10.33
verifyRequestChargesMax 33 // 32.05 WAS 11 // 10.33
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
Expand All @@ -200,7 +200,7 @@ type Tests(testOutputHelper) =
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
verifyRequestChargesMax 7 // 6.64
verifyRequestChargesMax 6 // 5.63 // 6.64
capture.Clear()
}

Expand Down
6 changes: 3 additions & 3 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Tests(testOutputHelper) =
// Extra roundtrip required after maxItemsPerRequest is exceeded
let expectedBatchesOfItems = max 1 ((i-1) / maxItemsPerRequest)
test <@ i = i && List.replicate expectedBatchesOfItems EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @>
verifyRequestChargesMax 61 // 57.09 [5.24 + 54.78] // 5.5 observed for read
verifyRequestChargesMax 60 // 59.27 [3.28; 55.99] // 5.5 observed for read
capture.Clear()

// Validate basic operation; Key side effect: Log entries will be emitted to `capture`
Expand All @@ -103,7 +103,7 @@ type Tests(testOutputHelper) =

let expectedResponses = transactions/maxItemsPerRequest + 1
test <@ List.replicate expectedResponses EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @>
verifyRequestChargesMax 11 // 10.01
verifyRequestChargesMax 8 // 7.74 // 10.01
}

[<AutoData(MaxTest = 2, SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
Expand Down Expand Up @@ -375,4 +375,4 @@ type Tests(testOutputHelper) =
capture.Clear()
do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1
test <@ [EqxAct.Append] = capture.ExternalCalls @>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.DocumentDB" Version="2.11.6" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.11.6" Condition=" '$(TargetFramework)' != 'net461' " />
<PackageReference Include="FsCheck.xUnit" Version="2.14.0" />
<PackageReference Include="JsonDiffPatch.Net" Version="2.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.3.0" />
Expand Down

0 comments on commit e3c45d8

Please sign in to comment.