From d6116fc997e43c2fec60b02cf15ec53eb7c2fb99 Mon Sep 17 00:00:00 2001 From: Rafael Magalhaes Date: Wed, 18 Dec 2024 23:02:03 +0000 Subject: [PATCH] fix: transfer type resolution on dp self registration --- .../transfer/flow/TransferTypeParserImpl.java | 2 +- .../DataplaneSelfRegistrationExtension.java | 2 +- .../DataplaneSelfRegistrationExtensionTest.java | 14 +++++++++++++- .../selector/spi/instance/DataPlaneInstance.java | 2 +- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java index 075de93d60c..751dfba9516 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java @@ -25,7 +25,7 @@ public class TransferTypeParserImpl implements TransferTypeParser { /** * Parses a compose transfer type string into a {@link TransferType}: - * {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL/Websocket} + * {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL-Websocket} * * @param transferType the transfer type string representation. * @return a {@link TransferType} diff --git a/extensions/data-plane/data-plane-self-registration/src/main/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtension.java b/extensions/data-plane/data-plane-self-registration/src/main/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtension.java index 8e291a81ea4..3e40aeda3d3 100644 --- a/extensions/data-plane/data-plane-self-registration/src/main/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtension.java +++ b/extensions/data-plane/data-plane-self-registration/src/main/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtension.java @@ -118,7 +118,7 @@ public void shutdown() { private @NotNull Stream toTransferTypes(FlowType pull, Set types, Set responseTypes) { var transferTypes = types.stream().map(it -> "%s-%s".formatted(it, pull)); - return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s/%s".formatted(it, pull, responseType)))); + return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s-%s".formatted(it, pull, responseType)))); } private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider { diff --git a/extensions/data-plane/data-plane-self-registration/src/test/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtensionTest.java b/extensions/data-plane/data-plane-self-registration/src/test/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtensionTest.java index f4f4e662044..f4cbb46a186 100644 --- a/extensions/data-plane/data-plane-self-registration/src/test/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtensionTest.java +++ b/extensions/data-plane/data-plane-self-registration/src/test/java/org/eclipse/edc/connector/dataplane/registration/DataplaneSelfRegistrationExtensionTest.java @@ -75,6 +75,7 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType")); when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType")); when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType")); + when(publicEndpointGeneratorService.supportedResponseTypes()).thenReturn(Set.of("responseType", "anotherResponseType")); when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success()); extension.initialize(context); @@ -87,7 +88,18 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio assertThat(dataPlaneInstance.getUrl()).isEqualTo(new URL("http://control/api/url/v1/dataflows")); assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType"); assertThat(dataPlaneInstance.getAllowedTransferTypes()) - .containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH"); + .containsExactlyInAnyOrder("anotherPullDestType-PULL-anotherResponseType", + "anotherSinkType-PUSH-anotherResponseType", + "anotherPullDestType-PULL", + "anotherSinkType-PUSH-responseType", + "anotherSinkType-PUSH", + "pullDestType-PULL", + "anotherPullDestType-PULL-responseType", + "pullDestType-PULL-anotherResponseType", + "sinkType-PUSH-anotherResponseType", + "pullDestType-PULL-responseType", + "sinkType-PUSH-responseType", + "sinkType-PUSH"); verify(healthCheckService).addStartupStatusProvider(any()); verify(healthCheckService).addLivenessProvider(any()); diff --git a/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java index c74897c8fae..a3e921d73dd 100644 --- a/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java +++ b/spi/data-plane-selector/data-plane-selector-spi/src/main/java/org/eclipse/edc/connector/dataplane/selector/spi/instance/DataPlaneInstance.java @@ -101,7 +101,7 @@ public String stateAsString() { public boolean canHandle(DataAddress sourceAddress, @Nullable String transferType) { Objects.requireNonNull(sourceAddress, "source cannot be null!"); Objects.requireNonNull(transferType, "transferType cannot be null!"); - // startsWith: the allowed transferType could be HttpData-PULL/someResponseChannel, and we only need to match the HttpData-PULL + // startsWith: the allowed transferType could be HttpData-PULL-someResponseChannel, and we only need to match the HttpData-PULL return allowedSourceTypes.contains(sourceAddress.getType()) && allowedTransferTypes.contains(transferType); }