Skip to content

Commit

Permalink
V2 APIs changes (#21)
Browse files Browse the repository at this point in the history
* #OBS-I116: Dataset CRUD APIs test cases and fixes

* #OBS-I116: Dataset update API Dedupe and denorm test cases fixes

* #OBS-I116: Dataset Create api test case fixes

* #OBS-I116: Dataset update extraction config api test case fixes

* #OBS-I116: Dataset update api test cases

* #OBS-I116: fix: linting fixes

* #OBS-I116: lint fixes

* #OBS-I116: Dataset status transition test cases

* #OBS-I116: feat: Test cases and linting fixes

* #OBS-I116: feat: Dataset status transition test cases fix

* #OBS-I141: added a new metric to sum the response time

* #OBS-I141: modified the url variable and access dataset_id from params

* #OBS-I141: added helper function to get dataset_id for error cases

* #OBS-I141: added telemetry for v2 api's

* #OBS-I141: added a new metric to sum the response time

* #OBS-I141: modified the url variable and access dataset_id from params

* #OBS-I141: added helper function to get dataset_id for error cases

* #OBS-I141: added telemetry for v2 api's

* #OBS-I141: added telemetry for v2 api's

* #OBS-I143: feat: dataset publish changes to deploy flink connectors

* #OBS-I141: removed metric for sum of response time

* #OBS-I141: removed usage of builtin kafka methods from telemetry file

* #OBS-I146: feat: Retire fix

* Issue #SBCOSS-12 fix: convert all SQL raw queries to prepared statements

* Issue #SBCOSS-12 fix: tags is an array, so requires empty json for null case; dataset draft deletion requires deletion of transformation and source config drafts

* #SBCOSS-23: feat:  dataset publish changes for redeployment

* #OBS-I146: fix: Test case fix for read api

* #OBS-I146: fix: Test case fix for read api changes

* #OBS-I146: fix: status transition test cases

* #OBS-I146: fix: Test case script fix

* #OBS-I146: fix: Type error fix

* #OBS-I146: fix: Dataset read api test cases fixes

* #OBS-I146: fix: Hudi spec generation test cases

* #OBS-I146: fix: Test case and linting fix

* #OBS-I173: fix: Dataset update changes to accept type changes

* #OBS-I146: fix: linting fix

* #OBS-I146: fix: linting fix

* #OBS-I143: dataset publish changes fixes

* #OBS-I143: inswert query fix

* Issue #OBS-I144 fix: icon data as string; check default version

* #OBS-143: fix: dataset publish fixes

* #OBS-I181 - Updated the event structure

---------

Co-authored-by: JeraldJF <[email protected]>
Co-authored-by: Rakshitha-D <[email protected]>
Co-authored-by: SurabhiAngadi <[email protected]>
Co-authored-by: Harish Kumar Gangula <[email protected]>
Co-authored-by: Aniket Sakinala <[email protected]>
Co-authored-by: Ravi Mula <[email protected]>
  • Loading branch information
7 people authored Sep 3, 2024
1 parent fbdcf8a commit b9174ea
Show file tree
Hide file tree
Showing 151 changed files with 2,368 additions and 2,817 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pull_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@ jobs:
cd api-service
npm install
npm run actions:test
npm run actions:test:v2
npm run lint
2 changes: 1 addition & 1 deletion api-service/.eslintignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ docs
coverage
@types
.nyc_output
src/v2/tests
src/tests
src/v1
dist
3 changes: 2 additions & 1 deletion api-service/.eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"plugin:@typescript-eslint/eslint-recommended",
"plugin:@typescript-eslint/recommended"
],
"rules": {
"rules": {
"@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_" }],
"@typescript-eslint/no-explicit-any": ["off"],
"@typescript-eslint/no-useless-escape": ["off"],
"@typescript-eslint/quotes": [
Expand Down
5 changes: 2 additions & 3 deletions api-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
"main": "dist/app.js",
"scripts": {
"start": "ts-node ./src/app.ts",
"test": "source .env.test && nyc mocha ./src/v1/test/*.spec.ts --exit && nyc mocha ./src/v2/tests/**/*.spec.ts --exit",
"actions:test": "nyc mocha ./src/v1/test/*.spec.ts --exit",
"actions:test:v2": "nyc mocha ./src/v2/tests/**/*.spec.ts --exit",
"test": "source .env.test && nyc mocha ./src/tests/**/*.spec.ts --exit",
"actions:test": "nyc mocha ./src/tests/**/*.spec.ts --exit",
"build": "rm -rf dist && tsc --declaration -P . && cp package.json ./dist/package.json",
"package": "npm run build && cd dist && npm pack . && cd ..",
"lint": "eslint . --ext .ts",
Expand Down
3 changes: 3 additions & 0 deletions api-service/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import { errorHandler, obsrvErrorHandler } from "./middlewares/errors";
import { ResponseHandler } from "./helpers/ResponseHandler";
import { config } from "./configs/Config";
import { alertsRouter } from "./routes/AlertsRouter";
import { interceptAuditEvents } from "./services/telemetry";

const app: Application = express();

app.use(bodyParser.json({ limit: config.body_parser_limit}));
app.use(express.text());
app.use(express.json());
app.use(errorHandler)

app.use(interceptAuditEvents());
app.use("/v2/", v2Router);
app.use("/", druidProxyRouter);
app.use("/alerts/v1", alertsRouter);
Expand Down
2 changes: 1 addition & 1 deletion api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export const config = {
"encryption_algorithm": process.env.encryption_algorithm || "aes-256-ecb",
},
"grafana_config": {
"dialect": process.env.dialet || 'postgres',
"dialect": process.env.dialet || "postgres",
"url": process.env.grafana_url || "http://localhost:8000",
"access_token": process.env.grafana_token || ""
}
Expand Down
2 changes: 1 addition & 1 deletion api-service/src/connections/grafanaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ const grafanaHttpClient = axios.create({
baseURL: config.grafana_config.url
});

grafanaHttpClient.defaults.headers.common['Authorization'] = config.grafana_config.access_token;
grafanaHttpClient.defaults.headers.common["Authorization"] = config.grafana_config.access_token;

export { grafanaHttpClient };
16 changes: 8 additions & 8 deletions api-service/src/controllers/Alerts/Alerts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ const createAlertHandler = async (req: Request, res: Response, next: NextFunctio
updateTelemetryAuditEvent({ request: req, object: { id: response?.dataValues?.id, ...telemetryObject } });
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: response.dataValues.id } });
} catch (error: any) {
let errorMessage = _.get(error, 'message')
if (_.get(error, 'name') == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, 'parent.detail')
let errorMessage = _.get(error, "message")
if (_.get(error, "name") == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, "parent.detail")
}
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
Expand All @@ -44,7 +44,7 @@ const publishAlertHandler = async (req: Request, res: Response, next: NextFuncti

const transformAlerts = async (alertModel: any) => {
const alert = alertModel?.toJSON();
const status = _.get(alert, 'status');
const status = _.get(alert, "status");
if (status !== "live") return alert;
return getAlertsMetadata(alert);
}
Expand Down Expand Up @@ -108,13 +108,13 @@ const updateAlertHandler = async (req: Request, res: Response, next: NextFunctio
await retireAlertSilence(alertId);
}
const updatedPayload = getAlertPayload({ ...req.body, manager: rulePayload?.manager });
await Alert.update({ ...updatedPayload, status: 'draft' }, { where: { id: alertId } });
await Alert.update({ ...updatedPayload, status: "draft" }, { where: { id: alertId } });
updateTelemetryAuditEvent({ request: req, currentRecord: rulePayload, object: { id: alertId, ...telemetryObject } });
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: alertId } });
} catch (error: any) {
let errorMessage = _.get(error, 'message')
if (_.get(error, 'name') == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, 'parent.detail')
let errorMessage = _.get(error, "message")
if (_.get(error, "name") == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, "parent.detail")
}
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
Expand Down
20 changes: 10 additions & 10 deletions api-service/src/controllers/Alerts/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ const telemetryObject = { type: "metric", ver: "1.0.0" };

