Skip to content

Commit

Permalink
Merge pull request datakaveri#456 from ananjaykumar2/subscription/app…
Browse files Browse the repository at this point in the history
…end-query-extension

modify append subscription & subscription auditing flow
  • Loading branch information
pranavrd authored Nov 23, 2023
2 parents a0d9e64 + b048421 commit b9c0074
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 105 deletions.
156 changes: 82 additions & 74 deletions src/main/java/iudx/resource/server/apiserver/ApiServerVerticle.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,85 @@
package iudx.resource.server.apiserver;

import static iudx.resource.server.apiserver.response.ResponseUtil.generateResponse;
import static iudx.resource.server.apiserver.util.Constants.*;
import static iudx.resource.server.apiserver.util.Constants.ADMIN;
import static iudx.resource.server.apiserver.util.Constants.API;
import static iudx.resource.server.apiserver.util.Constants.API_ENDPOINT;
import static iudx.resource.server.apiserver.util.Constants.APPLICATION_JSON;
import static iudx.resource.server.apiserver.util.Constants.CONTENT_TYPE;
import static iudx.resource.server.apiserver.util.Constants.DID;
import static iudx.resource.server.apiserver.util.Constants.DRL;
import static iudx.resource.server.apiserver.util.Constants.ENCODED_CIPHER_TEXT;
import static iudx.resource.server.apiserver.util.Constants.ENCODED_KEY;
import static iudx.resource.server.apiserver.util.Constants.ENCRYPTED_DATA;
import static iudx.resource.server.apiserver.util.Constants.ENDT;
import static iudx.resource.server.apiserver.util.Constants.ENTITY_QUERY;
import static iudx.resource.server.apiserver.util.Constants.HEADER_ACCEPT;
import static iudx.resource.server.apiserver.util.Constants.HEADER_ALLOW_ORIGIN;
import static iudx.resource.server.apiserver.util.Constants.HEADER_CONTENT_LENGTH;
import static iudx.resource.server.apiserver.util.Constants.HEADER_CONTENT_TYPE;
import static iudx.resource.server.apiserver.util.Constants.HEADER_HOST;
import static iudx.resource.server.apiserver.util.Constants.HEADER_OPTIONS;
import static iudx.resource.server.apiserver.util.Constants.HEADER_ORIGIN;
import static iudx.resource.server.apiserver.util.Constants.HEADER_PUBLIC_KEY;
import static iudx.resource.server.apiserver.util.Constants.HEADER_REFERER;
import static iudx.resource.server.apiserver.util.Constants.HEADER_RESPONSE_FILE_FORMAT;
import static iudx.resource.server.apiserver.util.Constants.HEADER_TOKEN;
import static iudx.resource.server.apiserver.util.Constants.ID;
import static iudx.resource.server.apiserver.util.Constants.IID;
import static iudx.resource.server.apiserver.util.Constants.IUDXQUERY_OPTIONS;
import static iudx.resource.server.apiserver.util.Constants.JSON_ALIAS;
import static iudx.resource.server.apiserver.util.Constants.JSON_CONSUMER;
import static iudx.resource.server.apiserver.util.Constants.JSON_COUNT;
import static iudx.resource.server.apiserver.util.Constants.JSON_ID;
import static iudx.resource.server.apiserver.util.Constants.JSON_INSTANCEID;
import static iudx.resource.server.apiserver.util.Constants.JSON_NAME;
import static iudx.resource.server.apiserver.util.Constants.JSON_SEARCH_TYPE;
import static iudx.resource.server.apiserver.util.Constants.JSON_TITLE;
import static iudx.resource.server.apiserver.util.Constants.JSON_TYPE;
import static iudx.resource.server.apiserver.util.Constants.LIMITPARAM;
import static iudx.resource.server.apiserver.util.Constants.MIME_APPLICATION_JSON;
import static iudx.resource.server.apiserver.util.Constants.MIME_TEXT_HTML;
import static iudx.resource.server.apiserver.util.Constants.MSG_INVALID_NAME;
import static iudx.resource.server.apiserver.util.Constants.MSG_SUB_TYPE_NOT_FOUND;
import static iudx.resource.server.apiserver.util.Constants.NO_CONTENT;
import static iudx.resource.server.apiserver.util.Constants.OFFSETPARAM;
import static iudx.resource.server.apiserver.util.Constants.RESET_PWD;
import static iudx.resource.server.apiserver.util.Constants.RESOURCE_GROUP;
import static iudx.resource.server.apiserver.util.Constants.RESPONSE_SIZE;
import static iudx.resource.server.apiserver.util.Constants.ROUTE_DOC;
import static iudx.resource.server.apiserver.util.Constants.ROUTE_STATIC_SPEC;
import static iudx.resource.server.apiserver.util.Constants.STARTT;
import static iudx.resource.server.apiserver.util.Constants.SUBSCRIPTION_ID;
import static iudx.resource.server.apiserver.util.Constants.SUB_TYPE;
import static iudx.resource.server.apiserver.util.Constants.USER_ID;
import static iudx.resource.server.apiserver.util.Util.errorResponse;
import static iudx.resource.server.authenticator.Constants.ROLE;
import static iudx.resource.server.cache.cachelmpl.CacheType.CATALOGUE_CACHE;
import static iudx.resource.server.common.Constants.*;
import static iudx.resource.server.common.Constants.CACHE_SERVICE_ADDRESS;
import static iudx.resource.server.common.Constants.PG_SERVICE_ADDRESS;
import static iudx.resource.server.common.HttpStatusCode.BAD_REQUEST;
import static iudx.resource.server.common.HttpStatusCode.NOT_FOUND;
import static iudx.resource.server.common.HttpStatusCode.UNAUTHORIZED;
import static iudx.resource.server.common.ResponseUrn.*;
import static iudx.resource.server.common.ResponseUrn.BACKING_SERVICE_FORMAT_URN;
import static iudx.resource.server.common.ResponseUrn.INVALID_PARAM_URN;
import static iudx.resource.server.common.ResponseUrn.INVALID_TEMPORAL_PARAM_URN;
import static iudx.resource.server.common.ResponseUrn.MISSING_TOKEN_URN;
import static iudx.resource.server.common.ResponseUrn.RESOURCE_NOT_FOUND_URN;
import static iudx.resource.server.common.ResponseUrn.YET_NOT_IMPLEMENTED_URN;
import static iudx.resource.server.common.ResponseUrn.fromCode;
import static iudx.resource.server.database.archives.Constants.ITEM_TYPES;
import static iudx.resource.server.databroker.util.Constants.ENTITIES;
import static iudx.resource.server.metering.util.Constants.*;
import static iudx.resource.server.metering.util.Constants.DELEGATOR_ID;
import static iudx.resource.server.metering.util.Constants.EPOCH_TIME;
import static iudx.resource.server.metering.util.Constants.ISO_TIME;
import static iudx.resource.server.metering.util.Constants.PROVIDER_ID;
import static iudx.resource.server.metering.util.Constants.TYPE_KEY;

