Skip to content

Commit

Permalink
Updates for DSP protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmarino committed Apr 18, 2024
1 parent 7a7c5d2 commit 45d6930
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package org.eclipse.dataspacetck.core.api.message;

import com.apicatalog.jsonld.JsonLd;
import com.apicatalog.jsonld.JsonLdError;
import com.apicatalog.jsonld.document.JsonDocument;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -25,8 +24,13 @@
import jakarta.json.JsonObject;
import jakarta.json.JsonStructure;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import static com.apicatalog.jsonld.JsonLd.compact;
import static com.apicatalog.jsonld.JsonLd.expand;

/**
* Provides a configured {@link ObjectMapper} for serializing and deserializing JSON-LD messages.
*/
Expand All @@ -49,18 +53,30 @@ public void setupModule(SetupContext context) {

public static String serialize(Object object) {
try {
return MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
var compacted = compact(JsonDocument.of(MAPPER.convertValue(object, JsonObject.class)), EMPTY_CONTEXT).get();
return MAPPER.writeValueAsString(compacted);
} catch (JsonProcessingException | JsonLdError e) {
throw new RuntimeException(e);
}
}

public static Map<String, Object> processJsonLd(InputStream stream) {
try {
return processJsonLd(JsonDocument.of(MAPPER.readValue(stream, JsonObject.class)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
public static Map<String, Object> processJsonLd(Map<String, Object> message) {
var document = JsonDocument.of(MAPPER.convertValue(message, JsonObject.class));
return processJsonLd(JsonDocument.of(MAPPER.convertValue(message, JsonObject.class)));
}

@SuppressWarnings("unchecked")
private static Map<String, Object> processJsonLd(JsonDocument document) {
try {
var expanded = JsonLd.expand(document).get();
var compacted = JsonLd.compact(JsonDocument.of(MAPPER.convertValue(expanded.getFirst(), JsonObject.class)), EMPTY_CONTEXT).get();
var expanded = expand(document).get().getFirst();
var compacted = compact(JsonDocument.of(MAPPER.convertValue(expanded, JsonObject.class)), EMPTY_CONTEXT).get();
return MAPPER.convertValue(compacted, Map.class);
} catch (JsonLdError e) {
throw new RuntimeException(e);
Expand All @@ -69,4 +85,6 @@ public static Map<String, Object> processJsonLd(Map<String, Object> message) {

private MessageSerializer() {
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
*/
@SuppressWarnings("unused")
public class DspSystemLauncher implements SystemLauncher {
private static final String CVF_LOCAL_CONNECTOR = "dataspacetck.dsp.local.connector";
private static final String CVF_THREAD_POOL = "dataspacetck.dsp.thread.pool";
private static final String LOCAL_CONNECTOR_CONFIG = "dataspacetck.dsp.local.connector";
private static final String CONNECTOR_BASE_URL_CONFIG = "dataspacetck.dsp.connector.http.url";
private static final String THREAD_POOL_CONFIG = "dataspacetck.dsp.thread.pool";

private ExecutorService executor;
private String baseConnectorUrl;
private boolean useLocalConnector;

private Map<String, Connector> clientConnectors = new ConcurrentHashMap<>();
Expand All @@ -55,8 +57,14 @@ public class DspSystemLauncher implements SystemLauncher {

@Override
public void start(SystemConfiguration configuration) {
executor = newFixedThreadPool(configuration.getPropertyAsInt(CVF_THREAD_POOL, 10));
useLocalConnector = configuration.getPropertyAsBoolean(CVF_LOCAL_CONNECTOR, false);
executor = newFixedThreadPool(configuration.getPropertyAsInt(THREAD_POOL_CONFIG, 10));
useLocalConnector = configuration.getPropertyAsBoolean(LOCAL_CONNECTOR_CONFIG, false);
if (!useLocalConnector) {
baseConnectorUrl = configuration.getPropertyAsString(CONNECTOR_BASE_URL_CONFIG, null);
if (baseConnectorUrl == null) {
throw new RuntimeException("Required configuration not set: " + CONNECTOR_BASE_URL_CONFIG);
}
}
}

@Override
Expand Down Expand Up @@ -114,7 +122,7 @@ private NegotiationClient createNegotiationClient(String scopeId) {
if (useLocalConnector) {
return new NegotiationClientImpl(providerConnectors.computeIfAbsent(scopeId, k2 -> new Connector()));
}
return new NegotiationClientImpl();
return new NegotiationClientImpl(baseConnectorUrl);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_CONSUMER_PID_EXPANDED;
import static org.eclipse.dataspacetck.dsp.system.api.message.MessageFunctions.createOfferAck;
import static org.eclipse.dataspacetck.dsp.system.api.message.MessageFunctions.stringProperty;

/**
Expand Down Expand Up @@ -91,10 +92,11 @@ public ContractNegotiation createNegotiation(String datasetId) {
/**
* Processes an offer received from the provider.
*/
public void handleProviderOffer(Map<String, Object> offer) {
public Map<String, Object> handleProviderOffer(Map<String, Object> offer) {
var id = stringProperty(DSPACE_PROPERTY_CONSUMER_PID_EXPANDED, offer);
var negotiation = findById(id);
negotiation.storeOffer(offer, ContractNegotiation.State.OFFERED);
return createOfferAck(negotiation.getCorrelationId(), negotiation.getId(), ContractNegotiation.State.OFFERED);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,38 @@ public static Response postJson(String url, Object message) {
var requestBody = RequestBody.create(serialize(message), MediaType.get("application/json"));
var httpRequest = new Request.Builder()
.url(url)
.header("Authorization", "{\"region\": \"any\", \"audience\": \"any\", \"clientId\":\"any\"}") // WORKAROUND: REMOVE - claims
.post(requestBody)
.build();

var httpClient = new OkHttpClient.Builder().build();
try {
var response = httpClient.newCall(httpRequest).execute();
if (404 == response.code()) {
throw new RuntimeException("Unexpected callback received: " + url);
throw new AssertionError("Unexpected 404 received for request: " + url);
} else if (!response.isSuccessful()) {
throw new RuntimeException("Unexpected response code: " + response.code());
throw new AssertionError("Unexpected response code: " + response.code());
}
return response;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static Response getJson(String url) {
var httpRequest = new Request.Builder()
.url(url)
.header("Authorization", "{\"region\": \"any\", \"audience\": \"any\", \"clientId\":\"any\"}")
.get()
.build();

var httpClient = new OkHttpClient.Builder().build();
try {
var response = httpClient.newCall(httpRequest).execute();
if (404 == response.code()) {
throw new AssertionError("Unexpected 404 received for request: " + url);
} else if (!response.isSuccessful()) {
throw new AssertionError("Unexpected response code: " + response.code());
}
return response;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface DspConstants {

String DSPACE_PROPERTY_STATE = DSPACE_NAMESPACE_PREFIX + "state";

String DSPACE_PROPERTY_STATE_EXPANDED = DSPACE_NAMESPACE + "state";

String DSPACE_PROPERTY_EVENT_TYPE = DSPACE_NAMESPACE_PREFIX + "eventType";

String DSPACE_PROPERTY_CALLBACK_ADDRESS = DSPACE_NAMESPACE_PREFIX + "callbackAddress";
Expand All @@ -56,6 +58,4 @@ public interface DspConstants {

String DSPACE_PROPERTY_OFFER_EXPANDED = DSPACE_NAMESPACE + "offer";

String DSPACE_PROPERTY_TARGET = DSPACE_NAMESPACE_PREFIX + "target";

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.eclipse.dataspacetck.dsp.system.api.message;

import org.eclipse.dataspacetck.dsp.system.api.statemachine.ContractNegotiation.State;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
Expand All @@ -38,7 +39,6 @@
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_PROVIDER_PID;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_REASON;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_STATE;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_TARGET;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.ID;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.TYPE;
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_AGREEMENT_TYPE;
Expand All @@ -48,6 +48,7 @@
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_PROPERTY_ACTION;
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_PROPERTY_CONSTRAINTS;
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_PROPERTY_PERMISSION;
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_PROPERTY_TARGET;
import static org.eclipse.dataspacetck.dsp.system.api.message.OdrlConstants.ODRL_USE;

/**
Expand All @@ -62,9 +63,10 @@ public static Map<String, Object> createContractRequest(String consumerPid, Stri

var offer = new LinkedHashMap<String, Object>();
offer.put(ID, offerId);
offer.put(ODRL_PROPERTY_TARGET, targetId);
offer.put(TYPE, ODRL_NAMESPACE + "Offer"); // WORKAROUND: REMOVE - @type

message.put(DSPACE_PROPERTY_OFFER, offer);
message.put(DSPACE_PROPERTY_TARGET, targetId);

message.put(DSPACE_PROPERTY_CALLBACK_ADDRESS, callbackAddress);

Expand All @@ -83,7 +85,7 @@ public static Map<String, Object> createCounterOffer(String providerId, String c
}

public static Map<String, Object> createTermination(String providerId, String consumerId, String code, String... reasons) {
var message = createBaseMessage(DSPACE_NAMESPACE_PREFIX + "ContractNegotiationTermination");
var message = createBaseMessage(DSPACE_NAMESPACE_PREFIX + "ContractNegotiationTerminationMessage");
message.put(CONTEXT, createContext());

message.put(DSPACE_PROPERTY_PROVIDER_PID, providerId);
Expand Down Expand Up @@ -130,7 +132,7 @@ public static Map<String, Object> createOffer(String providerId, String consumer
message.put(DSPACE_PROPERTY_CONSUMER_PID, consumerId);

var offer = new LinkedHashMap<String, Object>();
offer.put(TYPE, ODRL_OFFER_TYPE);
offer.put(TYPE, ODRL_OFFER_TYPE); // WORKAROUND: REMOVE @type
offer.put(ID, offerId);
var permissions = Map.of(ODRL_PROPERTY_ACTION, ODRL_USE, ODRL_PROPERTY_CONSTRAINTS, emptyList());
offer.put(ODRL_PROPERTY_PERMISSION, List.of(permissions));
Expand All @@ -151,23 +153,34 @@ public static Map<String, Object> createAgreement(String providerId, String cons
var offer = new LinkedHashMap<String, Object>();
offer.put(TYPE, ODRL_AGREEMENT_TYPE);
offer.put(ID, agreementId);
offer.put(DSPACE_PROPERTY_TARGET, target);
offer.put(ODRL_PROPERTY_TARGET, target);
offer.put(ODRL_PROPERTY_PERMISSION, List.of(permissions));

message.put(DSPACE_NAMESPACE_PREFIX + "agreement", offer);

return message;
}

public static Map<String, Object> createNegotiationResponse(String id, String state) {
public static Map<String, Object> createNegotiationResponse(String providerPid, String consumerPid, String state) {
var message = createBaseMessage(DSPACE_NAMESPACE_PREFIX + "ContractNegotiation");
var context = createContext();
message.put(CONTEXT, context);
message.put(ID, id);
message.put(DSPACE_PROPERTY_PROVIDER_PID, providerPid);
message.put(DSPACE_PROPERTY_CONSUMER_PID, consumerPid);
message.put(DSPACE_PROPERTY_STATE, state);
return message;
}

public static Map<String, Object> createOfferAck(String providerId, String consumerId, State state) {
var message = createBaseMessage(DSPACE_NAMESPACE_PREFIX + "ContractNegotiation");
var context = createContext();
message.put(CONTEXT, context);
message.put(DSPACE_PROPERTY_PROVIDER_PID, providerId);
message.put(DSPACE_PROPERTY_CONSUMER_PID, consumerId);
message.put(DSPACE_PROPERTY_STATE, state.toString());
return message;
}

public static Map<String, Object> mapProperty(String key, Map<String, Object> map) {
var value = requireNonNull(map.get(key), "No value for: " + key);
//noinspection unchecked
Expand All @@ -191,9 +204,12 @@ private static Map<String, Object> createContext() {
var context = new LinkedHashMap<String, Object>();
context.put(DSPACE_NAMESPACE_KEY, DSPACE_NAMESPACE);
context.put(ODRL_NAMESPACE_KEY, ODRL_NAMESPACE);
context.put("target", "odrl:target");
context.put("odrl:target", Map.of("@type", "@id"));
return context;
}

private MessageFunctions() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@
* ODRL constants.
*/
public interface OdrlConstants {

String ODRL_NAMESPACE = "http://www.w3.org/ns/odrl/2/";

String ODRL_NAMESPACE_KEY = "odrl";

String ODRL_NAMESPACE_PREFIX = ODRL_NAMESPACE_KEY + ":";

String ODRL_AGREEMENT_TYPE = ODRL_NAMESPACE_PREFIX + "Agreement";

String ODRL_OFFER_TYPE = ODRL_NAMESPACE_PREFIX + "Offer";

String ODRL_PROPERTY_ACTION = ODRL_NAMESPACE_PREFIX + "action";

String ODRL_USE = ODRL_NAMESPACE_PREFIX + "use";

String ODRL_PROPERTY_CONSTRAINTS = ODRL_NAMESPACE_PREFIX + "constraints";

String ODRL_PROPERTY_PERMISSION = ODRL_NAMESPACE_PREFIX + "permission";

String ODRL_PROPERTY_TARGET = ODRL_NAMESPACE_PREFIX + "target";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_NAMESPACE_PREFIX;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.ID;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_PROVIDER_PID_EXPANDED;
import static org.eclipse.dataspacetck.dsp.system.api.message.DspConstants.DSPACE_PROPERTY_STATE_EXPANDED;
import static org.eclipse.dataspacetck.dsp.system.api.message.MessageFunctions.createAcceptedEvent;
import static org.eclipse.dataspacetck.dsp.system.api.message.MessageFunctions.createContractRequest;
import static org.eclipse.dataspacetck.dsp.system.api.message.MessageFunctions.createCounterOffer;
Expand Down Expand Up @@ -80,7 +81,7 @@ public NegotiationPipeline sendRequest(String datasetId, String offerId, String
var contractRequest = createContractRequest(clientNegotiation.getId(), offerId, targetId, endpoint.getAddress());

var response = negotiationClient.contractRequest(contractRequest);
var correlationId = stringProperty(ID, response);
var correlationId = stringProperty(DSPACE_PROPERTY_PROVIDER_PID_EXPANDED, response);
connector.getConsumerNegotiationManager().contractRequested(clientNegotiation.getId(), correlationId);
});
return this;
Expand Down Expand Up @@ -140,13 +141,13 @@ public NegotiationPipeline thenWait(String description, Callable<Boolean> condit
return this;
}

public NegotiationPipeline expectOffer(Consumer<Map<String, Object>> action) {
public NegotiationPipeline expectOffer(Function<Map<String, Object>, Map<String, Object>> action) {
stages.add(() ->
endpoint.registerHandler(NEGOTIATIONS_OFFER_PATH, offer -> {
//noinspection unchecked
action.accept((Map<String, Object>) offer);
var negotiation = action.apply((Map<String, Object>) offer);
endpoint.deregisterHandler(NEGOTIATIONS_OFFER_PATH);
return null;
return negotiation;
}));
return this;
}
Expand Down Expand Up @@ -202,7 +203,8 @@ public NegotiationPipeline thenVerifyState(ContractNegotiation.State state) {
public NegotiationPipeline thenVerifyProviderState(ContractNegotiation.State state) {
stages.add(() -> {
var providerNegotiation = negotiationClient.getNegotiation(clientNegotiation.getCorrelationId());
assertEquals(state, ContractNegotiation.State.valueOf(stringProperty(DSPACE_NAMESPACE_PREFIX + "state", providerNegotiation).toUpperCase())); // TODO JSON-LD
var actual = stringProperty(DSPACE_PROPERTY_STATE_EXPANDED, providerNegotiation);
assertEquals(state.toString(), actual);
});
return this;
}
Expand Down
Loading

0 comments on commit 45d6930

Please sign in to comment.