diff --git a/src/main/java/iudx/resource/server/apiserver/subscription/CallbackSubscription.java b/src/main/java/iudx/resource/server/apiserver/subscription/CallbackSubscription.java deleted file mode 100644 index 56d29badd..000000000 --- a/src/main/java/iudx/resource/server/apiserver/subscription/CallbackSubscription.java +++ /dev/null @@ -1,125 +0,0 @@ -package iudx.resource.server.apiserver.subscription; - -import io.vertx.core.Future; -import io.vertx.core.Promise; -import io.vertx.core.json.JsonObject; -import iudx.resource.server.databroker.DataBrokerService; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** class containing methods to create callback subscriptions in system. */ -public class CallbackSubscription implements Subscription { - - private static final Logger LOGGER = LogManager.getLogger(CallbackSubscription.class); - - private DataBrokerService databroker; - - public CallbackSubscription(DataBrokerService databroker) { - this.databroker = databroker; - } - - /** - * create a callback subscription. - * - *

{@inheritDoc} - */ - @Override - public Future create(JsonObject subscription) { - LOGGER.info("callback create() method started"); - Promise promise = Promise.promise(); - databroker.registerCallbackSubscription( - subscription, - handler -> { - if (handler.succeeded()) { - promise.complete(handler.result()); - } else { - promise.fail(handler.cause().getMessage()); - } - }); - return promise.future(); - } - - /** - * update a callback subscription. - * - *

{@inheritDoc} - */ - @Override - public Future update(JsonObject subscription) { - LOGGER.info("callback update() method started"); - Promise promise = Promise.promise(); - databroker.updateCallbackSubscription( - subscription, - handler -> { - if (handler.succeeded()) { - promise.complete(handler.result()); - } else { - promise.fail(handler.cause().getMessage()); - } - }); - return promise.future(); - } - - /** - * append a callback subscription. - * - *

{@inheritDoc} - */ - @Override - public Future append(JsonObject subscription) { - LOGGER.info("callback append() method started"); - Promise promise = Promise.promise(); - databroker.updateCallbackSubscription( - subscription, - handler -> { - if (handler.succeeded()) { - promise.complete(handler.result()); - } else { - promise.fail(handler.cause().getMessage()); - } - }); - return promise.future(); - } - - /** - * delete a callback subscription. - * - *

{@inheritDoc} - */ - @Override - public Future delete(JsonObject subscription) { - LOGGER.info("callback delete() method started"); - Promise promise = Promise.promise(); - databroker.deleteStreamingSubscription( - subscription, - handler -> { - if (handler.succeeded()) { - promise.complete(handler.result()); - } else { - promise.fail(handler.cause().getMessage()); - } - }); - return promise.future(); - } - - /** - * get a callback subscription. - * - *

{@inheritDoc} - */ - @Override - public Future get(JsonObject subscription) { - LOGGER.info("callback get() method started"); - Promise promise = Promise.promise(); - databroker.listCallbackSubscription( - subscription, - handler -> { - if (handler.succeeded()) { - promise.complete(handler.result()); - } else { - promise.fail(handler.cause().getMessage()); - } - }); - return promise.future(); - } -} diff --git a/src/main/java/iudx/resource/server/apiserver/subscription/SubsType.java b/src/main/java/iudx/resource/server/apiserver/subscription/SubsType.java index 101cce661..7201c130a 100644 --- a/src/main/java/iudx/resource/server/apiserver/subscription/SubsType.java +++ b/src/main/java/iudx/resource/server/apiserver/subscription/SubsType.java @@ -1,7 +1,6 @@ package iudx.resource.server.apiserver.subscription; public enum SubsType { - CALLBACK("CALLBACK"), STREAMING("STREAMING"); public final String type; @@ -9,5 +8,4 @@ public enum SubsType { SubsType(String type) { this.type = type; } - } diff --git a/src/main/java/iudx/resource/server/apiserver/subscription/SubscriptionService.java b/src/main/java/iudx/resource/server/apiserver/subscription/SubscriptionService.java index d89d2eb21..80e4986ef 100644 --- a/src/main/java/iudx/resource/server/apiserver/subscription/SubscriptionService.java +++ b/src/main/java/iudx/resource/server/apiserver/subscription/SubscriptionService.java @@ -29,19 +29,12 @@ public class SubscriptionService { /** * get the context of subscription according to the type passed in message body. * - * @param type type of subscription either streaming or callback * @param databroker databroker verticle object * @return an object of Subscription class */ - private Subscription getSubscriptionContext(SubsType type, DataBrokerService databroker) { - LOGGER.info("getSubscriptionContext() method started"); - if (type != null && type.equals(SubsType.CALLBACK)) { - LOGGER.info("callback subscription context"); - return new CallbackSubscription(databroker); - } else { - LOGGER.info("streaming subscription context"); - return new StreamingSubscription(databroker); - } + private Subscription getSubscriptionContext(DataBrokerService databroker) { + LOGGER.info("streaming subscription context"); + return new StreamingSubscription(databroker); } /** @@ -62,7 +55,7 @@ public Future createSubscription( Promise promise = Promise.promise(); SubsType subType = SubsType.valueOf(json.getString(SUB_TYPE)); if (subscription == null) { - subscription = getSubscriptionContext(subType, databroker); + subscription = getSubscriptionContext(databroker); } assertNotNull(subscription); subscription @@ -239,9 +232,8 @@ public Future deleteSubscription( JsonObject json, DataBrokerService databroker, PostgresService pgService) { LOGGER.info("deleteSubscription() method started"); Promise promise = Promise.promise(); - SubsType subType = SubsType.valueOf(json.getString(SUB_TYPE)); if (subscription == null) { - subscription = getSubscriptionContext(subType, databroker); + subscription = getSubscriptionContext(databroker); } assertNotNull(subscription); subscription @@ -284,9 +276,8 @@ public Future getSubscription( JsonObject json, DataBrokerService databroker, PostgresService pgService) { LOGGER.info("getSubscription() method started"); Promise promise = Promise.promise(); - SubsType subType = SubsType.valueOf(json.getString(SUB_TYPE)); if (subscription == null) { - subscription = getSubscriptionContext(subType, databroker); + subscription = getSubscriptionContext(databroker); } assertNotNull(subscription); subscription @@ -342,7 +333,7 @@ public Future appendSubscription( Promise promise = Promise.promise(); SubsType subType = SubsType.valueOf(json.getString(SUB_TYPE)); if (subscription == null) { - subscription = getSubscriptionContext(subType, databroker); + subscription = getSubscriptionContext(databroker); } assertNotNull(subscription); subscription diff --git a/src/main/java/iudx/resource/server/callback/CallbackService.java b/src/main/java/iudx/resource/server/callback/CallbackService.java deleted file mode 100644 index 602a472da..000000000 --- a/src/main/java/iudx/resource/server/callback/CallbackService.java +++ /dev/null @@ -1,74 +0,0 @@ -package iudx.resource.server.callback; - -import io.vertx.codegen.annotations.Fluent; -import io.vertx.codegen.annotations.GenIgnore; -import io.vertx.codegen.annotations.ProxyGen; -import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.json.JsonObject; - -/** - * - * - *

Callback Service

- * - *

The Callback Service. - * - *

The Callback Service in the IUDX Resource Server defines the operations to be performed with - * the IUDX Callback server. - * - * @version 1.0 - * @since 2020-05-31 - */ -@VertxGen -@ProxyGen -public interface CallbackService { - - @GenIgnore - static CallbackService createProxy(Vertx vertx, String address) { - return new CallbackServiceVertxEBProxy(vertx, address); - } - - /** - * The connectToCallbackNotificationQueue implements for getting message from - * "callback.notification" queue. - * - * @param request containing queueName. - * @return CallbackService which is a Service - */ - @Fluent - CallbackService connectToCallbackNotificationQueue( - JsonObject request, Handler> handler); - - /** - * The connectToCallbackDataQueue implements for getting message from "callback.data" queue. - * - * @param request containing queueName. - * @return CallbackService which is a Service - */ - @Fluent - CallbackService connectToCallbackDataQueue( - JsonObject request, Handler> handler); - - /** - * The queryCallBackDataBase implements for the query callBack database. - * - * @param request containing queueName. - * @return CallbackService which is a Service - */ - @Fluent - CallbackService queryCallBackDataBase( - JsonObject request, Handler> handler); - - /** - * The sendDataToCallBackSubscriber implements for sending data to callback database update info. - * - * @param request containing queueName. - * @return CallbackService which is a Service - */ - @Fluent - CallbackService sendDataToCallBackSubscriber( - JsonObject request, Handler> handler); -} diff --git a/src/main/java/iudx/resource/server/callback/CallbackServiceImpl.java b/src/main/java/iudx/resource/server/callback/CallbackServiceImpl.java deleted file mode 100644 index 5d144b883..000000000 --- a/src/main/java/iudx/resource/server/callback/CallbackServiceImpl.java +++ /dev/null @@ -1,715 +0,0 @@ -package iudx.resource.server.callback; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgPool; -import io.vertx.rabbitmq.QueueOptions; -import io.vertx.rabbitmq.RabbitMQClient; -import io.vertx.rabbitmq.RabbitMQConsumer; -import io.vertx.sqlclient.PoolOptions; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.SqlConnection; -import java.util.HashMap; -import org.apache.http.HttpStatus; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * - * - *

Callback Service Service Implementation.

- * - *

The Callback Service implementation in the IUDX Resource Server implements the definitions of - * the {@link iudx.resource.server.callback.CallbackService}. - * - * @version 1.0 - * @since 2020-07-15 - */ -public class CallbackServiceImpl implements CallbackService { - - private static final Logger LOGGER = LogManager.getLogger(CallbackServiceImpl.class); - static PgPool pgClient; - /* Cache */ - static HashMap pgCache; - private RabbitMQClient client; - private WebClient webClient; - private Vertx vertx; - private String databaseIp; - private int databasePort; - private String databaseName; - private String databaseUserName; - private String databasePassword; - private int databasePoolSize; - private PoolOptions poolOptions; - private PgConnectOptions connectOptions; - - /** - * This is a constructor which is used by the Callback Verticle to instantiate a RabbitMQ client. - * - * @param clientInstance which is a RabbitMQ client - * @param webClientInstance which is a Vertex Web client - * @param propObj which is a properties JsonObject - * @param vertxInstance which is a Vertx Instance - */ - public CallbackServiceImpl( - RabbitMQClient clientInstance, - WebClient webClientInstance, - JsonObject propObj, - Vertx vertxInstance) { - - LOGGER.trace("Got the RabbitMQ Client instance"); - client = clientInstance; - - JsonObject reqNotification = new JsonObject(); - reqNotification.put(Constants.QUEUE_NAME, "callback.notification"); - connectToCallbackNotificationQueue(reqNotification); - - JsonObject reqData = new JsonObject(); - reqData.put(Constants.QUEUE_NAME, "callback.data"); - connectToCallbackDataQueue(reqData); - - if (propObj != null && !propObj.isEmpty()) { - databaseIp = propObj.getString("callbackDatabaseIP"); - databasePort = propObj.getInteger("callbackDatabasePort"); - databaseName = propObj.getString("callbackDatabaseName"); - databaseUserName = propObj.getString("callbackDatabaseUserName"); - databasePassword = propObj.getString("callbackDatabasePassword"); - databasePoolSize = propObj.getInteger("callbackpoolSize"); - } - - webClient = webClientInstance; - vertx = vertxInstance; - } - - @Override - public CallbackService connectToCallbackNotificationQueue( - JsonObject request, Handler> handler) { - if (request != null && !request.isEmpty()) { - Future result = connectToCallbackNotificationQueue(request); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - handler.handle(Future.succeededFuture(resultHandler.result())); - } - if (resultHandler.failed()) { - LOGGER.error( - "connectToCallbackNotificationQueue resultHandler failed : " - + resultHandler.cause().getMessage().toString()); - handler.handle(Future.failedFuture(resultHandler.cause().getMessage().toString())); - } - }); - } - return this; - } - - /** - * connectToCallbackNotificationQueue Method. - * - *

This method execute tasks

- * - *
  • Connect to RabbitMQ callback.notification Queue (callback.notification) - *
  • Create RabbitMQConsumer for consuming queue messages - *
  • Get the database operation value from message - *
  • Query Database when database operation is create|update|delete - * - * @param request which is a JSON object - * @return response which is a Future object of promise of JSON type - */ - public Future connectToCallbackNotificationQueue(JsonObject request) { - JsonObject finalResponse = new JsonObject(); - Promise promise = Promise.promise(); - - if (request != null && !request.isEmpty()) { - /* Set Queue Options */ - QueueOptions options = - new QueueOptions().setMaxInternalQueueSize(1000).setKeepMostRecent(true); - /* Get Queue Name from request */ - String queueName = request.getString(Constants.QUEUE_NAME); - - client.start( - startHandler -> { - if (startHandler.succeeded()) { - /* Create a stream of messages from a queue */ - client.basicConsumer( - queueName, - options, - rabbitMQConsumerAsyncResult -> { - if (rabbitMQConsumerAsyncResult.succeeded()) { - RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result(); - mqConsumer.handler( - message -> { - /* Message from Queue */ - Buffer body = message.body(); - if (body != null) { - JsonObject currentBodyJsonObj = null; - String operation = null; - try { - /* Convert message body to JsonObject */ - currentBodyJsonObj = new JsonObject(body.toString()); - } catch (Exception e) { - LOGGER.error(Constants.JSON_PARSE_EXCEPTION, e.getCause()); - finalResponse.put( - Constants.MESSAGE, Constants.JSON_PARSE_EXCEPTION); - promise.fail(finalResponse.toString()); - } - - /* Get operation value from currentMessageJsonObj */ - operation = currentBodyJsonObj.getString(Constants.OPERATION); - - /* Check for operation */ - if (operation != null - && !operation.isEmpty() - && !operation.isBlank()) { - if (operation.equals(Constants.CREATE) - || operation.equals(Constants.UPDATE) - || operation.equals(Constants.DELETE)) { - - /* Create request object for Query DataBase */ - JsonObject requestObj = new JsonObject(); - requestObj.put(Constants.TABLE_NAME, "registercallback"); - - /* Query DataBase */ - Future result = queryCallBackDataBase(requestObj); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - LOGGER.debug( - Constants.DATABASE_QUERY_RESULT - + resultHandler.result()); - finalResponse.put( - Constants.DATABASE_QUERY_RESULT, - Constants.CACHE_UPDATE_SUCCESS); - } else { - LOGGER.error( - Constants.DATABASE_QUERY_RESULT + Constants.COLON, - resultHandler.cause()); - finalResponse.put( - Constants.DATABASE_QUERY_RESULT, - Constants.DATABASE_QUERY_FAIL); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error(Constants.DATABASE_OPERATION_INVALID); - finalResponse.put( - Constants.ERROR, Constants.DATABASE_OPERATION_INVALID); - promise.fail(finalResponse.toString()); - } - } else { - LOGGER.error(Constants.DATABASE_OPERATION_NOT_FOUND); - finalResponse.put( - Constants.ERROR, Constants.DATABASE_OPERATION_NOT_FOUND); - promise.fail(finalResponse.toString()); - } - } - }); - LOGGER.info(Constants.QUEUE_EMPTY); - finalResponse.put( - Constants.DATABASE_QUERY_RESULT, - Constants.CONNECT_TO_CALLBACK_NOTIFICATION_QUEUE); - promise.tryComplete(finalResponse); - /* - * Changed promise.complete(finalResponse) to - * promise.tryComplete(finalResponse) to avoid - * java.lang.IllegalStateException: Result is already complete - */ - } else { - LOGGER.error( - Constants.CONSUME_QUEUE_MESSAGE_FAIL + Constants.COLON + queueName); - LOGGER.error(Constants.ERROR + rabbitMQConsumerAsyncResult.cause()); - finalResponse.put( - Constants.ERROR, - Constants.CONSUME_QUEUE_MESSAGE_FAIL + Constants.COLON + queueName); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error(Constants.QUEUE_CONNECTION_FAIL + Constants.COLON + queueName); - finalResponse.put( - Constants.ERROR, Constants.QUEUE_CONNECTION_FAIL + Constants.COLON + queueName); - promise.fail(finalResponse.toString()); - } - }); - } - return promise.future(); - } - - @Override - public CallbackService connectToCallbackDataQueue( - JsonObject request, Handler> handler) { - if (request != null && !request.isEmpty()) { - Future result = connectToCallbackDataQueue(request); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - handler.handle(Future.succeededFuture(resultHandler.result())); - } - if (resultHandler.failed()) { - LOGGER.error( - "connectToCallbackDataQueue resultHandler failed : " - + resultHandler.cause().getMessage().toString()); - handler.handle(Future.failedFuture(resultHandler.cause().getMessage().toString())); - } - }); - } - return this; - } - - /** - * connectToCallbackDataQueue Method. - * - *

    This method execute tasks

    - * - *
  • Connect to RabbitMQ callback.data Queue (callback.data) - *
  • Create RabbitMQConsumer for consuming queue messages - *
  • Get the routing key of message - *
  • Get callbackUrl JsonObject from cache using routingKey - *
  • Send message data to callbackUrl - * - * @param request which is a JSON object - * @return response which is a Future object of promise of JSON type - */ - public Future connectToCallbackDataQueue(JsonObject request) { - - JsonObject finalResponse = new JsonObject(); - Promise promise = Promise.promise(); - - if (request != null && !request.isEmpty()) { - /* Set Queue Options */ - QueueOptions options = - new QueueOptions().setMaxInternalQueueSize(1000).setKeepMostRecent(true); - /* Get Queue Name from request */ - String queueName = request.getString(Constants.QUEUE_NAME); - - client.start( - startHandler -> { - if (startHandler.succeeded()) { - /* Create a stream of messages from a queue */ - client.basicConsumer( - queueName, - options, - rabbitMQConsumerAsyncResult -> { - if (rabbitMQConsumerAsyncResult.succeeded()) { - LOGGER.info(Constants.RABBITMQ_CONSUMER_CREATED); - RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result(); - mqConsumer.handler( - message -> { - /* Message from Queue */ - Buffer body = message.body(); - LOGGER.debug(Constants.MESSAGE + Constants.COLON + message.body()); - if (body != null) { - String routingKey = null; - JsonObject currentBodyJsonObj = null; - - /* Convert body message to JsonObject */ - try { - currentBodyJsonObj = new JsonObject(body.toString()); - } catch (Exception e) { - LOGGER.error(Constants.ERROR + Constants.COLON + e.getCause()); - finalResponse.put(Constants.ERROR, Constants.JSON_PARSE_EXCEPTION); - promise.fail(finalResponse.toString()); - } - - /* Get routingKey and currentMessageData from Message */ - routingKey = message.envelope().getRoutingKey(); - currentBodyJsonObj = new JsonObject(message.body().toString()); - - JsonObject callBackJsonObj = null; - - /* Get callback Object from Cache */ - callBackJsonObj = pgCache.get(routingKey); - - LOGGER.debug( - Constants.ROUTING_KEY - + Constants.COLON - + message.envelope().getRoutingKey()); - LOGGER.debug( - Constants.MESSAGE + Constants.COLON + currentBodyJsonObj); - - /* Creating Request Object */ - if (callBackJsonObj != null && !callBackJsonObj.isEmpty()) { - JsonObject requestObj = new JsonObject(); - requestObj.put(Constants.CALLBACK_JSON_OBJECT, callBackJsonObj); - requestObj.put( - Constants.CURRENT_MESSAGE_JSON_OBJECT, currentBodyJsonObj); - - /* Send data to callback Url */ - Future result = - sendDataToCallBackSubscriber(requestObj); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - LOGGER.debug( - Constants.CALLBACK_URL_RESPONSE - + Constants.COLON - + resultHandler.result()); - finalResponse.put( - Constants.SUCCESS, - Constants.DATA_SEND_TO_CALLBACK_URL_SUCCESS); - } else { - LOGGER.error( - Constants.CALLBACK_URL_RESPONSE - + resultHandler.cause()); - finalResponse.put( - Constants.ERROR, - Constants.DATA_SEND_TO_CALLBACK_URL_FAIL); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error( - Constants.NO_CALLBACK_URL_FOR_ROUTING_KEY - + Constants.COLON - + routingKey); - finalResponse.put( - Constants.ERROR, - Constants.NO_CALLBACK_URL_FOR_ROUTING_KEY + routingKey); - promise.fail(finalResponse.toString()); - } - } else { - LOGGER.error( - Constants.ERROR + Constants.COLON + Constants.MESSAGE_BODY_NULL); - finalResponse.put(Constants.ERROR, Constants.MESSAGE_BODY_NULL); - promise.fail(finalResponse.toString()); - } - }); - LOGGER.info(Constants.QUEUE_EMPTY); - finalResponse.put( - Constants.DATABASE_QUERY_RESULT, - Constants.CONNECT_TO_CALLBACK_DATA_QUEUE); - /* - * Changed promise.complete(finalResponse) to - * promise.tryComplete(finalResponse) to avoid - * java.lang.IllegalStateException: Result is already complete - */ - promise.tryComplete(finalResponse); - } else { - LOGGER.error( - Constants.ERROR - + Constants.CONSUME_QUEUE_MESSAGE_FAIL - + Constants.COLON - + queueName); - finalResponse.put( - Constants.ERROR, Constants.CONSUME_QUEUE_MESSAGE_FAIL + queueName); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error(Constants.QUEUE_CONNECTION_FAIL + Constants.COLON + queueName); - finalResponse.put(Constants.ERROR, Constants.QUEUE_CONNECTION_FAIL + queueName); - promise.fail(finalResponse.toString()); - } - }); - } - return promise.future(); - } - - @Override - public CallbackService sendDataToCallBackSubscriber( - JsonObject request, Handler> handler) { - if (request != null && !request.isEmpty()) { - Future result = sendDataToCallBackSubscriber(request); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - handler.handle(Future.succeededFuture(resultHandler.result())); - } - if (resultHandler.failed()) { - LOGGER.error( - "sendDataToCallBackSubscriber resultHandler failed : " - + resultHandler.cause().getMessage().toString()); - handler.handle(Future.failedFuture(resultHandler.cause().getMessage().toString())); - } - }); - } - return this; - } - - /** - * sendDataToCallBackSubscriber Method. - * - *

    This method execute tasks

    - * - *
  • Get callBackJsonObj and currentMessageJsonObj from request parameter - *
  • Get callBackUrl, userName and password from callBackJsonObj - *
  • Create instance of HttpRequest using webClient - *
  • Send message data [currentMessageJsonObj] to callbackUrl - * - * @param request which is a JSON object - * @return response which is a Future object of promise of JSON type - */ - public Future sendDataToCallBackSubscriber(JsonObject request) { - JsonObject finalResponse = new JsonObject(); - Promise promise = Promise.promise(); - - if (request != null && !request.isEmpty()) { - /* Getting entityObj */ - String callBackUrl = null; - String userName = null; - String password = null; - HttpRequest webRequest = null; - - JsonObject callBackJsonObj = request.getJsonObject(Constants.CALLBACK_JSON_OBJECT); - JsonObject currentMessageJsonObj = - request.getJsonObject(Constants.CURRENT_MESSAGE_JSON_OBJECT); - - if (callBackJsonObj != null && !callBackJsonObj.isEmpty()) { - callBackUrl = callBackJsonObj.getString(Constants.CALLBACK_URL); - userName = callBackJsonObj.getString(Constants.USER_NAME); - password = callBackJsonObj.getString(Constants.PASSWORD); - } - - try { - if (callBackUrl != null && !callBackUrl.isEmpty() && !callBackUrl.isBlank()) { - if (userName != null && password != null && !userName.isBlank() && !password.isBlank()) { - webRequest = - webClient.postAbs(callBackUrl.toString()).basicAuthentication(userName, password); - } else { - webRequest = webClient.postAbs(callBackUrl.toString()); - } - - if (webRequest != null) { - /* Set Request Header */ - webRequest.putHeader(Constants.CONTENT_TYPE, Constants.APPLICATION_JSON); - /* Send data to callback URL */ - webRequest.sendJsonObject( - currentMessageJsonObj, - handler -> { - if (handler.succeeded()) { - HttpResponse result = handler.result(); - if (result != null) { - int status = result.statusCode(); - if (status == HttpStatus.SC_OK) { - /* Callback URL 200 OK Status */ - String responseBody = result.bodyAsString(); - LOGGER.debug( - Constants.RESPONSE_BODY - + Constants.COLON - + Constants.NEW_LINE - + responseBody); - LOGGER.debug(Constants.STATUS + Constants.COLON + status); - finalResponse.put(Constants.TYPE, status); - finalResponse.put(Constants.TITLE, Constants.SUCCESS); - finalResponse.put(Constants.DETAIL, Constants.CALLBACK_SUCCESS); - LOGGER.debug(Constants.CALLBACK_URL_RESPONSE + finalResponse); - promise.complete(finalResponse); - } else if (status == HttpStatus.SC_NOT_FOUND) { - /* Callback URL not found */ - finalResponse.put(Constants.TYPE, status); - finalResponse.put(Constants.TITLE, Constants.FAILURE); - finalResponse.put(Constants.DETAIL, Constants.CALLBACK_URL_NOT_FOUND); - LOGGER.debug(Constants.CALLBACK_URL_RESPONSE + finalResponse); - promise.fail(finalResponse.toString()); - } else { - /* some other issue */ - finalResponse.put(Constants.TYPE, status); - finalResponse.put(Constants.TITLE, Constants.FAILURE); - finalResponse.put(Constants.DETAIL, result.statusMessage()); - LOGGER.debug(Constants.CALLBACK_URL_RESPONSE + finalResponse); - promise.fail(finalResponse.toString()); - } - } else { - LOGGER.error(Constants.ERROR + handler.cause().getMessage()); - finalResponse.put(Constants.ERROR, Constants.CALLBACK_URL_RESPONSE_NULL); - promise.fail(finalResponse.toString()); - } - } else { - LOGGER.error(Constants.ERROR + handler.cause().getMessage()); - finalResponse.put(Constants.ERROR, Constants.CONNECT_TO_CALLBACK_URL_FAIL); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error( - Constants.ERROR + Constants.COLON + Constants.CREATE_CALLBACK_REQUEST_OBJECT_FAIL); - finalResponse.put(Constants.ERROR, Constants.CREATE_CALLBACK_REQUEST_OBJECT_FAIL); - promise.fail(finalResponse.toString()); - } - } else { - LOGGER.error(Constants.CALLBACK_URL_INVALID); - finalResponse.put(Constants.ERROR, Constants.CALLBACK_URL_INVALID); - promise.fail(finalResponse.toString()); - } - } catch (Exception e) { - LOGGER.error(Constants.DATA_SEND_TO_CALLBACK_URL_FAIL + e.getCause()); - finalResponse.put(Constants.ERROR, Constants.DATA_SEND_TO_CALLBACK_URL_FAIL); - promise.fail(finalResponse.toString()); - } - } - return promise.future(); - } - - /* Create Cache for callback */ - private void createCache() { - pgCache = new HashMap(); - } - - /* Update Cache for callback */ - private void updateCache(String entity, JsonObject callBackDataObj) { - if (pgCache == null) { - createCache(); - } - - if (pgCache != null) { - pgCache.put(entity, callBackDataObj); - } - } - - /* Delete Cache for callback */ - private void clearCacheData() { - pgCache.clear(); - } - - @Override - public CallbackService queryCallBackDataBase( - JsonObject request, Handler> handler) { - if (request != null && !request.isEmpty()) { - Future result = queryCallBackDataBase(request); - result.onComplete( - resultHandler -> { - if (resultHandler.succeeded()) { - handler.handle(Future.succeededFuture(resultHandler.result())); - } - if (resultHandler.failed()) { - LOGGER.error("queryCallBackDataBase resultHandler failed : " + resultHandler.cause()); - handler.handle(Future.failedFuture(resultHandler.cause().getMessage())); - } - }); - } - return this; - } - - /** - * queryCallBackDataBase Method. - * - *

    This method execute tasks

    - * - *
  • Create instance of pgClient and Query callback database - *
  • Update Cache for entity and callBackDataObj - * - * @param request which is a JSON object - * @return response which is a Future object of promise of JSON type - */ - public Future queryCallBackDataBase(JsonObject request) { - JsonObject finalResponse = new JsonObject(); - Promise promise = Promise.promise(); - - /* Get table name for request object */ - String tableName = request.getString(Constants.TABLE_NAME); - - /* Set Connection Object */ - if (connectOptions == null) { - connectOptions = - new PgConnectOptions() - .setPort(databasePort) - .setHost(databaseIp) - .setDatabase(databaseName) - .setUser(databaseUserName) - .setPassword(databasePassword); - } - - /* Pool options */ - if (poolOptions == null) { - poolOptions = new PoolOptions().setMaxSize(databasePoolSize); - } - - /* Create the client pool */ - if (pgClient == null) { - pgClient = PgPool.pool(vertx, connectOptions, poolOptions); - } - if (pgClient != null) { - try { - /* Execute simple query */ - pgClient.getConnection( - handler -> { - if (handler.succeeded()) { - SqlConnection pgConnection = handler.result(); - pgConnection - .preparedQuery("SELECT * FROM " + tableName) - .execute( - action -> { - if (action.succeeded()) { - LOGGER.debug( - Constants.EXECUTING_SQL_QUERY + Constants.COLON + tableName); - /* Rows in Table */ - RowSet rows = action.result(); - LOGGER.debug(Constants.FETCH_DATA_FROM_DATABASE); - LOGGER.debug(Constants.ROWS + Constants.COLON + rows.size()); - - /* Clear Cache Data */ - if (pgCache != null) { - clearCacheData(); - LOGGER.debug("Cache Data Clear.....!!!"); - } - - /* Iterating Rows */ - for (Row row : rows) { - /* Getting entities, callBackUrl, userName and password from row */ - JsonObject callBackDataObj = new JsonObject(); - String callBackUrl = row.getString(1); - JsonArray entities = (JsonArray) row.getValue(2); - String userName = row.getString(6); - String password = row.getString(7); - - /* Iterating entities JsonArray for updating Cache */ - if (entities != null) { - entities.forEach( - entity -> { - /* Creating entityData */ - callBackDataObj.put(Constants.CALLBACK_URL, callBackUrl); - callBackDataObj.put(Constants.USER_NAME, userName); - callBackDataObj.put(Constants.PASSWORD, password); - /* Update Cache for each entity */ - if (entity != null) { - updateCache(entity.toString(), callBackDataObj); - } - }); - } - } - LOGGER.debug( - Constants.SUCCESS - + Constants.COLON - + Constants.CACHE_UPDATE_SUCCESS); - LOGGER.debug(Constants.CACHE_DATA + Constants.COLON + pgCache); - finalResponse.put(Constants.SUCCESS, Constants.CACHE_UPDATE_SUCCESS); - promise.complete(finalResponse); - } else { - LOGGER.error(Constants.ERROR + action.cause()); - LOGGER.error("", action.cause()); - finalResponse.put(Constants.ERROR, Constants.EXECUTE_QUERY_FAIL); - promise.fail(finalResponse.toString()); - } - }); - } else { - LOGGER.error(Constants.CONNECT_DATABASE_FAIL + handler.cause().getMessage()); - finalResponse.put(Constants.ERROR, Constants.CONNECT_DATABASE_FAIL); - promise.fail(finalResponse.toString()); - } - }); - - } catch (Exception e) { - LOGGER.error(Constants.CONNECT_DATABASE_FAIL, e.getCause()); - finalResponse.put(Constants.ERROR, Constants.CONNECT_DATABASE_FAIL); - promise.fail(finalResponse.toString()); - } finally { - pgClient.close(); - } - } else { - LOGGER.error(Constants.ERROR + Constants.COLON + Constants.CREATE_PG_CLIENT_OBJECT_FAIL); - finalResponse.put(Constants.ERROR, Constants.CREATE_PG_CLIENT_OBJECT_FAIL); - promise.fail(finalResponse.toString()); - } - return promise.future(); - } -} diff --git a/src/main/java/iudx/resource/server/callback/CallbackVerticle.java b/src/main/java/iudx/resource/server/callback/CallbackVerticle.java deleted file mode 100644 index 066e74f54..000000000 --- a/src/main/java/iudx/resource/server/callback/CallbackVerticle.java +++ /dev/null @@ -1,129 +0,0 @@ -package iudx.resource.server.callback; - -import io.vertx.core.AbstractVerticle; -import io.vertx.core.eventbus.MessageConsumer; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; -import io.vertx.rabbitmq.RabbitMQClient; -import io.vertx.rabbitmq.RabbitMQOptions; -import io.vertx.serviceproxy.ServiceBinder; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class CallbackVerticle extends AbstractVerticle { - - private static final String CALLBACK_SERVICE_ADDRESS = "iudx.rs.callback.service"; - private static final Logger LOGGER = LogManager.getLogger(CallbackVerticle.class); - private RabbitMQOptions config; - private RabbitMQClient client; - private String dataBrokerIp; - private int dataBrokerPort; - private int dataBrokerManagementPort; - private String dataBrokerVhost; - private String dataBrokerUserName; - private String dataBrokerPassword; - private int connectionTimeout; - private int requestedHeartbeat; - private int handshakeTimeout; - private int requestedChannelMax; - private int networkRecoveryInterval; - private CallbackService callback; - private WebClient webClient; - private WebClientOptions webConfig; - private ServiceBinder binder; - private MessageConsumer consumer; - /* Database Properties */ - private String databaseIp; - private int databasePort; - private String databaseName; - private String databaseUserName; - private String databasePassword; - private int poolSize; - - /** This method is used to start the Verticle. It deploys a verticle in a cluster. */ - @Override - public void start() throws Exception { - - dataBrokerIp = config().getString("dataBrokerIP"); - dataBrokerPort = config().getInteger("dataBrokerPort"); - dataBrokerManagementPort = config().getInteger("dataBrokerManagementPort"); - dataBrokerVhost = config().getString("dataBrokerVhost"); - dataBrokerUserName = config().getString("dataBrokerUserName"); - dataBrokerPassword = config().getString("dataBrokerPassword"); - connectionTimeout = config().getInteger("connectionTimeout"); - requestedHeartbeat = config().getInteger("requestedHeartbeat"); - handshakeTimeout = config().getInteger("handshakeTimeout"); - requestedChannelMax = config().getInteger("requestedChannelMax"); - networkRecoveryInterval = config().getInteger("networkRecoveryInterval"); - - databaseIp = config().getString("callbackDatabaseIP"); - databasePort = config().getInteger("callbackDatabasePort"); - databaseName = config().getString("callbackDatabaseName"); - databaseUserName = config().getString("callbackDatabaseUserName"); - databasePassword = config().getString("callbackDatabasePassword"); - poolSize = config().getInteger("callbackpoolSize"); - - /* Configure the RabbitMQ Data Broker client with input from config files. */ - - config = new RabbitMQOptions(); - config.setUser(dataBrokerUserName); - config.setPassword(dataBrokerPassword); - config.setHost(dataBrokerIp); - config.setPort(dataBrokerPort); - config.setVirtualHost(dataBrokerVhost); - config.setConnectionTimeout(connectionTimeout); - config.setRequestedHeartbeat(requestedHeartbeat); - config.setHandshakeTimeout(handshakeTimeout); - config.setRequestedChannelMax(requestedChannelMax); - config.setNetworkRecoveryInterval(networkRecoveryInterval); - config.setAutomaticRecoveryEnabled(true); - - webConfig = new WebClientOptions(); - webConfig.setKeepAlive(true); - webConfig.setConnectTimeout(86400000); - webConfig.setDefaultHost(dataBrokerIp); - webConfig.setDefaultPort(dataBrokerManagementPort); - webConfig.setKeepAliveTimeout(86400000); - - /* Create a RabbitMQ Client with the configuration and vertx cluster instance. */ - - client = RabbitMQClient.create(vertx, config); - - /* Create a Vertx Web Client with the configuration and vertx cluster instance. */ - - webClient = WebClient.create(vertx, webConfig); - - /* Create a Json Object for properties */ - - JsonObject propObj = new JsonObject(); - - propObj.put("userName", dataBrokerUserName); - propObj.put("password", dataBrokerPassword); - propObj.put("vHost", dataBrokerVhost); - propObj.put("dataBrokerIP", dataBrokerIp); - propObj.put("dataBrokerPort", dataBrokerPort); - propObj.put("callbackDatabaseIP", databaseIp); - propObj.put("callbackDatabasePort", databasePort); - propObj.put("callbackDatabaseName", databaseName); - propObj.put("callbackDatabaseUserName", databaseUserName); - propObj.put("callbackDatabasePassword", databasePassword); - propObj.put("callbackpoolSize", poolSize); - - /* Call the callback constructor with the RabbitMQ client. */ - binder = new ServiceBinder(vertx); - callback = new CallbackServiceImpl(client, webClient, propObj, vertx); - - /* Publish the Callback service with the Event Bus against an address. */ - - consumer = - binder.setAddress(CALLBACK_SERVICE_ADDRESS).register(CallbackService.class, callback); - - LOGGER.info("Callback Verticle started"); - } - - @Override - public void stop() { - binder.unregister(consumer); - } -} diff --git a/src/main/java/iudx/resource/server/callback/Constants.java b/src/main/java/iudx/resource/server/callback/Constants.java deleted file mode 100644 index c38173aa4..000000000 --- a/src/main/java/iudx/resource/server/callback/Constants.java +++ /dev/null @@ -1,99 +0,0 @@ -package iudx.resource.server.callback; - -public class Constants { - - public static final String QUEUE_NAME = "queueName"; - - public static final String ERROR = "error"; - public static final String FAILURE = "failure"; - public static final String SUCCESS = "success"; - - public static final String TAGS = "tags"; - public static final String TYPE = "type"; - public static final String TITLE = "title"; - public static final String DETAIL = "detail"; - - public static final String MESSAGE = "Message"; - public static final String STATUS = "status"; - - public static final String ROUTING_KEY = "Routing Key"; - public static final String ACKMODE = "ackmode"; - public static final String ENCODING = "encoding"; - - public static final String URL = "url"; - public static final String CALLBACK_URL = "callbackurl"; - public static final String USER_NAME = "username"; - public static final String PASSWORD = "password"; - - public static final String POOL_SIZE = "poolSize"; - - public static final String CONTENT_TYPE = "content-type"; - public static final String APPLICATION_JSON = "application/json"; - - public static final String CREATE = "create"; - public static final String UPDATE = "update"; - public static final String DELETE = "delete"; - public static final String TABLE_NAME = "tableName"; - public static final String OPERATION = "operation"; - - public static final String COLON = " :: "; - public static final String NEW_LINE = "\n"; - - public static final String STATUS_CODE = "Status Code"; - public static final String RESPONSE_BODY = "Response Body"; - public static final String ERROR_MESSAGE = "Error Message"; - - public static final String CALLBACK_URL_NOT_FOUND = "Callback Url not found"; - public static final String CALLBACK_SUCCESS = "Data Send to CallBackUrl Successfully"; - - public static final String CONNECT_TO_CALLBACK_NOTIFICATION_QUEUE = - "Connected to callback.notification queue"; - public static final String CONNECT_TO_CALLBACK_NOTIFICATION_QUEUE_FAIL = - "Failed to connect with callback.notification queue"; - - public static final String CONNECT_TO_CALLBACK_DATA_QUEUE = "Connected to callback.data queue"; - public static final String CONNECT_TO_CALLBACK_DATA_QUEUE_FAIL = - "Failed to connect with callback.data queue"; - - public static final String RABBITMQ_CONSUMER_CREATED = "RabbitMQ consumer created"; - public static final String CONSUME_QUEUE_MESSAGE_FAIL = "Failed to consume message from Queue"; - public static final String QUEUE_CONNECTION_FAIL = - "rabbitmq client failed to create connection with Queue"; - public static final String MESSAGE_BODY_NULL = "Message body is NULL"; - public static final String GET_QUEUE_MESSAGE_FAIL = "Failed to get message from queue"; - public static final String QUEUE_EMPTY = "Queue is Empty"; - - public static final String DATABASE_QUERY_RESULT = "Database Query Result"; - public static final String DATABASE_QUERY_SUCCESS = "Database Query Successfully Done"; - public static final String DATABASE_QUERY_FAIL = "Database Query Failed"; - public static final String DATABASE_OPERATION_INVALID = - "Invalid database operation. Operation should be one of [create or update or delete]"; - public static final String DATABASE_OPERATION_NOT_FOUND = - "Database operation not found in message body"; - public static final String EXECUTING_SQL_QUERY = "Executing SQL Query on table"; - public static final String FETCH_DATA_FROM_DATABASE = "Fetching Data from DataBase.......!!!!"; - public static final String CACHE_UPDATE_SUCCESS = "Cache Updated Successfully"; - public static final String CACHE_DATA = "Cache Data"; - public static final String CONNECT_DATABASE_FAIL = "Error in Connecting Database"; - public static final String EXECUTE_QUERY_FAIL = "Failed to execute Query"; - public static final String CREATE_PG_CLIENT_OBJECT_FAIL = - "Failed to create pgClient for database query"; - public static final String ROWS = "rows"; - - public static final String JSON_PARSE_EXCEPTION = "Failed to parse message body"; - public static final String CALLBACK_JSON_OBJECT = "callBackJsonObj"; - public static final String CURRENT_MESSAGE_JSON_OBJECT = "currentMessageJsonObj"; - public static final String CALLBACK_URL_RESPONSE = "CallbackUrl Response"; - public static final String CALLBACK_URL_RESPONSE_NULL = "CallbackUrl response is null"; - public static final String CALLBACK_URL_INVALID = "CallbackUrl is Invalid"; - - - public static final String DATA_SEND_TO_CALLBACK_URL_SUCCESS = - "Data Send to CallBackUrl Successfully"; - public static final String DATA_SEND_TO_CALLBACK_URL_FAIL = "Failed to send data to callBackUrl"; - public static final String CREATE_CALLBACK_REQUEST_OBJECT_FAIL = - "Failed to create request object for sending callback request"; - public static final String CONNECT_TO_CALLBACK_URL_FAIL = "Failed to connect callbackUrl"; - public static final String NO_CALLBACK_URL_FOR_ROUTING_KEY = - "No callBackUrl exist for routing key"; -} diff --git a/src/main/java/iudx/resource/server/callback/package-info.java b/src/main/java/iudx/resource/server/callback/package-info.java deleted file mode 100644 index b7b5de644..000000000 --- a/src/main/java/iudx/resource/server/callback/package-info.java +++ /dev/null @@ -1,6 +0,0 @@ -@ModuleGen(groupPackage = "iudx.resource.server.callback", - name = "iudx-resource-server-callback-service") -package iudx.resource.server.callback; - -import io.vertx.codegen.annotations.ModuleGen; - diff --git a/src/test/java/iudx/resource/server/apiserver/subscription/CallbackSubscriptionTest.java b/src/test/java/iudx/resource/server/apiserver/subscription/CallbackSubscriptionTest.java deleted file mode 100644 index 4c451b859..000000000 --- a/src/test/java/iudx/resource/server/apiserver/subscription/CallbackSubscriptionTest.java +++ /dev/null @@ -1,263 +0,0 @@ -package iudx.resource.server.apiserver.subscription; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.json.JsonObject; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; -import iudx.resource.server.database.postgres.PostgresService; -import iudx.resource.server.databroker.DataBrokerService; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.Answer; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; - -@ExtendWith({VertxExtension.class, MockitoExtension.class}) -public class CallbackSubscriptionTest { - CallbackSubscription subscription; - @Mock - DataBrokerService databroker; - @Mock - PostgresService pgService; - @Mock - AsyncResult asyncResult; - @Mock - Throwable throwable; - String throwableMessage; - @BeforeEach - public void setUp(VertxTestContext vertxTestContext) { - subscription = new CallbackSubscription(databroker); - throwableMessage = "Dummy failure message"; - vertxTestContext.completeNow(); - } - - @Test - @DisplayName("Test create method : Success") - public void test_create_success(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).registerCallbackSubscription(any(), any()); - subscription.create(new JsonObject()).onComplete(handler -> { - if (handler.succeeded()) { - assertNull(handler.result()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test create method : Failure") - public void test_create_failure(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(false); - when(asyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn(throwableMessage); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).registerCallbackSubscription(any(), any()); - subscription.create(new JsonObject()).onComplete(handler -> { - if (handler.failed()) { - assertEquals("Dummy failure message",handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test update method : Success") - public void test_update_success(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).updateCallbackSubscription(any(), any()); - subscription.update(new JsonObject()).onComplete(handler -> { - if (handler.succeeded()) { - assertNull(handler.result()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test update method : Failure") - public void test_update_failure(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(false); - when(asyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn(throwableMessage); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).updateCallbackSubscription(any(), any()); - subscription.update(new JsonObject()).onComplete(handler -> { - if (handler.failed()) { - assertEquals("Dummy failure message",handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test append method : Success") - public void test_append_success(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).updateCallbackSubscription(any(), any()); - subscription.append(new JsonObject()).onComplete(handler -> { - if (handler.succeeded()) { - assertNull(handler.result()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test append method : Failure") - public void test_append_failure(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(false); - when(asyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn(throwableMessage); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).updateCallbackSubscription(any(), any()); - subscription.append(new JsonObject()).onComplete(handler -> { - if (handler.failed()) { - assertEquals("Dummy failure message",handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test delete method : Success") - public void test_delete_success(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).deleteStreamingSubscription(any(), any()); - subscription.delete(new JsonObject()).onComplete(handler -> { - if (handler.succeeded()) { - assertNull(handler.result()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test delete method : Failure") - public void test_delete_failure(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(false); - when(asyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn(throwableMessage); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).deleteStreamingSubscription(any(), any()); - subscription.delete(new JsonObject()).onComplete(handler -> { - if (handler.failed()) { - assertEquals("Dummy failure message",handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test get method : Success") - public void test_get_success(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).listCallbackSubscription(any(), any()); - subscription.get(new JsonObject()).onComplete(handler -> { - if (handler.succeeded()) { - assertNull(handler.result()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test get method : Failure") - public void test_get_failure(VertxTestContext vertxTestContext) { - when(asyncResult.succeeded()).thenReturn(false); - when(asyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn(throwableMessage); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(1)).handle(asyncResult); - return null; - } - }).when(databroker).listCallbackSubscription(any(), any()); - subscription.get(new JsonObject()).onComplete(handler -> { - if (handler.failed()) { - assertEquals("Dummy failure message",handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } -} diff --git a/src/test/java/iudx/resource/server/apiserver/subscription/SubscriptionServiceTest.java b/src/test/java/iudx/resource/server/apiserver/subscription/SubscriptionServiceTest.java index 89ed6741b..5081d1b88 100644 --- a/src/test/java/iudx/resource/server/apiserver/subscription/SubscriptionServiceTest.java +++ b/src/test/java/iudx/resource/server/apiserver/subscription/SubscriptionServiceTest.java @@ -278,7 +278,7 @@ public AsyncResult answer(InvocationOnMock arg1) throws Throwable { @Test @DisplayName("Test deleteSubscription method: Failure ") public void testDeleteSubscriptionFailure(VertxTestContext vertxTestContext) { - when(json.getString(anyString())).thenReturn("STREAMING"); + lenient().when(json.getString(anyString())).thenReturn("STREAMING"); service = new SubscriptionService(); service.subscription = mock(Subscription.class); when(service.subscription.delete(any())).thenReturn(jsonObjectFuture); @@ -310,7 +310,7 @@ public AsyncResult answer(InvocationOnMock arg0) throws Throwable { @Test @DisplayName("Test getSubscription method : Success") public void testGetSubscriptionSuccess(VertxTestContext vertxTestContext) { - when(json.getString(anyString())).thenReturn("STREAMING"); + lenient().when(json.getString(anyString())).thenReturn("STREAMING"); when(asyncResult.succeeded()).thenReturn(true); when(asyncResult.result()).thenReturn(json); service = new SubscriptionService(); @@ -336,7 +336,7 @@ public AsyncResult answer(InvocationOnMock arg0) throws Throwable { @Test @DisplayName("Test getSubscription method : Failure") public void testGetSubscriptionFailure(VertxTestContext vertxTestContext) { - when(json.getString(anyString())).thenReturn("STREAMING"); + lenient().when(json.getString(anyString())).thenReturn("STREAMING"); when(asyncResult.succeeded()).thenReturn(false); when(asyncResult.cause()).thenReturn(throwable); service = new SubscriptionService(); diff --git a/src/test/java/iudx/resource/server/callback/CallbackServiceImplTest.java b/src/test/java/iudx/resource/server/callback/CallbackServiceImplTest.java deleted file mode 100644 index 0f58261cd..000000000 --- a/src/test/java/iudx/resource/server/callback/CallbackServiceImplTest.java +++ /dev/null @@ -1,892 +0,0 @@ -package iudx.resource.server.callback; - -import com.rabbitmq.client.Envelope; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.json.DecodeException; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; -import io.vertx.pgclient.PgPool; -import io.vertx.rabbitmq.RabbitMQClient; -import io.vertx.rabbitmq.RabbitMQConsumer; -import io.vertx.rabbitmq.RabbitMQMessage; -import io.vertx.sqlclient.PreparedQuery; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.SqlConnection; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.Answer; - -import java.util.HashMap; -import java.util.stream.Stream; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; - -@ExtendWith(MockitoExtension.class) -@ExtendWith(VertxExtension.class) -public class CallbackServiceImplTest { - @Mock - RabbitMQClient rabbitMQClient; - @Mock - WebClient webClient; - @Mock - AsyncResult sqlConnectionAsyncResult; - @Mock - AsyncResult> rowSetAsyncResult; - @Mock - AsyncResult voidAsyncResult; - @Mock - Throwable throwable; - @Mock - AsyncResult> httpResponseAsyncResult; - @Mock - AsyncResult rabbitMQConsumerAsyncResult; - JsonObject jsonObject; - Vertx vertxObj; - CallbackServiceImpl callbackService; - @Mock - RabbitMQConsumer rabbitMQConsumer; - @Mock - HttpRequest httpRequest; - @Mock - HttpResponse httpResponse; - @Mock - JsonObject request; - @Mock - RabbitMQMessage rabbitMQMessage; - - @BeforeEach - public void setUp(VertxTestContext vertxTestContext) { - vertxObj = Vertx.vertx(); - jsonObject = new JsonObject(); - jsonObject.put("callbackDatabaseIP", "Dummy callbackDatabaseIP value"); - jsonObject.put("callbackDatabasePort", 8888); - jsonObject.put("callbackDatabaseName", "localhost"); - jsonObject.put("callbackDatabaseUserName", "guest"); - jsonObject.put("callbackDatabasePassword", "guest"); - jsonObject.put("callbackpoolSize", 10); - callbackService = new CallbackServiceImpl(rabbitMQClient, webClient, jsonObject, vertxObj); - vertxTestContext.completeNow(); - } - - - static Stream statusCode() { - return Stream.of( - Arguments.of(404, "{\"type\":404,\"title\":\"failure\",\"detail\":\"Callback Url not found\"}"), - Arguments.of(400, "{\"type\":400,\"title\":\"failure\",\"detail\":null}"), - Arguments.of(200, "{\"type\":200,\"title\":\"success\",\"detail\":\"Data Send to CallBackUrl Successfully\"}") - ); - } - - @ParameterizedTest - @DisplayName("Test sendDataToCallBackSubscriber method : with Different HTTP status") - @MethodSource("statusCode") - public void testSendDataToCallBackSubscriber(int code, String expected, VertxTestContext vertxTestContext) { - when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(httpRequest); - when(httpRequest.putHeader(anyString(), anyString())).thenReturn(httpRequest); - when(httpResponseAsyncResult.succeeded()).thenReturn(true); - when(httpResponseAsyncResult.result()).thenReturn(httpResponse); - when(httpResponse.statusCode()).thenReturn(code); - lenient().when(httpResponse.bodyAsString()).thenReturn("Dummy string"); - - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(1)).handle(httpResponseAsyncResult); - return null; - } - }).when(httpRequest).sendJsonObject(any(), any()); - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - assertEquals(expected, handler.result().toString()); - } else { - assertEquals(expected, handler.cause().getMessage()); - } - }); - vertxTestContext.completeNow(); - } - - @Test - @DisplayName("Test sendDataToCallBackSubscriber method : with null response") - public void testSendDataToCallBackSubscriberForNullResponse(VertxTestContext vertxTestContext) { - when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(httpRequest); - when(httpRequest.putHeader(anyString(), anyString())).thenReturn(httpRequest); - when(httpResponseAsyncResult.succeeded()).thenReturn(true); - when(httpResponseAsyncResult.result()).thenReturn(null); - when(httpResponseAsyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn("Dummy failure message"); - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(1)).handle(httpResponseAsyncResult); - return null; - } - }).when(httpRequest).sendJsonObject(any(), any()); - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"CallbackUrl response is null\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test sendDataToCallBackSubscriber method : Failure to connect callback url") - public void testSendDataToCallBackSubscriberFailure(VertxTestContext vertxTestContext) { - when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(httpRequest); - when(httpRequest.putHeader(anyString(), anyString())).thenReturn(httpRequest); - when(httpResponseAsyncResult.succeeded()).thenReturn(false); - when(httpResponseAsyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn("Dummy failure message"); - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(1)).handle(httpResponseAsyncResult); - return null; - } - }).when(httpRequest).sendJsonObject(any(), any()); - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Failed to connect callbackUrl\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test sendDataToCallBackSubscriber method : with invalid web request") - public void testSendDataToCallBackSubscriberForInvalidWebRequest(VertxTestContext vertxTestContext) { - when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(null); - - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Failed to send data to callBackUrl\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test sendDataToCallBackSubscriber method : with null callbBackUrl") - public void testSendDataToCallBackSubscriberForNullCallBackURL(VertxTestContext vertxTestContext) { - lenient().when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(null); - - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"CallbackUrl is Invalid\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test sendDataToCallBackSubscriber method : with null http request") - public void testSendDataToCallBackSubscriberForNullHttpRequest(VertxTestContext vertxTestContext) { - when(request.getString(anyString())).thenReturn("Dummy string"); - when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(null); - - callbackService.sendDataToCallBackSubscriber(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Failed to create request object for sending callback request\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - - @Test - @DisplayName("Test queryCallBackDataBase method : Failure in connecting database") - public void testQueryCallBackDataBaseFailure(VertxTestContext vertxTestContext) { - SqlConnection sqlConnection = mock(SqlConnection.class); - PreparedQuery> preparedquery = mock(PreparedQuery.class); - RowSet rowSet = mock(RowSet.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - - when(sqlConnectionAsyncResult.result()).thenReturn(sqlConnection); - when(sqlConnection.preparedQuery(anyString())).thenReturn(preparedquery); - when(rowSetAsyncResult.succeeded()).thenReturn(true); - when(rowSetAsyncResult.result()).thenReturn(rowSet); - - - when(sqlConnectionAsyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(sqlConnectionAsyncResult); - return null; - } - }).when(CallbackServiceImpl.pgClient).getConnection(any()); - - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(0)).handle(rowSetAsyncResult); - return null; - } - }).when(preparedquery).execute(any(Handler.class)); - callbackService.queryCallBackDataBase(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Error in Connecting Database\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test queryCallBackDataBase method : with Failure in Query Execution") - public void testQueryCallBackDataBaseQueryExecutionFailure(VertxTestContext vertxTestContext) { - SqlConnection sqlConnection = mock(SqlConnection.class); - PreparedQuery> preparedquery = mock(PreparedQuery.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - - when(sqlConnectionAsyncResult.result()).thenReturn(sqlConnection); - when(sqlConnection.preparedQuery(anyString())).thenReturn(preparedquery); - when(rowSetAsyncResult.succeeded()).thenReturn(false); - - - when(sqlConnectionAsyncResult.succeeded()).thenReturn(true); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(sqlConnectionAsyncResult); - return null; - } - }).when(CallbackServiceImpl.pgClient).getConnection(any()); - - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(0)).handle(rowSetAsyncResult); - return null; - } - }).when(preparedquery).execute(any(Handler.class)); - callbackService.queryCallBackDataBase(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Failed to execute Query\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test queryCallBackDataBase method : Failure in connecting Database") - public void testQueryCallBackDataBaseWithFailureInDBConnection(VertxTestContext vertxTestContext) { - SqlConnection sqlConnection = mock(SqlConnection.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - - when(sqlConnectionAsyncResult.cause()).thenReturn(throwable); - when(throwable.getMessage()).thenReturn("Dummy failure message"); - when(sqlConnectionAsyncResult.succeeded()).thenReturn(false); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(sqlConnectionAsyncResult); - return null; - } - }).when(CallbackServiceImpl.pgClient).getConnection(any()); - - - callbackService.queryCallBackDataBase(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("{\"error\":\"Error in Connecting Database\"}", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - - @Test - @DisplayName("Test connectToCallbackDataQueue method : Success") - public void testConnectToCallbackDataQueueSuccess(VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - Envelope envelopeValue = mock(Envelope.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - - when(request.getString(anyString())).thenReturn("Dummy string"); - lenient().when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(httpRequest); - when(httpRequest.putHeader(anyString(), anyString())).thenReturn(httpRequest); - - - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn("{\"Buffer\":\"Dummy buffer value\"}"); - - when(CallbackServiceImpl.pgCache.get(anyString())).thenReturn(request); - - when(rabbitMQMessage.envelope()).thenReturn(envelopeValue); - when(envelopeValue.getRoutingKey()).thenReturn("Dummy routing key"); - - when(httpResponseAsyncResult.result()).thenReturn(httpResponse); - when(httpResponse.statusCode()).thenReturn(200); - when(httpResponse.bodyAsString()).thenReturn("Dummy http response string"); - - - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - when(httpResponseAsyncResult.succeeded()).thenReturn(true); - - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(1)).handle(httpResponseAsyncResult); - return null; - } - }).when(httpRequest).sendJsonObject(any(), any()); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - - String expected = "{\"success\":\"Data Send to CallBackUrl Successfully\",\"Database Query Result\":\"Connected to callback.data queue\"}"; - - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.succeeded()) { - assertEquals(expected, handler.result().toString()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test connectToCallbackDataQueue method : with failure start method") - public void testConnectToCallbackDataQueueForFailedConnection(VertxTestContext vertxTestContext) { - - String expected = "{\"error\":\"rabbitmq client failed to create connection with QueueDummy string\"}"; - when(request.getString(anyString())).thenReturn("Dummy string"); - when(voidAsyncResult.succeeded()).thenReturn(false); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.failed()) { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - vertxTestContext.completeNow(); - } - - @Test - @DisplayName("Test connectToCallbackDataQueue method : with failure in consuming message") - public void testConnectToCallbackDataQueueForFailure(VertxTestContext vertxTestContext) { - - String expected = "{\"error\":\"Failed to consume message from QueueDummy string\"}"; - when(request.getString(anyString())).thenReturn("Dummy string"); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(false); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.failed()) { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - vertxTestContext.completeNow(); - } - - static Stream operations() { - return Stream.of( - Arguments.of("{\"abcd\":\"dummy database operation\"}", "{\"error\":\"Database operation not found in message body\"}"), - Arguments.of("{\"operation\":\"dummy operation\"}", "{\"error\":\"Invalid database operation. Operation should be one of [create or update or delete]\"}"), - Arguments.of("{\"operation\":\"delete\"}", "{\"Database Query Result\":\"Database Query Failed\"}") - ); - } - - @ParameterizedTest - @MethodSource("operations") - @DisplayName("Test connectToCallbackNotificationQueue method : Failure to query database ") - public void testConnectToCallbackNotificationQueueQueryDBFailure(String operation, String expected, VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - - when(request.getString(anyString())).thenReturn("Dummy string"); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn(operation); - - SqlConnection sqlConnection = mock(SqlConnection.class); - PreparedQuery> preparedquery = mock(PreparedQuery.class); - RowSet rowSet = mock(RowSet.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - - lenient().when(sqlConnectionAsyncResult.result()).thenReturn(sqlConnection); - lenient().when(sqlConnection.preparedQuery(anyString())).thenReturn(preparedquery); - lenient().when(rowSetAsyncResult.succeeded()).thenReturn(true); - lenient().when(rowSetAsyncResult.result()).thenReturn(rowSet); - - - lenient().when(sqlConnectionAsyncResult.succeeded()).thenReturn(true); - lenient().doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(sqlConnectionAsyncResult); - return null; - } - }).when(CallbackServiceImpl.pgClient).getConnection(any()); - - lenient().doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(0)).handle(rowSetAsyncResult); - return null; - } - }).when(preparedquery).execute(any(Handler.class)); - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - - callbackService.connectToCallbackNotificationQueue(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - - } else { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - - } - - static Stream booleanValues() { - return Stream.of( - Arguments.of(true, "{\"error\":\"Failed to consume message from Queue :: Dummy string\"}"), - Arguments.of(false, "{\"error\":\"rabbitmq client failed to create connection with Queue :: Dummy string\"}") - ); - } - - @ParameterizedTest - @MethodSource("booleanValues") - @DisplayName("Test connectToCallbackNotificationQueue method : Failure ") - public void testConnectToCallbackNotificationQueueFailure(boolean booleanValue, String expected, VertxTestContext vertxTestContext) { - - CallbackServiceImpl.pgCache = mock(HashMap.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - when(voidAsyncResult.succeeded()).thenReturn(booleanValue); - lenient().when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(false); - - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - lenient().doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - - callbackService.connectToCallbackNotificationQueue(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - - } else { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - } - - @Test - @DisplayName("Test connectToCallbackNotificationQueue method : Failure to query database ") - public void testConnectToCallbackNotificationQueueQueryDBFailure(VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - - when(request.getString(anyString())).thenReturn("Dummy string"); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn("{\"operation\"}"); - - when(request.getString(anyString())).thenReturn("Dummy string"); - CallbackServiceImpl.pgClient = mock(PgPool.class); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - - assertThrows(NullPointerException.class, () -> { - callbackService.connectToCallbackNotificationQueue(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.failNow(handler.cause()); - } else { - assertEquals("", handler.cause().getMessage()); - vertxTestContext.completeNow(); - } - }); - }); - vertxTestContext.completeNow(); - } - - @Test - @DisplayName("Test connectToCallbackDataQueue method : Callback URL failure") - public void testConnectToCallbackDataQueueForInvalidURL(VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - Envelope envelopeValue = mock(Envelope.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - - when(request.getString(anyString())).thenReturn("Dummy string"); - lenient().when(request.getJsonObject(anyString())).thenReturn(request); - when(webClient.postAbs(anyString())).thenReturn(httpRequest); - when(httpRequest.basicAuthentication(anyString(), anyString())).thenReturn(httpRequest); - when(httpRequest.putHeader(anyString(), anyString())).thenReturn(httpRequest); - - - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn("{\"Buffer\":\"Dummy buffer value\"}"); - - when(CallbackServiceImpl.pgCache.get(anyString())).thenReturn(request); - - when(rabbitMQMessage.envelope()).thenReturn(envelopeValue); - when(envelopeValue.getRoutingKey()).thenReturn("Dummy routing key"); - - when(httpResponseAsyncResult.result()).thenReturn(httpResponse); - when(httpResponse.statusCode()).thenReturn(404); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - when(httpResponseAsyncResult.succeeded()).thenReturn(true); - - doAnswer(new Answer>>() { - @Override - public AsyncResult> answer(InvocationOnMock arg0) throws Throwable { - ((Handler>>) arg0.getArgument(1)).handle(httpResponseAsyncResult); - return null; - } - }).when(httpRequest).sendJsonObject(any(), any()); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - - String expected = "{\"error\":\"Failed to send data to callBackUrl\"}"; - - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.failed()) { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test connectToCallbackDataQueue method : when Callback URL does not exist") - public void testConnectToCallbackDataQueueWithAbsentCallBackURL(VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - Envelope envelopeValue = mock(Envelope.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn("{\"Buffer\":\"Dummy buffer value\"}"); - when(CallbackServiceImpl.pgCache.get(anyString())).thenReturn(null); - when(rabbitMQMessage.envelope()).thenReturn(envelopeValue); - when(envelopeValue.getRoutingKey()).thenReturn("Dummy routing key"); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - - String expected = "{\"error\":\"No callBackUrl exist for routing keyDummy routing key\"}"; - - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.failed()) { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - @Test - @DisplayName("Test connectToCallbackDataQueue method : when message body is NULL") - public void testConnectToCallbackDataQueueWithNullMessageBody(VertxTestContext vertxTestContext) { - - when(request.getString(anyString())).thenReturn("Dummy string"); - when(rabbitMQMessage.body()).thenReturn(null); - when(voidAsyncResult.succeeded()).thenReturn(true); - CallbackServiceImpl.pgCache = mock(HashMap.class); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - String expected = "{\"error\":\"Message body is NULL\"}"; - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.failed()) { - assertEquals(expected, handler.cause().getMessage()); - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - } - - - @Test - @DisplayName("Test connectToCallbackDataQueue method : for invalid message body") - public void testConnectToCallbackDataQueueWithInvalidMessageBody(VertxTestContext vertxTestContext) { - - Buffer bufferValue = mock(Buffer.class); - Envelope envelopeValue = mock(Envelope.class); - CallbackServiceImpl.pgCache = mock(HashMap.class); - when(request.getString(anyString())).thenReturn("Dummy string"); - when(rabbitMQMessage.body()).thenReturn(bufferValue); - when(bufferValue.toString()).thenReturn("{ \"buffer\" }"); - when(rabbitMQMessage.envelope()).thenReturn(envelopeValue); - when(envelopeValue.getRoutingKey()).thenReturn("Dummy routing key"); - when(voidAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.succeeded()).thenReturn(true); - when(rabbitMQConsumerAsyncResult.result()).thenReturn(rabbitMQConsumer); - - - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(0)).handle(voidAsyncResult); - return null; - } - }).when(rabbitMQClient).start(any()); - doAnswer(new Answer>() { - @Override - public AsyncResult answer(InvocationOnMock arg0) throws Throwable { - ((Handler>) arg0.getArgument(2)).handle(rabbitMQConsumerAsyncResult); - return null; - } - }).when(rabbitMQClient).basicConsumer(anyString(), any(), any()); - - doAnswer(new Answer() { - @Override - public RabbitMQMessage answer(InvocationOnMock arg0) throws Throwable { - ((Handler) arg0.getArgument(0)).handle(rabbitMQMessage); - return null; - } - }).when(rabbitMQConsumer).handler(any()); - - assertThrows(DecodeException.class, () -> { - callbackService.connectToCallbackDataQueue(request, handler -> { - if (handler.succeeded()) { - vertxTestContext.completeNow(); - } else { - vertxTestContext.failNow(handler.cause()); - } - }); - }); - vertxTestContext.completeNow(); - } - - -}