Skip to content

Commit

Permalink
Merge pull request #33 from bcgov/EMSEDT_ERROR_LOGS
Browse files Browse the repository at this point in the history
EMSEDT File Errors
  • Loading branch information
vmanawat authored Sep 23, 2024
2 parents 22798f8 + 9f61293 commit 4c217ca
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 88 deletions.
29 changes: 19 additions & 10 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ model file_submission {
create_utc_timestamp DateTime @db.Timestamp(6)
update_user_id String @db.VarChar(200)
update_utc_timestamp DateTime @db.Timestamp(6)
file_error_logs file_error_logs[]
file_operation_codes file_operation_codes @relation(fields: [file_operation_code], references: [file_operation_code], onDelete: NoAction, onUpdate: NoAction, map: "file_operation_code_fk")
submission_status submission_status_code @relation(fields: [submission_status_code], references: [submission_status_code], onDelete: NoAction, onUpdate: NoAction, map: "submission_status_code_fk")
}
Expand Down Expand Up @@ -228,16 +229,6 @@ model aqi_sample_fractions {
update_utc_timestamp DateTime @db.Timestamp(6)
}

model aqi_data_classifications {
aqi_data_classifications_id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
custom_id String @db.VarChar(200)
description String? @db.VarChar(2000)
create_user_id String @db.VarChar(200)
create_utc_timestamp DateTime @db.Timestamp(6)
update_user_id String @db.VarChar(200)
update_utc_timestamp DateTime @db.Timestamp(6)
}

model file_operation_codes {
file_operation_code String @id(map: "file_operation_code_pk") @db.VarChar(20)
description String @db.VarChar(250)
Expand All @@ -249,3 +240,21 @@ model file_operation_codes {
update_utc_timestamp DateTime @db.Timestamp(6)
file_submission file_submission[]
}

// One row per error log captured for an uploaded EMS EDT file.
// error_log holds the structured error entries as JSON; file_submission_id
// is nullable, so a log row can exist without a linked submission.
model file_error_logs {
file_error_log_id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
// Optional FK to file_submission (see file_submission_id_fk relation below).
file_submission_id String? @db.Uuid
file_name String? @db.VarChar(200)
// Structured validation/import errors for the file, stored as JSON.
error_log Json?
file_submission file_submission? @relation(fields: [file_submission_id], references: [submission_id], onDelete: NoAction, onUpdate: NoAction, map: "file_submission_id_fk")
}

// Reference table of AQI data classifications with standard audit columns.
// custom_id appears to be the external identifier from the AQI system —
// NOTE(review): confirm its source against the pull-down job.
model aqi_data_classifications {
aqi_data_classifications_id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
custom_id String @db.VarChar(200)
description String? @db.VarChar(2000)
create_user_id String @db.VarChar(200)
create_utc_timestamp DateTime @db.Timestamp(6)
update_user_id String @db.VarChar(200)
update_utc_timestamp DateTime @db.Timestamp(6)
}
61 changes: 23 additions & 38 deletions backend/src/aqi_api/aqi_api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export class AqiApiService {
private readonly logger = new Logger(AqiApiService.name);
private axiosInstance: AxiosInstance;

private wait = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
// Resolves after the given number of seconds (converted to ms for setTimeout).
private wait = (seconds: number) =>
new Promise((resolve) => setTimeout(resolve, seconds * 1000));

constructor(private prisma: PrismaService) {
this.axiosInstance = axios.create({
Expand Down Expand Up @@ -105,26 +105,27 @@ export class AqiApiService {
`API call to Observation Import succeeded: ${response.status}`,
);
const statusURL = response.headers.location;
const obsResults = await this.getObservationsStatusResult(statusURL);
const obsStatus = await this.getObservationsStatusResult(statusURL);

const errorMessages = this.parseObsResultResponse(obsResults);
const errorMessages = this.parseObsResultResponse(obsStatus);
return errorMessages;
}
} catch (err) {
console.error("API call to Observation Import failed: ", err);
}
}

async getObservationsStatusResult(location: string) {
async getObservationsStatusResult(statusURL: string) {
try {
const response = await axios.get(location, {
const response = await axios.get(statusURL, {
headers: {
Authorization: `token ${process.env.AQI_ACCESS_TOKEN}`,
"x-api-key": process.env.AQI_ACCESS_TOKEN,
},
});

await this.wait(15000);
await this.wait(9);
console.log(response.data.id)

const obsResultResponse = await axios.get(
`${process.env.AQI_BASE_URL}/v2/observationimports/${response.data.id}/result`,
Expand Down Expand Up @@ -157,16 +158,16 @@ export class AqiApiService {
}

/**
 * Converts an AQI observation-import result into a list of structured
 * error objects of the shape
 *   { rowNum, type: "ERROR", message: { "Observation File": <msg> } }.
 * Returns an empty array when obsResults.errorCount is not positive.
 */
parseObsResultResponse(obsResults: any) {
  const errorMessages = [];
  if (obsResults.errorCount > 0) {
    obsResults.importItems.forEach((item) => {
      const rowId = item.rowId;
      const errors = item.errors;

      Object.entries(errors).forEach((error) => {
        // Build the entry as an object literal instead of templating a JSON
        // string and re-parsing it: an errorMessage containing quotes,
        // backslashes or newlines would make JSON.parse throw here.
        errorMessages.push({
          rowNum: rowId,
          type: "ERROR",
          message: { "Observation File": error[1][0].errorMessage },
        });
      });
    });
  }
  return errorMessages;
}
Expand Down Expand Up @@ -236,37 +237,21 @@ export class AqiApiService {
}
}

/**
 * Merges two lists of row-level error objects ({ rowNum, type, message }),
 * combining the message maps of entries that share the same rowNum.
 * For duplicate message keys, later entries (remote after local) win.
 * Returns the merged entries in first-seen rowNum order.
 */
mergeErrorMessages(localErrors: any[], remoteErrors: any[]) {
  const byRow = new Map<number, any>();

  for (const entry of [...localErrors, ...remoteErrors]) {
    const existing = byRow.get(entry.rowNum);
    if (existing) {
      // Same row already seen: keep its fields, union the message maps.
      byRow.set(entry.rowNum, {
        ...existing,
        message: { ...existing.message, ...entry.message },
      });
    } else {
      byRow.set(entry.rowNum, entry);
    }
  }

  return Array.from(byRow.values());
}
}
14 changes: 7 additions & 7 deletions backend/src/cron-job/cron-job.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,20 +433,20 @@ export class CronJobService {
grab all the files from the DB and S3 bucket that have a status of QUEUED
for each file returned, change the status to INPROGRESS and go to the parser
*/
if (!this.dataPullDownComplete) {
this.logger.warn("Data pull down from AQSS did not complete");
return;
}
// if (!this.dataPullDownComplete) {
// this.logger.warn("Data pull down from AQSS did not complete");
// return;
// }

let filesToValidate = await this.fileParser.getQueuedFiles();

if (filesToValidate.length < 1) {
console.log("************** NO FILES TO VALIDATE **************");
return
return;
} else {
for (const file of filesToValidate) {
const fileBinary = await this.objectStore.getFileData(file.file_name);

this.fileParser.parseFile(
fileBinary,
file.file_name,
Expand All @@ -455,7 +455,7 @@ export class CronJobService {
);
}
this.dataPullDownComplete = false;
return
return;
}
}
}
Loading

0 comments on commit 4c217ca

Please sign in to comment.