Skip to content

Commit

Permalink
Revert "Use timestamp TaS conditions from Document V1 handler when po…
Browse files Browse the repository at this point in the history
…ssible"
  • Loading branch information
hmusum authored Oct 9, 2024
1 parent 4d14ac6 commit d690b40
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1285,55 +1285,35 @@ private interface VisitCallback {
default void onStart(JsonResponse response, boolean fullyApplied) throws IOException { }

/** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */
default void onDocument(JsonResponse response, Document document, DocumentId removeId, long persistedTimestamp, Runnable ack, Consumer<String> onError) { }
default void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) { }

/** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
default void onEnd(JsonResponse response) throws IOException { }
}

@FunctionalInterface
private interface VisitProcessingCallback {
Result apply(DocumentId id, long persistedTimestamp, DocumentOperationParameters params);
}

private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
TestAndSetCondition condition, String route) {
visitAndProcess(request, parameters, true, handler, route, (id, timestamp, operationParameters) -> {
visitAndProcess(request, parameters, true, handler, route, (id, operationParameters) -> {
DocumentRemove remove = new DocumentRemove(id);
// If the backend provided a persisted timestamp, we set a condition that specifies _both_ the
// original selection and the timestamp. If the backend supports timestamp-predicated TaS operations,
// it will ignore the selection entirely and only look at the timestamp. If it does not, it will fall
// back to evaluating the selection, which preserves legacy behavior.
if (timestamp != 0) {
remove.setCondition(TestAndSetCondition.ofRequiredTimestampWithSelectionFallback(
timestamp, condition.getSelection()));
} else {
remove.setCondition(condition);
}
remove.setCondition(condition);
return asyncSession.remove(remove, operationParameters);
});
}

private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, boolean fullyApplied,
ResponseHandler handler, DocumentUpdate protoUpdate, String route) {
visitAndProcess(request, parameters, fullyApplied, handler, route, (id, timestamp, operationParameters) -> {
DocumentUpdate update = new DocumentUpdate(protoUpdate);
// See `visitAndDelete()` for rationale for sending down a timestamp _and_ the original condition.
if (timestamp != 0) {
update.setCondition(TestAndSetCondition.ofRequiredTimestampWithSelectionFallback(
timestamp, protoUpdate.getCondition().getSelection()));
} // else: use condition already set from protoUpdate
update.setId(id);
return asyncSession.update(update, operationParameters);
visitAndProcess(request, parameters, fullyApplied, handler, route, (id, operationParameters) -> {
DocumentUpdate update = new DocumentUpdate(protoUpdate);
update.setId(id);
return asyncSession.update(update, operationParameters);
});
}

private void visitAndProcess(HttpRequest request, VisitorParameters parameters, boolean fullyApplied,
ResponseHandler handler,
String route, VisitProcessingCallback operation) {
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
visit(request, parameters, false, fullyApplied, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, DocumentId removeId,
long persistedTimestamp, Runnable ack, Consumer<String> onError) {
@Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
outstanding.decrementAndGet();
Expand All @@ -1352,7 +1332,7 @@ private void visitAndProcess(HttpRequest request, VisitorParameters parameters,
}
});
visitOperations.offer(() -> {
Result result = operation.apply(document.getId(), persistedTimestamp, operationParameters);
Result result = operation.apply(document.getId(), operationParameters);
if (result.type() == Result.ResultType.TRANSIENT_ERROR)
return false;

Expand All @@ -1377,8 +1357,7 @@ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, Re

response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, DocumentId removeId,
long persistedTimestamp, Runnable ack, Consumer<String> onError) {
@Override public void onDocument(JsonResponse response, Document document, DocumentId removeId, Runnable ack, Consumer<String> onError) {
try {
if (streamed) {
CompletionHandler completion = new CompletionHandler() {
Expand Down Expand Up @@ -1478,21 +1457,13 @@ private void visit(HttpRequest request, VisitorParameters parameters, boolean st
@Override public void onMessage(Message m, AckToken token) {
Document document = null;
DocumentId removeId = null;
long persistedTimestamp = 0;
if (m instanceof PutDocumentMessage put) {
document = put.getDocumentPut().getDocument();
persistedTimestamp = put.getPersistedTimestamp();
} else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) {
removeId = remove.getDocumentId();
persistedTimestamp = remove.getPersistedTimestamp();
} else {
throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName());
}
if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument();
else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId();
else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName());
locallyReceivedDocCount.getAndAdd(1);
callback.onDocument(response,
document,
removeId,
persistedTimestamp,
() -> ack(token),
errorMessage -> {
error.set(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,71 +1018,6 @@ public void testResponses() {
driver.close();
}

@Test
public void batch_update_rewrites_tas_condition_with_timestamp_predicate_if_provided_by_backend() {
var driver = new RequestHandlerTestDriver(handler); // try-with-resources hangs the test on assertion failure, which isn't optimal
List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null), new AckToken(null));
long backendTimestamp = 1234567890;

access.expect(tokens.subList(2, 3));
access.expect(parameters -> {
var put = new PutDocumentMessage(new DocumentPut(doc3));
put.setPersistedTimestamp(backendTimestamp);
parameters.getLocalDataHandler().onMessage(put, tokens.get(2));
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "Won't care");
});
access.session.expect((update, parameters) -> {
// TaS condition should now have _both_ the original selection and the exact backend timestamp.
var expectedCondition = TestAndSetCondition.ofRequiredTimestampWithSelectionFallback(backendTimestamp, "optimist");
assertEquals(expectedCondition, ((DocumentUpdate) update).getCondition());
parameters.responseHandler().get().handleResponse(new UpdateResponse(0, false));
return new Result();
});
var response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=optimist&cluster=content&timeChunk=10", PUT,
"""
{
"fields": {
"artist": { "assign": "Jahn Teigen" }
}
}""");
assertSameJson("""
{
"pathId": "/document/v1/space/music/docid",
"documentCount": 1
}""",
response.readAll());
assertEquals(200, response.getStatus());
}

@Test
public void batch_remove_rewrites_tas_condition_with_timestamp_predicate_if_provided_by_backend() {
var driver = new RequestHandlerTestDriver(handler); // try-with-resources hangs the test on assertion failure, which isn't optimal
List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null), new AckToken(null));
long backendTimestamp = 1234567890;

access.expect(tokens.subList(2, 3));
access.expect(parameters -> {
var put = new PutDocumentMessage(new DocumentPut(doc3.getDataType(), doc3.getId())); // Only the document ID
put.setPersistedTimestamp(backendTimestamp);
parameters.getLocalDataHandler().onMessage(put, tokens.get(2));
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "Won't care");
});
access.session.expect((remove, parameters) -> {
var expectedCondition = TestAndSetCondition.ofRequiredTimestampWithSelectionFallback(backendTimestamp, "pessimist");
assertEquals(expectedCondition, ((DocumentRemove) remove).getCondition());
parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId()));
return new Result();
});
var response = driver.sendRequest("http://localhost/document/v1/?selection=pessimist&cluster=content&timeChunk=10", DELETE);
assertSameJson("""
{
"pathId": "/document/v1/",
"documentCount": 1
}""",
response.readAll());
assertEquals(200, response.getStatus());
}

private void doTestVisitRequestWithParams(String httpReqParams, Consumer<VisitorParameters> paramChecker) {
try (var driver = new RequestHandlerTestDriver(handler)) {
access.expect(parameters -> {
Expand Down

0 comments on commit d690b40

Please sign in to comment.