Cherry-pick V3 stored proc enhancements #242

Merged · 6 commits · Sep 29, 2020
Changes from all commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -12,6 +12,7 @@ The `Unreleased` section name is replaced by the expected version of next release
### Changed

- `Cosmos`: Reorganize Sync log message text, merge with Sync Conflict message [#241](https://github.com/jet/equinox/pull/241)
+- `Cosmos`: Converge Stored Procedure Impl with `tip-isa-batch` impl from V3 (minor Request Charges cost reduction) [#242](https://github.com/jet/equinox/pull/242)

### Removed
### Fixed
@@ -27,7 +28,6 @@ The `Unreleased` section name is replaced by the expected version of next release

- `Cosmos`: Tweaked log messages
-

<a name="2.3.0-rc1"></a>
## [2.3.0-rc1] - 2020-08-31

91 changes: 55 additions & 36 deletions src/Equinox.Cosmos/Cosmos.fs
@@ -404,23 +404,27 @@ module Sync =
    // NB don't nest in a private module, or serialization will fail miserably ;)
    [<CLIMutable; NoEquality; NoComparison; Newtonsoft.Json.JsonObject(ItemRequired=Newtonsoft.Json.Required.AllowNull)>]
    type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
-    let [<Literal>] private sprocName = "EquinoxRollingUnfolds3" // NB need to rename/number for any breaking change
+    let [<Literal>] private sprocName = "EquinoxRollingUnfolds4" // NB need to rename/number for any breaking change
    let [<Literal>] private sprocBody = """
-// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
-// 1 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
-// 2a Verify no current Tip; if so - incoming req.e and defines the `n`ext position / unfolds
-// 2b If we already have a tip, move position forward, replace unfolds
-// 3 insert a new document containing the events as part of the same batch of work
-// 3a in some cases, there are only changes to the `u`nfolds and no `e`vents, in which case no write should happen
+// Manages the merging of the supplied Request Batch into the stream
+// 0 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
+// High level end-states:
+// 1a if there is a Tip, but are only changes to the `u`nfolds (and no `e`vents) -> update Tip only
+// 1b if there is a Tip, but incoming request includes an event -> generate a batch document + create empty Tip
+// 2a if stream empty, but incoming request includes an event -> generate a batch document + create empty Tip
+// 2b if no current Tip, and no events being written -> the incoming `req` becomes the Tip batch
function sync(req, expIndex, expEtag) {
    if (!req) throw new Error("Missing req argument");
-    const collection = getContext().getCollection();
-    const collectionLink = collection.getSelfLink();
+    const collectionLink = __.getSelfLink();
    const response = getContext().getResponse();
    // Locate the Tip (-1) batch for this stream (which may not exist)
-    const tipDocId = collection.getAltLink() + "/docs/" + req.id;
-    const isAccepted = collection.readDocument(tipDocId, {}, function (err, current) {
+    const tipDocId = __.getAltLink() + "/docs/" + req.id;
+    const isAccepted = __.readDocument(tipDocId, {}, function (err, current) {
        // Verify we dont have a conflicting write
        if (expIndex === -1) {
            // For Any mode, we always do an append operation
@@ -429,7 +433,10 @@
            // If there is no Tip page, the writer has no possible reason for writing at an index other than zero, and an etag exp must be fulfilled
            response.setBody({ etag: null, n: 0, conflicts: [] });
        } else if (current && ((expIndex === -2 && expEtag !== current._etag) || (expIndex !== -2 && expIndex !== current.n))) {
-            // if we're working based on etags, the `u`nfolds very likely to bear relevant info as state-bearing unfolds
+            // Where possible, we extract conflicting events from e and/or u in order to avoid another read cycle;
+            // yielding [] triggers the client to go loading the events itself
+            // if we're working based on etags, the `u`nfolds likely bear relevant info as state-bearing unfolds
            // if there are no `u`nfolds, we need to be careful not to yield `conflicts: null`, as that signals a successful write (see below)
            response.setBody({ etag: current._etag, n: current.n, conflicts: current.u || [] });
        } else {
@@ -438,34 +445,46 @@
    });
    if (!isAccepted) throw new Error("readDocument not Accepted");
-    function executeUpsert(current) {
+    function executeUpsert(tip) {
        function callback(err, doc) {
            if (err) throw err;
            response.setBody({ etag: doc._etag, n: doc.n, conflicts: null });
        }
-        var tip;
-        if (!current) {
-            tip = { p: req.p, id: req.id, i: req.e.length, n: req.e.length, e: [], u: req.u };
-            const tipAccepted = collection.createDocument(collectionLink, tip, { disableAutomaticIdGeneration: true }, callback);
-            if (!tipAccepted) throw new Error("Unable to create Tip.");
-        } else {
-            // TODO Carry forward `u` items not in `req`, together with supporting catchup events from preceding batches
-            const n = current.n + req.e.length;
-            tip = { p: current.p, id: current.id, i: n, n: n, e: [], u: req.u };
+        if (tip) {
+            Array.prototype.push.apply(tip.e, req.e);
+            tip.n = tip.i + tip.e.length;
+            // If there are events, calve them to their own batch (this behavior is to simplify CFP consumer impl)
+            if (tip.e.length > 0) {
+                const batch = { id: tip.i.toString(), p: tip.p, i: tip.i, n: tip.n, e: tip.e }
+                const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
+                if (!batchAccepted) throw new Error("Unable to remove Tip markings.");
+                tip.i = tip.n;
+                tip.e = [];
+            }
-            // as we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
-            const tipAccepted = collection.replaceDocument(current._self, tip, { etag: current._etag }, callback);
-            if (!tipAccepted) throw new Error("Unable to replace Tip.");
-        }
-        // if there's only a state update involved, we don't do an event-batch write (if we did, they'd trigger uniqueness violations)
-        if (req.e.length) {
-            // For now, always do an Insert, as Change Feed mechanism does not yet afford us a way to
-            // a) guarantee an item per write (multiple consecutive updates can be 'squashed')
-            // b) with metadata sufficient for us to determine the items added (only etags, no way to convey i/n in feed item)
-            const i = tip.n - req.e.length;
-            const batch = { p: tip.p, id: i.toString(), i: i, n: tip.n, e: req.e };
-            const batchAccepted = collection.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
-            if (!batchAccepted) throw new Error("Unable to insert Batch.");
+            // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches
+            // Replace all the unfolds // TODO: should remove only unfolds being superseded
+            tip.u = req.u;
+            // As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place
+            const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback);
+            if (!isAccepted) throw new Error("Unable to replace Tip batch.");
+        } else {
+            // NOTE we write the batch first (more consistent RU cost than writing tip first)
+            if (req.e.length > 0) {
+                const batch = { id: "0", p: req.p, i: 0, n: req.e.length, e: req.e };
+                const batchAccepted = __.createDocument(collectionLink, batch, { disableAutomaticIdGeneration: true });
+                if (!batchAccepted) throw new Error("Unable to create Batch 0.");
+                req.i = batch.n;
+                req.e = [];
+            } else {
+                req.i = 0;
+            }
+            req.n = req.i + req.e.length;
+            const isAccepted = __.createDocument(collectionLink, req, { disableAutomaticIdGeneration: true }, callback);
+            if (!isAccepted) throw new Error("Unable to create Tip batch.");
        }
    }
}"""
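The response contract the stored procedure establishes above is worth spelling out: `conflicts: null` signals a successful write; `[]` signals a conflict where the caller must load the conflicting events itself; and a non-empty array carries the state-bearing unfolds, extracted to save a read roundtrip. A minimal sketch of how a caller might decode this (a hypothetical helper, not the library's actual client code; `Unfold` and `SyncResponse` as per the types in the diff above):

```fsharp
type SyncOutcome =
    | Written of etag: string * next: int64                  // conflicts = null: write succeeded
    | ConflictUnknown of next: int64                         // conflicts = [||]: caller must re-read the stream
    | ConflictKnown of etag: string * next: int64 * Unfold[] // conflicts carry the competing unfolds

// Interpret the SyncResponse per the conventions documented in the sproc comments
let interpret (res: SyncResponse) : SyncOutcome =
    match res.conflicts with
    | null -> Written (res.etag, res.n)
    | [||] -> ConflictUnknown res.n
    | unfolds -> ConflictKnown (res.etag, res.n, unfolds)
```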
18 changes: 9 additions & 9 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
@@ -46,15 +46,15 @@ type Tests(testOutputHelper) =
let! res = Events.append ctx streamName index <| TestEvents.Create(0,1)
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
-verifyRequestChargesMax 32 // 31.22 // WAS 10
+verifyRequestChargesMax 33 // 32.27 // WAS 10
// Clear the counters
capture.Clear()

let! res = Events.append ctx streamName 1L <| TestEvents.Create(1,5)
test <@ AppendResult.Ok 6L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
// We didnt request small batches or splitting so it's not dramatically more expensive to write N events
-verifyRequestChargesMax 39 // 38.21 // was 11
+verifyRequestChargesMax 39 // 38.74 // was 11
}

// It's conceivable that in the future we might allow zero-length batches as long as a sync mechanism leveraging the etags and unfolds update mechanisms
@@ -121,7 +121,7 @@ type Tests(testOutputHelper) =
test <@ [EqxAct.Append] = capture.ExternalCalls @>
pos <- pos + int64 appendBatchSize
pos =! res
-verifyRequestChargesMax 42 // 41.12 // 46 // 44.07 observed
+verifyRequestChargesMax 46 // 45.16
capture.Clear()

let! res = Events.getNextIndex ctx streamName
@@ -134,7 +134,7 @@
pos <- pos + 42L
pos =! res
test <@ [EqxAct.Append] = capture.ExternalCalls @>
-verifyRequestChargesMax 48 // 47.02 // WAS 20
+verifyRequestChargesMax 46 // 45.42 // 47.02 // WAS 20
capture.Clear()

let! res = Events.getNextIndex ctx streamName
@@ -149,12 +149,12 @@
let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100
let! _pos = ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount))
test <@ [EqxAct.Append] = capture.ExternalCalls @>
-verifyRequestChargesMax 465 // 463.01 observed
+verifyRequestChargesMax 149 // 148.11 // 463.01 observed
capture.Clear()

let! pos = ctx.Sync(stream,?position=None)
test <@ [EqxAct.Tip] = capture.ExternalCalls @>
-verifyRequestChargesMax 45 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
+verifyRequestChargesMax 5 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded)
capture.Clear()

let! _pos = ctx.Sync(stream,pos)
@@ -174,15 +174,15 @@
test <@ [EqxAct.Resync] = capture.ExternalCalls @>
// The response aligns with a normal conflict in that it passes the entire set of conflicting events ()
test <@ AppendResult.Conflict (0L,[||]) = res @>
-verifyRequestChargesMax 7 // 6.6 // WAS 5
+verifyRequestChargesMax 6 // 5.5 // WAS 5
capture.Clear()

// Now write at the correct position
let expected = TestEvents.Create(1,1)
let! res = Events.append ctx streamName 0L expected
test <@ AppendResult.Ok 1L = res @>
test <@ [EqxAct.Append] = capture.ExternalCalls @>
-verifyRequestChargesMax 32 // 30.42 WAS 11 // 10.33
+verifyRequestChargesMax 33 // 32.05 WAS 11 // 10.33
capture.Clear()

// Try overwriting it (a competing consumer would see the same)
Expand All @@ -200,7 +200,7 @@ type Tests(testOutputHelper) =
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
-verifyRequestChargesMax 7 // 6.64
+verifyRequestChargesMax 6 // 5.63 // 6.64
capture.Clear()
}

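The charge ceilings asserted throughout these tests are enforced by `verifyRequestChargesMax`, which is defined elsewhere in the test fixture and not part of this diff. A plausible standalone shape, assuming the log capture exposes per-roundtrip Request Unit charges (the parameter list here is an assumption for illustration, not the fixture's actual signature):

```fsharp
// Sketch only: asserts the summed RU cost of the captured roundtrips stays under a ceiling
let verifyRequestChargesMax (maxRu: int) (observed: (string * float) list) =
    let total = observed |> List.sumBy snd
    if total > float maxRu then
        failwithf "RU budget exceeded: %.2f > %d in %A" total maxRu observed

// e.g. verifyRequestChargesMax 33 [ "Append", 32.27 ] passes;
//      a regression to 34 RU would fail the test and flag the cost change
```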
6 changes: 3 additions & 3 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
@@ -93,7 +93,7 @@ type Tests(testOutputHelper) =
// Extra roundtrip required after maxItemsPerRequest is exceeded
let expectedBatchesOfItems = max 1 ((i-1) / maxItemsPerRequest)
test <@ i = i && List.replicate expectedBatchesOfItems EqxAct.ResponseBackward @ [EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @>
-verifyRequestChargesMax 61 // 57.09 [5.24 + 54.78] // 5.5 observed for read
+verifyRequestChargesMax 60 // 59.27 [3.28; 55.99] // 5.5 observed for read
capture.Clear()

// Validate basic operation; Key side effect: Log entries will be emitted to `capture`
Expand All @@ -103,7 +103,7 @@ type Tests(testOutputHelper) =

let expectedResponses = transactions/maxItemsPerRequest + 1
test <@ List.replicate expectedResponses EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @>
-verifyRequestChargesMax 11 // 10.01
+verifyRequestChargesMax 8 // 7.74 // 10.01
}

[<AutoData(MaxTest = 2, SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
@@ -375,4 +375,4 @@ type Tests(testOutputHelper) =
capture.Clear()
do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1
test <@ [EqxAct.Append] = capture.ExternalCalls @>
-    }
\ No newline at end of file
+    }
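The response-count assertions in this file follow from simple paging arithmetic over `maxItemsPerRequest`, mirroring the formulas visible in the test code above. A worked example with an assumed page size of 10 (the actual value is configured elsewhere in the fixture):

```fsharp
let maxItemsPerRequest = 10 // assumed for illustration
let expectedBatchesOfItems i = max 1 ((i - 1) / maxItemsPerRequest)
let expectedResponses transactions = transactions / maxItemsPerRequest + 1

// e.g. 25 items -> expectedBatchesOfItems 25 = 2 backward responses while appending,
//      and expectedResponses 25 = 3 responses to read the stream back in full
```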
@@ -26,6 +26,8 @@
  </ItemGroup>

  <ItemGroup>
+    <PackageReference Include="Microsoft.Azure.DocumentDB" Version="2.11.6" Condition=" '$(TargetFramework)' == 'net461' " />
+    <PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.11.6" Condition=" '$(TargetFramework)' != 'net461' " />
    <PackageReference Include="FsCheck.xUnit" Version="2.14.0" />
    <PackageReference Include="JsonDiffPatch.Net" Version="2.1.0" />
    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.3.0" />