import io.netty.handler.codec.http.HttpConstants;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.vertx.core.*;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
Expand Down Expand Up @@ -81,21 +139,23 @@
* <p>The API Server verticle implements the IUDX Resource Server APIs. It handles the API requests
* from the clients and interacts with the associated Service to respond.
*
* @version 1.0
* @see io.vertx.core.Vertx
* @see io.vertx.core.AbstractVerticle
* @see io.vertx.core.http.HttpServer
* @see io.vertx.ext.web.Router
* @see io.vertx.servicediscovery.ServiceDiscovery
* @see io.vertx.servicediscovery.types.EventBusService
* @see io.vertx.spi.cluster.hazelcast.HazelcastClusterManager
* @version 1.0
* @since 2020-05-31
*/
public class ApiServerVerticle extends AbstractVerticle {

private static final Logger LOGGER = LogManager.getLogger(ApiServerVerticle.class);

/** Service addresses */
/**
* Service addresses
*/
private static final String DATABASE_SERVICE_ADDRESS = "iudx.rs.database.service";

private static final String BROKER_SERVICE_ADDRESS = "iudx.rs.broker.service";
Expand Down Expand Up @@ -768,7 +828,7 @@ public void handlePostEntitiesQuery(RoutingContext routingContext) {
/**
* Execute a count query in DB
*
* @param json valid json query
* @param json valid json query
* @param response HttpServerResponse
*/
private void executeCountQuery(
Expand Down Expand Up @@ -813,7 +873,7 @@ private void executeCountQuery(
/**
* Execute a search query in DB
*
* @param json valid json query
* @param json valid json query
* @param response HttpServerResponse
*/
private void executeSearchQuery(
Expand Down Expand Up @@ -1024,18 +1084,8 @@ private void handleSubscriptions(RoutingContext routingContext) {
subHandler -> {
if (subHandler.succeeded()) {
LOGGER.info("Success: Handle Subscription request;");
JsonObject object =subHandler.result().getJsonArray("results").getJsonObject(0);
routingContext.data().put(RESPONSE_SIZE, 0);

JsonObject message =
new JsonObject()
.put(JSON_EVENT_TYPE, EVENTTYPE_CREATED)
.put(USER_ID, jsonObj.getString(USER_ID))
.put(SUBSCRIPTION_ID, object.getString(ID))
.put(SUB_TYPE, subscriptionType)
.put("resource", jsonObj.getJsonArray(ENTITIES).getString(0));

Future.future(fu -> updateAuditTable(message));
Future.future(fu -> updateAuditTable(routingContext));
handleSuccessResponse(
response, ResponseType.Created.getCode(), subHandler.result().toString());
} else {
Expand Down Expand Up @@ -1081,23 +1131,24 @@ private void appendSubscription(RoutingContext routingContext) {
JsonObject jsonObj = requestJson.copy();
jsonObj.put(USER_ID, authInfo.getString(USER_ID));
Future<JsonObject> subsReq =
subsService.appendSubscription(jsonObj, databroker, postgresService, authInfo);
subsService.appendSubscription(jsonObj, databroker, postgresService, authInfo,
cacheService);
subsReq.onComplete(
subsRequestHandler -> {
if (subsRequestHandler.succeeded()) {
LOGGER.debug("Success: Appending subscription");
routingContext.data().put(RESPONSE_SIZE, 0);

JsonObject message =
/*JsonObject message =
new JsonObject()
.put("resource", jsonObj.getJsonArray(ENTITIES).getString(0))
.put(SUBSCRIPTION_ID, jsonObj.getString(SUBSCRIPTION_ID))
.put(SUB_TYPE, jsonObj.getString(SUB_TYPE))
.put(USER_ID, jsonObj.getString(USER_ID))
.put(SUBSCRIPTION_ID, subsId)
.put(JSON_EVENT_TYPE, EVENTTYPE_APPEND);
.put(JSON_EVENT_TYPE, EVENTTYPE_APPEND);*/

Future.future(fu -> updateAuditTable(message));
Future.future(fu -> updateAuditTable(routingContext));
handleSuccessResponse(
response,
ResponseType.Created.getCode(),
Expand Down Expand Up @@ -1147,17 +1198,7 @@ private void updateSubscription(RoutingContext routingContext) {
if (subsRequestHandler.succeeded()) {
LOGGER.info("result : " + subsRequestHandler.result());
routingContext.data().put(RESPONSE_SIZE, 0);

JsonObject message =
new JsonObject()
.put("resource", jsonObj.getJsonArray(ENTITIES).getString(0))
.put(SUB_TYPE, subscriptionType)
.put(SUBSCRIPTION_ID, jsonObj.getString(SUBSCRIPTION_ID))
.put(USER_ID, jsonObj.getString(USER_ID))
.put(SUBSCRIPTION_ID, subsId)
.put(JSON_EVENT_TYPE, EVENTTYPE_UPDATE);

Future.future(fu -> updateAuditTable(message));
Future.future(fu -> updateAuditTable(routingContext));
handleSuccessResponse(
response, ResponseType.Created.getCode(), subsRequestHandler.result().toString());
} else {
Expand Down Expand Up @@ -1294,15 +1335,7 @@ private void deleteSubscription(RoutingContext routingContext) {
subHandler -> {
if (subHandler.succeeded()) {
routingContext.data().put(RESPONSE_SIZE, 0);

JsonObject message =
new JsonObject()
.put(SUBSCRIPTION_ID, jsonObj.getString(SUBSCRIPTION_ID))
.put(USER_ID, jsonObj.getString(USER_ID))
.put(SUB_TYPE, jsonObj.getString(SUB_TYPE))
.put(JSON_EVENT_TYPE, EVENTTYPE_DELETED);

Future.future(fu -> updateAuditTable(message));
Future.future(fu -> updateAuditTable(routingContext));
handleSuccessResponse(
response, ResponseType.Ok.getCode(), subHandler.result().toString());
} else {
Expand Down Expand Up @@ -1421,7 +1454,7 @@ public void getAdapterDetails(RoutingContext routingContext) {
* publish heartbeat details to Rabbit MQ.
*
* @param routingContext routingContext Note: This is too frequent an operation to have info or
* error level logs
* error level logs
*/
public void publishHeartbeat(RoutingContext routingContext) {
LOGGER.trace("Info: publishHeartbeat method starts;");
Expand Down Expand Up @@ -1544,9 +1577,9 @@ private void getAllAdaptersForUsers(RoutingContext routingContext) {
/**
* handle HTTP response.
*
* @param response HttpServerResponse object
* @param response HttpServerResponse object
* @param statusCode Http status code for response
* @param result String of response
* @param result String of response
*/
private void handleSuccessResponse(HttpServerResponse response, int statusCode, String result) {
response.putHeader(CONTENT_TYPE, APPLICATION_JSON).setStatusCode(statusCode).end(result);
Expand Down Expand Up @@ -1593,7 +1626,7 @@ private void handleResponse(
* part of the parameter</i>.
*
* @param routingContext RoutingContext Object
* @param response HttpServerResponse
* @param response HttpServerResponse
* @return Optional Optional of Map
*/
private Optional<MultiMap> getQueryParams(
Expand Down Expand Up @@ -1688,31 +1721,6 @@ private Future<Void> updateAuditTable(RoutingContext context) {
return promise.future();
}

/**
* Sends the message from Subscription APIs to Auditing Server by sending messages to RMQ Exchange
* named auditing and by adding origin field in the message for subscription related further
* processing in the auditing server
*
* @param message JsonObject message to be sent to the exchange
* @return Future object
*/
private Future<Void> updateAuditTable(JsonObject message) {
message.put(ORIGIN, ORIGIN_SERVER_SUBSCRIPTION);
Promise<Void> promise = Promise.promise();
meteringService.insertMeteringValuesInRmq(
message,
handler -> {
if (handler.succeeded()) {
LOGGER.info("message published in RMQ.");
promise.complete();
} else {
LOGGER.error("failed to publish message in RMQ.");
promise.complete();
}
});
return promise.future();
}

private Future<JsonObject> getEntityName(JsonObject request) {
Promise<JsonObject> promise = Promise.promise();
String getEntityNameQuery = ENTITY_QUERY.replace("$0", request.getString(SUBSCRIPTION_ID));
Expand Down
Loading

0 comments on commit b9c0074

Please sign in to comment.