Skip to content

Commit

Permalink
Merge pull request #409 from Sreejit-K/metrics_Integration_For_ticker…
Browse files Browse the repository at this point in the history
…_funcitonality

Metrics service Integration for ticker functionality and DB migration Script in Donor service
  • Loading branch information
srprasanna authored Sep 8, 2023
2 parents d1564df + a35226c commit 2ffad32
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 40 deletions.
8 changes: 7 additions & 1 deletion backend/donor-service/configs/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const ESIGN_STATUS = Object.freeze({
const ESIGN_VALIDATION_KAFKA_BROKERS = process.env.ESIGN_VALIDATION_KAFKA_BROKERS || 'localhost:5101';
let ESIGN_VALIDATION_EXPIRE_TIME = process.env.ESIGN_VALIDATION_EXPIRE_TIME || 2*60;
const ESIGN_VALIDATION_PREVENT_3RD_PARTY = process.env.ESIGN_VALIDATION_PREVENT_3RD_PARTY === "true" || false;
const METRICS_URL = process.env.METRICS_URL || 'http://metrics:8070';
const KAFKA_BROKER_FOR_METRICS_SERVICE = process.env.KAFKA_BROKER_FOR_METRICS_SERVICE || 'kafka:9092';
const METRICS_TOPIC = process.env.METRICS_TOPIC || 'events';
const ESIGN_VALIDATION_KAFKA_TOPIC = process.env.ESIGN_VALIDATION_KAFKA_TOPIC || 'esign_topic';
const ESIGN_VALIDATION_KAFKA_TOPIC_GROUP = process.env.ESIGN_VALIDATION_KAFKA_TOPIC_GROUP || 'dev_esign_group_1';
const ESIGN_VALIDATION_CLIENT_ID = process.env.ESIGN_VALIDATION_CLIENT_ID || "dev-esign-client";
Expand Down Expand Up @@ -75,5 +78,8 @@ module.exports = {
ESIGN_VALIDATION_PREVENT_3RD_PARTY,
ESIGN_VALIDATION_KAFKA_TOPIC,
ESIGN_VALIDATION_KAFKA_TOPIC_GROUP,
ESIGN_VALIDATION_CLIENT_ID,
METRICS_URL,
KAFKA_BROKER_FOR_METRICS_SERVICE,
METRICS_TOPIC,
ESIGN_VALIDATION_CLIENT_ID
}
34 changes: 34 additions & 0 deletions backend/donor-service/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const {PLEDGE_STATUS, GENDER_MAP, SOCIAL_SHARE_TEMPLATE_MAP} = require('./config
const app = express();
const {convertToSocialShareResponse} = require("./utils/utils");
const consumer = require("./services/esign.consumer");
const metricsService = require('./services/metrics.service');
const dataMigrationScript = require('./utils/scriptForDataMigration');

(async() => {
await redis.initRedis({REDIS_URL: config.REDIS_URL});
Expand Down Expand Up @@ -749,4 +751,36 @@ app.use(function(err, req, res, next) {
message: "Internal server error"
})
});

// Metrics Sercvice Integration
app.get('/getMetrics/:schemaType', (req,res) => metricsService.getMetrics(req,res));


// Data Migration Script for NHA PROD
//
// pathParmas ---> entityName ex: /Pledge
// sample request body :
// {
// "osIdArray" : ["b6189f8f-9ef3-499f-961b-94bdd540f3cd"]
// }
app.post('/pushDataToCH/:entityName', async (req,res) => {
let entityName = req.params.entityName;
let osidArray = req.body.osIdArray;
console.log(osidArray, entityName);
let accessToken = await getServiceAccountToken();
try {
for (let i=0 ; i< osidArray.length; i++) {
await dataMigrationScript.migrateDataToCH(entityName,osidArray[i], accessToken);
await new Promise(resolve => setTimeout(resolve, 500)); // wait for 500 millisecons
}
res.status(200).send({
message: "Data Migaration Successfull!",
})
} catch (error) {
console.log("error while db migraiton",error);
res.status(500).send(error)
}

})

app.listen('3000');
2 changes: 1 addition & 1 deletion backend/donor-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion backend/donor-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
"dependencies": {
"axios": "^1.2.5",
"body-parser": "^1.20.1",
"clickhouse": "^2.6.0",
"cors": "^2.8.5",
"express": "^4.18.2",
"express-async-errors": "^3.1.1",
"form-data": "^4.0.0",
"kafkajs": "^2.2.4",
"nodemon": "^2.0.20",
"object-path": "^0.11.8",
"qs": "^6.11.0",
"ramda": "^0.28.0",
"redis": "^4.6.1",
"redis": "^4.6.7",
"sharp": "^0.32.1",
"swagger-ui-express": "^4.6.0",
"yamljs": "^0.3.0"
Expand Down
2 changes: 1 addition & 1 deletion backend/donor-service/services/createAbha.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async function verifyAadhaarOTP(req, res) {
res.json({new: verifyAadhaarOTPResponse.new, txnId: verifyAadhaarOTPResponse.txnId})
return;
}
let abhaRegistered = await isABHARegistered(verifyAadhaarOTPResponse.healthIdNumber, true)
let abhaRegistered = await isABHARegistered(verifyAadhaarOTPResponse.healthIdNumber)
if (abhaRegistered) {
await utils.checkForPledgeStatusAndReturnError(verifyAadhaarOTPResponse.healthIdNumber);
}
Expand Down
31 changes: 31 additions & 0 deletions backend/donor-service/services/kafka.producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const config = require('../configs/config');
const { Kafka, Partitioners } = require('kafkajs');

// Connect and send the event
async function emitEventToKafka(topic, telemetryEvent) {

const kafka = new Kafka({
clientId: 'dB-Migration',
brokers: config.KAFKA_BROKER_FOR_METRICS_SERVICE?.split(","), // List your Kafka brokers
createPartitioner: Partitioners.LegacyPartitioner
});

const producer = kafka.producer();

await producer.connect();
await producer.send({
topic,
messages: [
{
value: JSON.stringify(telemetryEvent)
}
]
});

await producer.disconnect();
}

module.exports = {
emitEventToKafka
}

43 changes: 43 additions & 0 deletions backend/donor-service/services/metrics.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { default: axios } = require('axios');
const config = require('../configs/config');
const consants = require('../configs/constants');
const redis = require('../services/redis.service');
const abhaProfile = require('./abhaProfile.service');
const { ConfigSource } = require('kafkajs');
const utils =require('../utils/utils');

const lengthOfKey = 15;
async function getMetrics(req, res) {
try {
let schemaType = req.params.schemaType;
if (!schemaType) schemaType = "Pledge";
let metricsURL = `${config.METRICS_URL}/v1/metrics`;
const key = abhaProfile.getKeyBasedOnEntityName(schemaType);
let dataFromMetricsSerice = ((await axios.get(metricsURL)).data);
let keys = await redis.getAllKeys(key);
keys = keys.filter((i) => i.length === lengthOfKey);
if (Object.keys(dataFromMetricsSerice).length > 0 ) {
dataFromMetricsSerice[schemaType.toString().toLowerCase()]["UNPLEDGED"] = await getUnpledgedCount( keys);
} else {
dataFromMetricsSerice[schemaType.toString().toLowerCase()] = {};
dataFromMetricsSerice[schemaType.toString().toLowerCase()]["UNPLEDGED"] = await getUnpledgedCount( keys);
}
res.json(dataFromMetricsSerice);
} catch (err) {
const error = utils.getErrorObject(err)
res.status(error.status).send(error)
}
}

async function getUnpledgedCount (keys) {
let count = 0 ;
for (let i=0 ; i< keys.length; i++) {
let value = await redis.getKey(keys[i]);
if ( value === consants.PLEDGE_STATUS.UNPLEDGED) count++;
}
return count;
}

module.exports ={
getMetrics,
}
7 changes: 6 additions & 1 deletion backend/donor-service/services/redis.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ async function deleteKey(key) {
await client.del(key);
}

async function getAllKeys(keyType) {
return await client.sendCommand(['KEYS', `${keyType}**************`]);
}

module.exports = {
storeKeyWithExpiry,
initRedis,
Expand All @@ -54,5 +58,6 @@ module.exports = {
increment,
storeKey,
storeHashWithExpiry,
getHash
getHash,
getAllKeys
};
65 changes: 65 additions & 0 deletions backend/donor-service/utils/scriptForDataMigration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const { default: axios } = require('axios');
const config = require('../configs/config');
const op = require('object-path');
const kafkaClient =require('../services/kafka.producer');


const topicName = `${config.METRICS_TOPIC}`;

async function migrateDataToCH (entityName, entityId, accessToken) {

try {
let searchURL= `${config.REGISTRY_URL}/api/v1/${entityName}/${entityId}`;
console.log(searchURL);
let getTheDataFromRegistry = await axios.get(searchURL, {
headers: {
'Authorization': `Bearer ${accessToken}`,
}
});
if (getTheDataFromRegistry.data && getTheDataFromRegistry.data != '') {
let transformedData = transformTheData(getTheDataFromRegistry.data, entityName);
let telemertryObject = getTelemtryObject( entityName , entityId,transformedData);
await kafkaClient.emitEventToKafka(topicName, telemertryObject)
console.log("Migration was successfull!");
}
} catch (error) {
console.log(error);
console.log(error?.response?.data);
throw new Error("Error While DB Migration : ", error);
}
}


function transformTheData (data, entityName) {
let transformedData = data
let getInternalFeilds = require(`../schemas/${entityName}.json`);
if (getInternalFeilds?._osConfig?.internalFields) getInternalFeilds = getInternalFeilds?._osConfig?.internalFields
else throw new Error("Couldnt fetch the schema / internal feilds - check the schema directory!");
for (let i=0 ; i < getInternalFeilds.length; i++) {
let attribute = getInternalFeilds[i].split('$.')[1];
op.del(transformedData, attribute);
}
return transformedData;
}

function getTelemtryObject (entityName , entityId , filteredData) {
return {
"eid" : "ADD",
"ets" : Date.now(),
"ver" : "3.1",
"mid" : "22e83ab6-f8c5-47af-a84b-c7113e1feb76",
"actor" : {
"id" : "",
"type" : "USER"
},
"object" : {
"id" : entityId,
"type" : entityName
},
"edata" : filteredData
}
}

module.exports ={
migrateDataToCH,
}
90 changes: 62 additions & 28 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
timeout: 5s
retries: 5
registry:
image: tejashjl/sunbird-rc-core:v0.0.13-7
image: dockerhub/sunbird-rc-core:v0.0.14
volumes:
- ${PWD}/${SCHEMA_DIR-java/registry/src/main/resources/public/_schemas}:/home/sunbirdrc/config/public/_schemas
environment:
Expand Down Expand Up @@ -77,13 +77,18 @@ services:
- external_entities=nha-admin
- invite_required_validation_enabled=false
- invite_signature_enabled=false
- event_enabled=${ENABLE_METRICS_SERVICE-false}
- event_topic=${METRICS_TOPIC-events}
- event_providerName=dev.sunbirdrc.registry.service.impl.KafkaEventService
ports:
- "8081:8081"
depends_on:
es:
condition: service_healthy
db:
condition: service_healthy
kafka:
condition: service_started
healthcheck:
test: [ "CMD-SHELL", "wget -nv -t1 --spider http://localhost:8081/health || exit 1" ]
interval: 30s
Expand Down Expand Up @@ -217,33 +222,57 @@ services:
interval: 30s
timeout: 10s
retries: 10
# zookeeper:
# image: confluentinc/cp-zookeeper:latest
# ports:
# - "2181:2181"
# environment:
# ZOOKEEPER_CLIENT_PORT: "2181"
# ZOOKEEPER_TICK_TIME: "2000"
# kafka:
# image: confluentinc/cp-kafka:latest
# depends_on:
# zookeeper:
# condition: service_started
# ports:
# - "9092:9092"
# environment:
# KAFKA_BROKER_ID: "1"
# KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
# KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,OUTSIDE://localhost:9094"
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"
# KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
# healthcheck:
# test:
# [ "CMD", "kafka-topics", "--list", "--zookeeper", "zookeeper:2181" ]
# interval: 30s
# timeout: 10s
# retries: 10
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
ZOOKEEPER_TICK_TIME: "2000"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
zookeeper:
condition: service_started
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: "1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,OUTSIDE://localhost:9094"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
healthcheck:
test:
[ "CMD", "kafka-topics", "--list", "--zookeeper", "zookeeper:2181" ]
interval: 30s
timeout: 10s
retries: 10
metrics:
image: dockerhub/sunbird-rc-metrics
environment:
CLICK_HOUSE_URL: clickhouse:9000
CLICKHOUSE_DATABASE: default
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_METRICS_TOPIC: ${METRICS_TOPIC-events}
REDIS_URL: redis:6379
ports:
- "8070:8070"
depends_on:
kafka:
condition: service_started
registry:
condition: service_healthy
clickhouse:
image: clickhouse/clickhouse-server:head-alpine
volumes:
- ./${ClickHosue_DIR-ch-data}:/var/lib/clickhouse
- ${PWD}/${SCHEMA_DIR-java/registry/src/main/resources/public/_schemas}:/app/schemas
ports:
- "9002:9000"
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
public-key-service:
image: dockerhub/sunbird-rc-public-key-service
environment:
Expand Down Expand Up @@ -300,6 +329,8 @@ services:
- "8003:8003"
donor-service:
build: backend/donor-service
volumes:
- ${PWD}/${SCHEMA_DIR-java/registry/src/main/resources/public/_schemas}:/app/schemas
ports:
- "3000:3000"
healthcheck:
Expand Down Expand Up @@ -342,3 +373,6 @@ services:
ESIGN_VALIDATION_KAFKA_BROKERS: ${ESIGN_VALIDATION_KAFKA_BROKERS}
ESIGN_VALIDATION_KAFKA_TOPIC: ${ESIGN_VALIDATION_KAFKA_TOPIC}
ESIGN_VALIDATION_KAFKA_TOPIC_GROUP: ${ESIGN_VALIDATION_KAFKA_TOPIC_GROUP}
METRICS_URL: ${METRICS_URL}
KAFKA_BROKER_FOR_METRICS_SERVICE: ${KAFKA_BROKER_FOR_METRICS_SERVICE}
METRICS_TOPIC: ${METRICS_TOPIC}
2 changes: 1 addition & 1 deletion donor-registry/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2ffad32

Please sign in to comment.