Skip to content

Commit

Permalink
feat: improve DataFlow termination (#3560)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt authored Oct 25, 2023
1 parent e90ff5e commit 8d7e364
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

Expand Down Expand Up @@ -119,7 +120,7 @@ public StatusResult<Void> terminate(String dataFlowId) {
if (terminateResult.failed()) {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
}
dataFlow.transitToCompleted();
dataFlow.transitToTerminated();
store.save(dataFlow);
return StatusResult.success();
} else {
Expand All @@ -140,6 +141,10 @@ private boolean processReceived(DataFlow dataFlow) {
return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request))
.entityRetrieve(id -> store.findById(id))
.onSuccess((f, r) -> {
if (f.getState() == TERMINATED.code()) {
return;
}

if (r.succeeded()) {
f.transitToCompleted();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY;
Expand Down Expand Up @@ -146,7 +147,7 @@ void terminate_shouldTerminateDataFlow() {
var result = manager.terminate("dataFlowId");

assertThat(result).isSucceeded();
verify(store).save(argThat(d -> d.getState() == COMPLETED.code()));
verify(store).save(argThat(d -> d.getState() == TERMINATED.code()));
verify(transferService).terminate(dataFlow);
}

Expand Down Expand Up @@ -215,6 +216,24 @@ void received_shouldStartTransferAndTransitionToCompleted_whenTransferSucceeds()
});
}

@Test
void received_shouldStartTransferAndNotTransitionToCompleted_whenTransferSucceedsBecauseItsTermination() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
var terminatedDataFlow = dataFlowBuilder().state(TERMINATED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(RECEIVED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
when(store.findById(any())).thenReturn(terminatedDataFlow);
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.canHandle(any())).thenReturn(true);
when(transferService.transfer(any())).thenReturn(completedFuture(StreamResult.success()));

manager.start();

await().untilAsserted(() -> {
verify(transferService).transfer(isA(DataFlowRequest.class));
verify(store, never()).save(any());
});
}

@Test
void received_shouldStartTransferAndTransitionToFailed_whenTransferFails() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED;

/**
* Entity that represent a Data Plane Transfer Flow
Expand Down Expand Up @@ -108,6 +109,10 @@ public void transitToNotified() {
transitionTo(NOTIFIED.code());
}

public void transitToTerminated() {
transitionTo(TERMINATED.code());
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder extends StatefulEntity.Builder<DataFlow, Builder> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum DataFlowStates {
NOT_TRACKED(0),
RECEIVED(100),
COMPLETED(200),
TERMINATED(250),
FAILED(300),
NOTIFIED(400);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.test.e2e.annotations.KafkaIntegrationTest;
import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant;
Expand All @@ -45,13 +46,13 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.constraints.NotNull;

import static java.lang.String.format;
import static java.time.Duration.ZERO;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED;
Expand Down Expand Up @@ -136,12 +137,12 @@ void kafkaToHttpTransfer() throws JsonProcessingException {

await().atMost(TIMEOUT).untilAsserted(() -> {
var state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(TERMINATED.name());
assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED);
});

destinationServer.clear(request)
.when(request).respond(response());
await().pollDelay(2, TimeUnit.SECONDS).atMost(TIMEOUT).untilAsserted(() -> {
await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> {
destinationServer.verify(request, never());
});

Expand All @@ -167,11 +168,11 @@ void kafkaToKafkaTransfer() {

await().atMost(TIMEOUT).untilAsserted(() -> {
var state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(TERMINATED.name());
assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED);
});

consumer.poll(ZERO);
await().pollDelay(5, TimeUnit.SECONDS).atMost(TIMEOUT).untilAsserted(() -> {
await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> {
var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(SINK_TOPIC);
assertThat(recordsFound).isEmpty();
});
Expand Down

0 comments on commit 8d7e364

Please sign in to comment.