const createMetricHandler = async (req: Request, res: Response, next: NextFunction) => {
try {
const { component } = req?.body;
const { component } = req.body;
const transformComponent = _.toLower(component);
const metricsBody = await Metrics.create({ ...(req.body), component: transformComponent });
updateTelemetryAuditEvent({ request: req, object: { id: metricsBody?.dataValues?.id, ...telemetryObject } });
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: metricsBody.dataValues.id } });
} catch (error: any) {
let errorMessage = _.get(error, 'message')
if (_.get(error, 'name') == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, 'parent.detail')
let errorMessage = _.get(error, "message")
if (_.get(error, "name") == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, "parent.detail")
}
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
Expand All @@ -30,7 +30,7 @@ const listMetricsHandler = async (req: Request, res: Response, next: NextFunctio
const metricsPayload = await Metrics.findAll({ limit: limit, offset: offset, ...(filters && { where: filters }) });
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { metrics: metricsPayload, count: metricsPayload.length } });
} catch (error) {
const errorMessage = _.get(error, 'message')
const errorMessage = _.get(error, "message")
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
}
Expand All @@ -50,9 +50,9 @@ const updateMetricHandler = async (req: Request, res: Response, next: NextFuncti
});
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id } });
} catch (error) {
let errorMessage = _.get(error, 'message')
if (_.get(error, 'name') == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, 'parent.detail')
let errorMessage = _.get(error, "message")
if (_.get(error, "name") == "SequelizeUniqueConstraintError") {
errorMessage = _.get(error, "parent.detail")
}
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
Expand All @@ -66,7 +66,7 @@ const deleteMetricHandler = async (req: Request, res: Response, next: NextFuncti
await record.destroy();
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id } });
} catch (error) {
const errorMessage = _.get(error, 'message')
const errorMessage = _.get(error, "message")
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
}
Expand All @@ -78,7 +78,7 @@ const deleteMultipleMetricHandler = async (req: Request, res: Response, next: Ne
await Metrics.destroy({ where: filters });
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: {} });
} catch (error) {
const errorMessage = _.get(error, 'message')
const errorMessage = _.get(error, "message")
next(errorResponse((httpStatus.INTERNAL_SERVER_ERROR, { message: errorMessage })))
}
}
Expand Down
18 changes: 9 additions & 9 deletions api-service/src/controllers/Alerts/Silence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ const createHandler = async (request: Request, response: Response, next: NextFun
const grafanaResponse = await createSilence(payload);
if (!grafanaResponse) return next({ message: httpStatus[httpStatus.INTERNAL_SERVER_ERROR], statusCode: httpStatus.INTERNAL_SERVER_ERROR })

let start_date = new Date(startDate);
let end_date = new Date(endDate);
const start_date = new Date(startDate);
const end_date = new Date(endDate);
const silenceBody = {
id: grafanaResponse.silenceId,
manager: grafanaResponse.manager,
Expand All @@ -31,7 +31,7 @@ const createHandler = async (request: Request, response: Response, next: NextFun
updateTelemetryAuditEvent({ request, object: { id: sileneResponse?.dataValues?.id, ...telemetryObject } });
ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: { id: sileneResponse.dataValues.id } })
} catch (err) {
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, 'message') || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, "message") || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
next(error);
}
}
Expand All @@ -45,11 +45,11 @@ const transformSilences = async (silenceModel: any) => {
const listHandler = async (request: Request, response: Response, next: NextFunction) => {
try {
const silences = await Silence.findAll();
const count = _.get(silences, 'length');
const count = _.get(silences, "length");
const transformedSilences = await Promise.all(silences.map(transformSilences));
ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: { transformedSilences, ...(count && { count }) } });
} catch (err) {
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, 'message') || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, "message") || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
next(error);
}
}
Expand All @@ -62,7 +62,7 @@ const fetchHandler = async (request: Request, response: Response, next: NextFunc
if (!silenceModel) return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND });
ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: transformedSilence });
} catch (err) {
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, 'message') || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, "message") || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
next(error);
}
}
Expand All @@ -86,7 +86,7 @@ const updateHandler = async (request: Request, response: Response, next: NextFun
const silenceResponse = await Silence.update(updatedSilence, { where: { id } })
ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: { silenceResponse } })
} catch (err) {
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, 'message') || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, "message") || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
next(error);
}
}
Expand All @@ -97,13 +97,13 @@ const deleteHandler = async (request: Request, response: Response, next: NextFun
const silenceModel = await Silence.findOne({ where: { id } });
if (!silenceModel) return next({ message: httpStatus[httpStatus.NOT_FOUND], statusCode: httpStatus.NOT_FOUND });
const silenceObject = silenceModel?.toJSON();
if (silenceObject?.status === 'expired') return next({ message: "Silence is already expired", statusCode: httpStatus.BAD_REQUEST });
if (silenceObject?.status === "expired") return next({ message: "Silence is already expired", statusCode: httpStatus.BAD_REQUEST });
await deleteSilence(silenceObject);
await silenceModel.destroy();
updateTelemetryAuditEvent({ request, object: { id, ...telemetryObject }, currentRecord: silenceObject });
ResponseHandler.successResponse(request, response, { status: httpStatus.OK, data: { id } })
} catch (err) {
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, 'message') || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
const error = errorResponse(httpStatus.INTERNAL_SERVER_ERROR, _.get(err, "message") || httpStatus[httpStatus.INTERNAL_SERVER_ERROR])
next(error);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const dataIn = async (req: Request, res: Response) => {
try {
const requestBody = req.body;
const datasetId = req.params.datasetId.trim();

const isValidSchema = schemaValidation(requestBody, validationSchema)
if (!isValidSchema?.isValid) {
logger.error({ apiId, message: isValidSchema?.message, code: "DATA_INGESTION_INVALID_INPUT" })
Expand Down Expand Up @@ -68,18 +68,23 @@ const addMetadataToEvents = (datasetId: string, payload: any) => {
const obsrvMeta = { syncts: now, flags: {}, timespans: {}, error: {}, source: source };
if (Array.isArray(validData)) {
const payloadRef = validData.map((event: any) => {
event = _.set(event, "obsrv_meta", obsrvMeta);
event = _.set(event, "dataset", datasetId);
event = _.set(event, "msgid", mid);
return event
const payload = {
event,
"obsrv_meta": obsrvMeta,
"dataset": datasetId,
"msgid": mid
}
return payload;
})
return payloadRef;
}
else {
_.set(validData, "msgid", mid);
_.set(validData, "obsrv_meta", obsrvMeta);
_.set(validData, "dataset", datasetId);
return validData
return ({
"event": validData,
"obsrv_meta": obsrvMeta,
"dataset": datasetId,
"msgid": mid
});
}
}

Expand Down
6 changes: 3 additions & 3 deletions api-service/src/controllers/DatasetCopy/DatasetCopy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const validateRequest = (req: Request) => {
}
}

const fetchDataset = async (req: Request, newDatasetId: string) => {
const fetchDataset = async (req: Request) => {
const datasetId = _.get(req, "body.request.source.datasetId");
const isLive = _.get(req, "body.request.source.isLive");

Expand All @@ -39,10 +39,10 @@ const datasetCopy = async (req: Request, res: Response) => {

validateRequest(req);
const newDatasetId = _.get(req, "body.request.destination.datasetId");
const dataset = await fetchDataset(req, newDatasetId);
const dataset = await fetchDataset(req);
updateRecords(dataset, newDatasetId)
const response = await datasetService.createDraftDataset(dataset).catch(err => {
if (err?.name === 'SequelizeUniqueConstraintError') {
if (err?.name === "SequelizeUniqueConstraintError") {
throw obsrvError(newDatasetId, "DATASET_ALREADY_EXISTS", `Dataset with id ${newDatasetId} already exists`, "BAD_REQUEST", 400);
}
throw obsrvError(newDatasetId, "DATASET_COPY_FAILURE", `Failed to clone dataset`, "INTERNAL_SERVER_ERROR", 500);
Expand Down
6 changes: 3 additions & 3 deletions api-service/src/controllers/DatasetCopy/DatasetCopyHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ const version = defaultDatasetConfig.version;

export const updateRecords = (datasetRecord: Record<string, any>, newDatasetId: string): void => {
const dataset_id = newDatasetId;
_.set(datasetRecord, 'api_version', "v2")
_.set(datasetRecord, 'status', DatasetStatus.Draft)
_.set(datasetRecord, "api_version", "v2")
_.set(datasetRecord, "status", DatasetStatus.Draft)
_.set(datasetRecord, "dataset_id", dataset_id)
_.set(datasetRecord, "id", dataset_id)
_.set(datasetRecord, "name", dataset_id)
_.set(datasetRecord, "version_key", Date.now().toString())
_.set(datasetRecord, 'version', version);
_.set(datasetRecord, "version", version);
_.set(datasetRecord, "entry_topic", config.telemetry_service_config.kafka.topics.createDataset)
_.set(datasetRecord, "router_config", { topic: newDatasetId })
}
6 changes: 3 additions & 3 deletions api-service/src/controllers/DatasetImport/DatasetImport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ const datasetImport = async (req: Request, res: Response) => {
const importDataset = async (dataset: Record<string, any>, overwrite: string | any) => {
const dataset_id = _.get(dataset,"dataset_id")
const response = await datasetService.createDraftDataset(dataset).catch(err => { return err })
if (response?.name === 'SequelizeUniqueConstraintError') {
if (response?.name === "SequelizeUniqueConstraintError") {
if (overwrite === "true") {
const overwriteRes = await datasetService.updateDraftDataset(dataset).catch(err=>{
const overwriteRes = await datasetService.updateDraftDataset(dataset).catch(()=>{
throw obsrvError(dataset_id, "DATASET_IMPORT_FAILURE", `Failed to import dataset: ${dataset_id} as overwrite failed`, "INTERNAL_SERVER_ERROR", 500);
})
return _.omit(overwriteRes, ["message"])
Expand All @@ -44,7 +44,7 @@ const importDataset = async (dataset: Record<string, any>, overwrite: string | a
const getResponseData = (ignoredConfigs: Record<string, any>) => {
const { ignoredConnectors, ignoredTransformations, ignoredDenorms } = ignoredConfigs;
let successMsg = "Dataset is imported successfully";
let partialIgnored: Record<string, any> = {};
const partialIgnored: Record<string, any> = {};

if (ignoredConnectors.length || ignoredTransformations.length || ignoredDenorms.length) {
successMsg = "Dataset is partially imported";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const datasetImportValidation = async (payload: Record<string, any>): Pro
throw obsrvError("", "DATASET_IMPORT_INVALID_CONFIGS", isRequestValid.message, "BAD_REQUEST", 400)
}

let datasetConfig = payload.request;
const datasetConfig = payload.request;

const connectors = _.get(datasetConfig, "connectors_config", []);
const transformations = _.get(datasetConfig, "transformations_config", []);
Expand Down Expand Up @@ -100,7 +100,7 @@ export const migrateExportedDatasetV1 = (requestPayload: Record<string, any>) =>
const { dataset_id, timestamp_key = "", data_key = "", type: datasetType } = _.get(datasetPayload, "data.metadata")
const type = datasetType === "master-dataset" ? DatasetType.master : DatasetType.event

let dataset: Record<string, any> = {
const dataset: Record<string, any> = {
dataset_id, id: dataset_id, name: dataset_id, type,
version_key: Date.now().toString(),
api_version: "v2",
Expand Down
Loading

0 comments on commit b9174ea

Please sign in to comment.