Skip to content

Commit

Permalink
feat: enable federated catalog cache SQL variant
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Jul 18, 2024
1 parent 32344e2 commit b5370e5
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/FederatedCatalog/tree/main/extensions/store/sql/federated-catalog-cache-sql
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_NAME"
value: "federatedcatalog"
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
10 changes: 10 additions & 0 deletions charts/tractusx-connector/templates/deployment-controlplane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ spec:
- name: "EDC_DATASOURCE_EDR_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

# see extension https://github.com/eclipse-edc/FederatedCatalog/tree/main/extensions/store/sql/federated-catalog-cache-sql
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_NAME"
value: "federatedcatalog"
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_USER"
value: {{ .Values.postgresql.auth.username | required ".Values.postgresql.auth.username is required" | quote }}
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_PASSWORD"
value: {{ .Values.postgresql.auth.password | required ".Values.postgresql.auth.password is required" | quote }}
- name: "EDC_DATASOURCE_FEDERATEDCATALOG_URL"
value: {{ tpl .Values.postgresql.jdbcUrl . | quote }}

#############################
## IATP / STS / DIM CONFIG ##
#############################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/********************************************************************************
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

package org.eclipse.tractusx.edc.postgresql.migration;

public class FederatedCatalogCacheMigrationExtension extends AbstractPostgresqlMigrationExtension {
private static final String NAME_SUBSYSTEM = "federatedcatalog";

protected String getSubsystemName() {

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractPostgresqlMigrationExtension.getSubsystemName
; it is advisable to add an Override annotation.
return NAME_SUBSYSTEM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ org.eclipse.tractusx.edc.postgresql.migration.PolicyPostgresqlMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.PolicyMonitorPostgresqlMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.TransferProcessPostgresqlMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.BusinessGroupPostgresMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.EdrIndexPostgresqlMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.EdrIndexPostgresqlMigrationExtension
org.eclipse.tractusx.edc.postgresql.migration.FederatedCatalogCacheMigrationExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--
-- Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--

--
-- table: edc_federated_catalog
--

CREATE TABLE IF NOT EXISTS edc_federated_catalog
(
id VARCHAR PRIMARY KEY NOT NULL,
catalog JSON,
marked BOOLEAN DEFAULT FALSE
);


1 change: 1 addition & 0 deletions edc-tests/edc-controlplane/catalog-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
testImplementation(libs.edc.junit)
testImplementation(libs.restAssured)
testImplementation(libs.awaitility)
testImplementation(libs.edc.fc.spi.crawler)
testRuntimeOnly(libs.edc.transaction.local)

testCompileOnly(project(":edc-tests:runtime:runtime-memory"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/********************************************************************************
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

package org.eclipse.tractusx.edc.tests.catalog;

import jakarta.json.Json;
import org.eclipse.edc.crawler.spi.TargetNode;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.eclipse.tractusx.edc.tests.participant.TransferParticipant;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static io.restassured.http.ContentType.JSON;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy;
import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.CONSUMER_BPN;
import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.CONSUMER_NAME;
import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.PROVIDER_BPN;
import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.PROVIDER_NAME;
import static org.eclipse.tractusx.edc.tests.participant.TractusxParticipantBase.ASYNC_POLL_INTERVAL;
import static org.eclipse.tractusx.edc.tests.participant.TractusxParticipantBase.ASYNC_TIMEOUT;
import static org.eclipse.tractusx.edc.tests.runtimes.Runtimes.memoryRuntime;
import static org.eclipse.tractusx.edc.tests.runtimes.Runtimes.pgRuntime;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;

public class FederatedCatalogTest {

protected static final TransferParticipant CONSUMER = TransferParticipant.Builder.newInstance()
.name(CONSUMER_NAME)
.id(CONSUMER_BPN)
.build();


protected static final TransferParticipant PROVIDER = TransferParticipant.Builder.newInstance()
.name(PROVIDER_NAME)
.id(PROVIDER_BPN)
.build();

protected static Map<String, String> withNodeList(Map<String, String> config, TransferParticipant... participants) {
var nodeList = Json.createArrayBuilder();
for (var participant : participants) {
nodeList.add(Json.createObjectBuilder()
.add("id", participant.getBpn())
.add("name", participant.getDid())
.add("url", participant.getProtocolEndpoint().getUrl().toString())
.add("supportedProtocols", Json.createArrayBuilder().add("dataspace-protocol-http")));
}
try {
var tempFile = Files.createTempFile("nodelist-", UUID.randomUUID().toString());
tempFile.toFile().deleteOnExit();
var stream = new ByteArrayInputStream(nodeList.build().toString().getBytes(StandardCharsets.UTF_8));
Files.copy(stream, tempFile, REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}

return config;
}

abstract static class Tests {

@Test
@DisplayName("Consumer gets cached catalog with provider entry")
void requestCatalog_fulfillsPolicy_shouldReturnOffer() {

// arrange
PROVIDER.createAsset("test-asset");
var ap = PROVIDER.createPolicyDefinition(noConstraintPolicy());
var cp = PROVIDER.createPolicyDefinition(noConstraintPolicy());
PROVIDER.createContractDefinition("test-asset", "test-def", ap, cp);


await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
// act
CONSUMER.getFederatedCatalog()
.log().ifError()
.statusCode(200)
.contentType(JSON)
.body("size()", is(1))
.body("[0].'http://www.w3.org/ns/dcat#dataset'.'@id'", equalTo("test-asset"));
});
}
}

static class TestTargetNodeDirectory implements TargetNodeDirectory {

private final List<TransferParticipant> participants;

TestTargetNodeDirectory(List<TransferParticipant> participants) {
this.participants = participants;
}

@Override
public List<TargetNode> getAll() {
return participants.stream()
.map(p -> new TargetNode(p.getDid(), p.getBpn(), p.getProtocolEndpoint().getUrl().toString(), List.of("dataspace-protocol-http")))
.collect(Collectors.toList());
}

@Override
public void insert(TargetNode node) {

}
}

@Nested
@EndToEndTest
class InMemory extends Tests {

@RegisterExtension
protected static final RuntimeExtension CONSUMER_RUNTIME = memoryRuntime(CONSUMER.getName(), CONSUMER.getBpn(), CONSUMER.getConfiguration());

@RegisterExtension
protected static final RuntimeExtension PROVIDER_RUNTIME = memoryRuntime(PROVIDER.getName(), PROVIDER.getBpn(), PROVIDER.getConfiguration());


static {
CONSUMER_RUNTIME.registerServiceMock(TargetNodeDirectory.class, new TestTargetNodeDirectory(List.of(PROVIDER)));
}
}

@Nested
@PostgresqlIntegrationTest
class Postgres extends Tests {

@RegisterExtension
protected static final RuntimeExtension CONSUMER_RUNTIME = pgRuntime(CONSUMER.getName(), CONSUMER.getBpn(), CONSUMER.getConfiguration());

@RegisterExtension
protected static final RuntimeExtension PROVIDER_RUNTIME = pgRuntime(PROVIDER.getName(), PROVIDER.getBpn(), PROVIDER.getConfiguration());

static {
CONSUMER_RUNTIME.registerServiceMock(TargetNodeDirectory.class, new TestTargetNodeDirectory(List.of(PROVIDER)));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ public abstract class TractusxParticipantBase extends IdentityParticipant {
private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control");
private final URI backendProviderProxy = URI.create("http://localhost:" + getFreePort() + "/events");
private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public");

protected ParticipantEdrApi edrs;
protected ParticipantDataApi data;
protected ParticipantConsumerDataPlaneApi dataPlane;

protected String did;
protected Endpoint federatedCatalog;

public void createAsset(String id) {
createAsset(id, new HashMap<>(), Map.of("type", "test-type"));
Expand All @@ -94,8 +93,8 @@ public Map<String, String> getConfiguration() {
put("web.http.management.path", managementEndpoint.getUrl().getPath());
put("web.http.control.port", String.valueOf(controlPlaneControl.getPort()));
put("web.http.control.path", controlPlaneControl.getPath());
put("web.http.catalog.port", String.valueOf(getFreePort()));
put("web.http.catalog.path", "/api/catalog");
put("web.http.catalog.port", String.valueOf(federatedCatalog.getUrl().getPort()));
put("web.http.catalog.path", federatedCatalog.getUrl().getPath());
put("edc.dsp.callback.address", protocolEndpoint.getUrl().toString());
put("edc.api.auth.key", MANAGEMENT_API_KEY);
put("web.http.public.path", "/api/public");
Expand All @@ -118,6 +117,8 @@ public Map<String, String> getConfiguration() {
put("tx.edc.iam.sts.dim.url", "http://sts.example.com");
put("tx.edc.iam.iatp.bdrs.server.url", "http://sts.example.com");
put("edc.dataplane.api.public.baseurl", "http://localhost:%d/api/public/v2/data".formatted(dataPlanePublic.getPort()));
put("edc.catalog.cache.execution.delay.seconds", "2");
put("edc.catalog.cache.execution.period.seconds", "2");
}
};
}
Expand Down Expand Up @@ -192,11 +193,11 @@ public ValidatableResponse getCatalog(TractusxParticipantBase provider) {
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "CatalogRequest")
.add("counterPartyId", provider.id)
.add("counterPartyAddress", provider.protocolEndpoint.getUrl().toString())
.add("counterPartyAddress", provider.federatedCatalog.getUrl().toString())
.add("protocol", protocol);


return managementEndpoint.baseRequest()
return provider.federatedCatalog.baseRequest()
.contentType(JSON)
.when()
.body(requestBodyBuilder.build())
Expand All @@ -205,6 +206,21 @@ public ValidatableResponse getCatalog(TractusxParticipantBase provider) {

}

public ValidatableResponse getFederatedCatalog() {
var requestBodyBuilder = createObjectBuilder()
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "QuerySpec");


return federatedCatalog.baseRequest()
.contentType(JSON)
.when()
.body(requestBodyBuilder.build())
.post("/v1alpha/catalog/query")
.then();

}

public static class Builder<P extends TractusxParticipantBase, B extends Builder<P, B>> extends Participant.Builder<P, B> {

protected Builder(P participant) {
Expand All @@ -222,6 +238,7 @@ public TractusxParticipantBase build() {
participant.did = "did:web:" + participant.name.toLowerCase();
}

participant.federatedCatalog = new Endpoint(URI.create("http://localhost:" + getFreePort() + "/api/catalog"), Map.of("x-api-key", MANAGEMENT_API_KEY));
super.managementEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/api/management"), Map.of("x-api-key", MANAGEMENT_API_KEY)));
super.protocolEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/protocol")));
super.timeout(ASYNC_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class PgRuntimeExtension extends ParticipantRuntimeExtension {
private static final String PASSWORD = "password";
private static final List<String> DATASOURCES = List.of("asset", "contractdefinition",
"contractnegotiation", "policy", "transferprocess", "bpn",
"policy-monitor", "edr", "dataplane", "accesstokendata");
"policy-monitor", "edr", "dataplane", "accesstokendata", "federatedcatalog");
private final PostgreSQLContainer<?> postgreSqlContainer;
private final String dbName;

Expand Down Expand Up @@ -95,15 +95,15 @@ public String jdbcUrl(String name) {
return baseJdbcUrl() + name + "?currentSchema=" + DB_SCHEMA_NAME;
}

public String baseJdbcUrl() {
return format("jdbc:postgresql://%s:%s/", postgreSqlContainer.getHost(), postgreSqlContainer.getFirstMappedPort());
}

private void createDatabase() {
try (var connection = DriverManager.getConnection(baseJdbcUrl() + "postgres", postgreSqlContainer.getUsername(), postgreSqlContainer.getPassword())) {
connection.createStatement().execute(String.format("create database %s;", postgreSqlContainer.getDatabaseName()));
} catch (SQLException ignored) {

}
}

public String baseJdbcUrl() {
return format("jdbc:postgresql://%s:%s/", postgreSqlContainer.getHost(), postgreSqlContainer.getFirstMappedPort());
}
}
Loading

0 comments on commit b5370e5

Please sign in to comment.