From bc5156a388f906cd6ac8a74628856e81ac3257c8 Mon Sep 17 00:00:00 2001 From: Robert Johnstone Date: Thu, 20 Jul 2023 20:44:31 +0800 Subject: [PATCH 1/2] minimal example of steaming CSV processing --- api/package-lock.json | 48 +++++++++- api/package.json | 2 + api/src/paths/activities.ts | 151 +++++++++++++++++++------------- api/src/utils/iapp-csv-utils.ts | 150 ++++++++++++++++++++++++++++--- 4 files changed, 275 insertions(+), 76 deletions(-) diff --git a/api/package-lock.json b/api/package-lock.json index 55944cd65..0c2a6f734 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -46,6 +46,7 @@ "node-fetch": "^2.6.7", "object-path": "^0.11.8", "pg": "~8.7.0", + "pg-cursor": "^2.10.1", "pg-format": "^1.0.4", "proj4": "^2.6.3", "qs": "~6.9.7", @@ -78,6 +79,7 @@ "@types/mocha": "~8.0.1", "@types/node": "^17.0.23", "@types/pg": "~7.14.4", + "@types/pg-cursor": "^2.7.0", "@types/supertest": "^2.0.12", "@types/uuid": "~8.3.0", "@types/yamljs": "~0.2.31", @@ -874,6 +876,16 @@ "pg-types": "^2.2.0" } }, + "node_modules/@types/pg-cursor": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/@types/pg-cursor/-/pg-cursor-2.7.0.tgz", + "integrity": "sha512-4Milg/OUqTO2VuPvvRwPxaQTaiVb+bXvSK+ZCwiHjwinbD4/lPqV9AREg8sJAT0cy5ruY38aaajBc2FbdPaKcA==", + "dev": true, + "dependencies": { + "@types/node": "*", + "@types/pg": "*" + } + }, "node_modules/@types/qs": { "version": "6.9.7", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.7.tgz", @@ -9177,6 +9189,14 @@ "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, + "node_modules/pg-cursor": { + "version": "2.10.1", + "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.10.1.tgz", + "integrity": "sha512-t4bjqL/gtohsNoByFcD4EKjHVppOuJipBYVhunVUm25TnaS2xEEr2jy3vumOX2z44SHROaf4K7QWPY9xnKT3HA==", + "peerDependencies": { + "pg": "^8" + } + }, "node_modules/pg-format": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/pg-format/-/pg-format-1.0.4.tgz", @@ -13179,6 +13199,16 @@ "pg-types": "^2.2.0" } }, + "@types/pg-cursor": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/@types/pg-cursor/-/pg-cursor-2.7.0.tgz", + "integrity": "sha512-4Milg/OUqTO2VuPvvRwPxaQTaiVb+bXvSK+ZCwiHjwinbD4/lPqV9AREg8sJAT0cy5ruY38aaajBc2FbdPaKcA==", + "dev": true, + "requires": { + "@types/node": "*", + "@types/pg": "*" + } + }, "@types/qs": { "version": "6.9.7", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.7.tgz", @@ -13463,7 +13493,8 @@ "version": "5.3.2", "resolved": "https://registry.npmjs.org/acorn-jsx/-/acorn-jsx-5.3.2.tgz", "integrity": "sha512-rq9s+JNhf0IChjtDXxllJ7g41oZk5SlXtp0LHwyA5cejwn7vKmKp4pPri6YEePv2PU65sAsegbXtIinmDFDXgQ==", - "dev": true + "dev": true, + "requires": {} }, "acorn-walk": { "version": "7.2.0", @@ -16311,7 +16342,8 @@ "fs-routes": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fs-routes/-/fs-routes-7.0.1.tgz", - "integrity": "sha512-kSAfx/P8oLSi5+tblecTETcJJ/Q+qL+xzGx4hns/+gHXMkTOZEzG73/2dBDW1FFy5+ZW080XoMaBAN2kCN55aQ==" + "integrity": "sha512-kSAfx/P8oLSi5+tblecTETcJJ/Q+qL+xzGx4hns/+gHXMkTOZEzG73/2dBDW1FFy5+ZW080XoMaBAN2kCN55aQ==", + "requires": {} }, "fs.realpath": { "version": "1.0.0", @@ -19706,6 +19738,12 @@ "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, + "pg-cursor": { + "version": "2.10.1", + "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.10.1.tgz", + "integrity": "sha512-t4bjqL/gtohsNoByFcD4EKjHVppOuJipBYVhunVUm25TnaS2xEEr2jy3vumOX2z44SHROaf4K7QWPY9xnKT3HA==", + "requires": {} + }, "pg-format": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/pg-format/-/pg-format-1.0.4.tgz", @@ -19719,7 +19757,8 @@ "pg-pool": { "version": "3.4.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.4.0.tgz", - "integrity": "sha512-ddOpyZ2C7ySddu9sQ2t+91LeWT2Lv186kntK4TfPcoV9qH5OOcZAIWrzkGzwhisfHv2pk913/VuT1dPOUXzDMQ==" + "integrity": "sha512-ddOpyZ2C7ySddu9sQ2t+91LeWT2Lv186kntK4TfPcoV9qH5OOcZAIWrzkGzwhisfHv2pk913/VuT1dPOUXzDMQ==", + "requires": {} }, "pg-protocol": { "version": "1.5.0", @@ -22233,7 +22272,8 @@ "ws": { "version": "8.4.2", "resolved": "https://registry.npmjs.org/ws/-/ws-8.4.2.tgz", - "integrity": "sha512-Kbk4Nxyq7/ZWqr/tarI9yIt/+iNNFOjBXEWgTb4ydaNHBNGgvf2QHbS9fdfsndfjFlFwEd4Al+mw83YkaD10ZA==" + "integrity": "sha512-Kbk4Nxyq7/ZWqr/tarI9yIt/+iNNFOjBXEWgTb4ydaNHBNGgvf2QHbS9fdfsndfjFlFwEd4Al+mw83YkaD10ZA==", + "requires": {} }, "xdg-basedir": { "version": "4.0.0", diff --git a/api/package.json b/api/package.json index dc944e475..4b63f2eb0 100644 --- a/api/package.json +++ b/api/package.json @@ -62,6 +62,7 @@ "node-fetch": "^2.6.7", "object-path": "^0.11.8", "pg": "~8.7.0", + "pg-cursor": "^2.10.1", "pg-format": "^1.0.4", "proj4": "^2.6.3", "qs": "~6.9.7", @@ -94,6 +95,7 @@ "@types/mocha": "~8.0.1", "@types/node": "^17.0.23", "@types/pg": "~7.14.4", + "@types/pg-cursor": "^2.7.0", "@types/supertest": "^2.0.12", "@types/uuid": "~8.3.0", "@types/yamljs": "~0.2.31", diff --git a/api/src/paths/activities.ts b/api/src/paths/activities.ts index 99465b5a3..f3a45c067 100644 --- a/api/src/paths/activities.ts +++ b/api/src/paths/activities.ts @@ -1,18 +1,19 @@ 'use strict'; -import {RequestHandler} from 'express'; -import {Operation} from 'express-openapi'; -import {SQLStatement} from 'sql-template-strings'; -import {ALL_ROLES, SEARCH_LIMIT_MAX, SEARCH_LIMIT_DEFAULT, SECURITY_ON} from '../constants/misc'; -import {getDBConnection} from '../database/db'; -import {ActivitySearchCriteria} from '../models/activity'; -import {getActivitiesSQL, deleteActivitiesSQL} from '../queries/activity-queries'; -import {getLogger} from '../utils/logger'; -import {InvasivesRequest} from '../utils/auth-utils'; -import {createHash} from 'crypto'; +import { RequestHandler } from 'express'; +import { Operation } from 'express-openapi'; +import { SQLStatement } from 'sql-template-strings'; +import { ALL_ROLES, SEARCH_LIMIT_MAX, SEARCH_LIMIT_DEFAULT, SECURITY_ON } from '../constants/misc'; +import { getDBConnection } from '../database/db'; +import { ActivitySearchCriteria } from '../models/activity'; +import { getActivitiesSQL, deleteActivitiesSQL } from '../queries/activity-queries'; +import { getLogger } from '../utils/logger'; +import { InvasivesRequest } from '../utils/auth-utils'; +import { createHash } from 'crypto'; import cacheService from '../utils/cache/cache-service'; -import {mapSitesRowsToCSV} from '../utils/iapp-csv-utils'; -import {versionedKey} from "../utils/cache/cache-utils"; +import { mapSingleRowToCSV, mapSitesRowsToCSV } from '../utils/iapp-csv-utils'; +import { versionedKey } from "../utils/cache/cache-utils"; +import Cursor from 'pg-cursor'; const defaultLog = getLogger('activity'); const CACHENAME = "Activities - Fat"; @@ -136,6 +137,47 @@ DELETE.apiDoc = { } }; +function _sanitizeRow(isAuth, row) { + const sanitized = { ...row }; + + sanitized.activity_payload.created_by = null; + sanitized.activity_payload.updated_by = null; + sanitized.activity_payload.reviewed_by = null; + sanitized.activity_payload.user_role = []; + if (sanitized.activity_payload?.form_data?.activity_type_data) { + sanitized.activity_payload.form_data.activity_type_data.activity_persons = []; + } + return sanitized; +} + + +async function _handleStreamingCSVResponse(isAuth, response, cursor, CSVType) { + + response + .status(200) + .contentType('text/csv') + .set('Content-Disposition', 'attachment; filename="export.csv"'); + + + while (true) { + const rows = await cursor.read(10); + if (rows.length === 0) { + break; + } + + rows.forEach((row) => { + try { + const csvRow = mapSingleRowToCSV(_sanitizeRow(isAuth, row), CSVType); + response.write(csvRow); + } catch (e) { + defaultLog.error({ error: e }); + response.write('Error processing row\r\n'); + } + }); + } + response.end(); +} + /** * Fetches all activity records based on request search filter criteria. * @@ -145,7 +187,7 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { return async (req: InvasivesRequest, res) => { const criteria = JSON.parse(req.query['query']); - defaultLog.debug({label: 'activity', message: 'getActivitiesBySearchFilterCriteria', body: criteria}); + defaultLog.debug({ label: 'activity', message: 'getActivitiesBySearchFilterCriteria', body: criteria }); const roleName = (req as any).authContext.roles[0]?.role_name; const sanitizedSearchCriteria = new ActivitySearchCriteria(criteria); @@ -153,7 +195,7 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { const isAuth = req.authContext?.user !== null ? true : false; const user_role = (req as any).authContext?.roles?.[0]?.role_id; if (user_role) { - const user_roles = Array.from({length: user_role}, (_, i) => i + 1); + const user_roles = Array.from({ length: user_role }, (_, i) => i + 1); sanitizedSearchCriteria.user_roles = user_roles; } @@ -169,10 +211,10 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { const connection = await getDBConnection(); if (!connection) { - defaultLog.error({label: 'activity', message: 'getActivitiesBySearchFilterCriteria', body: criteria}); + defaultLog.error({ label: 'activity', message: 'getActivitiesBySearchFilterCriteria', body: criteria }); return res .status(503) - .json({message: 'Database connection unavailable', request: criteria, namespace: 'activities', code: 503}); + .json({ message: 'Database connection unavailable', request: criteria, namespace: 'activities', code: 503 }); } // we'll send it later, overriding cache headers as appropriate @@ -219,7 +261,7 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { return res.status(200).set(responseCacheHeaders).json(cachedResult); } } catch (e) { - const message = (e === undefined) ? 'undefined' : e.message + const message = (e === undefined) ? 'undefined' : e.message; defaultLog.warn( { message: 'caught an error while checking cache. this is odd but continuing with request as though no cache present.', @@ -234,48 +276,39 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { if (!sqlStatement) { return res .status(500) - .json({message: 'Unable to generate SQL statement', request: criteria, namespace: 'activities', code: 500}); + .json({ message: 'Unable to generate SQL statement', request: criteria, namespace: 'activities', code: 500 }); } - // needs to be mutable - let response = await connection.query(sqlStatement.text, sqlStatement.values); - if (!isAuth) { - if (response.rows.length > 0) { - // remove sensitive data from json obj - for (var i in response.rows) { - response.rows[i].activity_payload.created_by = null; - response.rows[i].activity_payload.updated_by = null; - response.rows[i].activity_payload.reviewed_by = null; - response.rows[i].activity_payload.user_role = []; - if (response.rows[i].activity_payload.form_data.activity_type_data) { - response.rows[i].activity_payload.form_data.activity_type_data.activity_persons = []; - } - } - } - } - - const responseBody = { - message: 'Got activities by search filter criteria', - request: criteria, - result: response.rows, - count: response.rowCount, - namespace: 'activities', - code: 200 - }; + if (sanitizedSearchCriteria.isCSV) { + const resultSetCursor = await connection.query(new Cursor(sqlStatement.text, sqlStatement.values)); - if (ETag !== null) { - // save for later; - await cache.put(ETag, responseBody); - } + return await _handleStreamingCSVResponse( + isAuth, + res, + resultSetCursor, + sanitizedSearchCriteria.CSVType + ); - if (sanitizedSearchCriteria.isCSV) { - const responseCSV = response.rowCount > 0 ? await mapSitesRowsToCSV(response, sanitizedSearchCriteria.CSVType) : []; - return res.status(200).set(responseCacheHeaders).contentType('text/csv').set('Content-Disposition', 'attachment; filename="export.csv"').send(responseCSV as unknown as string); } else { + const resultSet = await connection.query(sqlStatement.text, sqlStatement.values); + + const responseBody = { + message: 'Got activities by search filter criteria', + request: criteria, + result: resultSet.rows.map((r) => _sanitizeRow(isAuth, r)), + count: resultSet.rowCount, + namespace: 'activities', + code: 200 + }; + + if (ETag !== null) { + // save for later; + await cache.put(ETag, responseBody); + } return res.status(200).set(responseCacheHeaders).json(responseBody); } } catch (error) { - defaultLog.debug({label: 'getActivitiesBySearchFilterCriteria', message: 'error', error}); + defaultLog.error({ label: 'getActivitiesBySearchFilterCriteria', message: 'error', error }); return res.status(500).json({ message: 'Error getting activities by search filter criteria', error, @@ -295,13 +328,13 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { */ function deleteActivitiesByIds(): RequestHandler { return async (req: InvasivesRequest, res) => { - defaultLog.debug({label: 'activity', message: 'deleteActivitiesByIds', body: req.body}); + defaultLog.debug({ label: 'activity', message: 'deleteActivitiesByIds', body: req.body }); const sanitizedSearchCriteria = new ActivitySearchCriteria({ keycloakToken: req.keycloakToken }); - const isAdmin = (req as any).authContext.roles.find(role => role.role_id === 18) + const isAdmin = (req as any).authContext.roles.find(role => role.role_id === 18); const preferred_username = req.authContext.preferredUsername; const ids = Object.values(req.query.id) as string[]; sanitizedSearchCriteria.activity_ids = ids; @@ -310,7 +343,7 @@ function deleteActivitiesByIds(): RequestHandler { if (!connection) { return res .status(503) - .json({message: 'Database connection unavailable', request: req.body, namespace: 'activities', code: 503}); + .json({ message: 'Database connection unavailable', request: req.body, namespace: 'activities', code: 503 }); } if (isAdmin === false) { @@ -319,7 +352,7 @@ function deleteActivitiesByIds(): RequestHandler { if (!sqlStatement) { return res .status(500) - .json({message: 'Unable to generate SQL statement', request: req.body, namespace: 'activities', code: 500}); + .json({ message: 'Unable to generate SQL statement', request: req.body, namespace: 'activities', code: 500 }); } const response = await connection.query(sqlStatement.text, sqlStatement.values); @@ -341,7 +374,7 @@ function deleteActivitiesByIds(): RequestHandler { if (!ids || !ids.length) { return res .status(400) - .json({message: 'Invalid request, no ids provided', request: req.body, namespace: 'activities', code: 400}); + .json({ message: 'Invalid request, no ids provided', request: req.body, namespace: 'activities', code: 400 }); } try { @@ -350,7 +383,7 @@ function deleteActivitiesByIds(): RequestHandler { if (!sqlStatement) { return res .status(500) - .json({message: 'Unable to generate SQL statement', request: req.body, namespace: 'activities', code: 500}); + .json({ message: 'Unable to generate SQL statement', request: req.body, namespace: 'activities', code: 500 }); } const response = await connection.query(sqlStatement.text, sqlStatement.values); @@ -364,10 +397,10 @@ function deleteActivitiesByIds(): RequestHandler { code: 200 }); } catch (error) { - defaultLog.debug({label: 'deleteActivitiesByIds', message: 'error', error}); + defaultLog.debug({ label: 'deleteActivitiesByIds', message: 'error', error }); return res .status(500) - .json({message: 'Error deleting activities by ids', error, namespace: 'activities', code: 500}); + .json({ message: 'Error deleting activities by ids', error, namespace: 'activities', code: 500 }); } finally { connection.release(); } diff --git a/api/src/utils/iapp-csv-utils.ts b/api/src/utils/iapp-csv-utils.ts index 76cf38f1d..9511696dc 100644 --- a/api/src/utils/iapp-csv-utils.ts +++ b/api/src/utils/iapp-csv-utils.ts @@ -1,4 +1,128 @@ -import { format } from 'date-fns'; +import { format, parseISO } from 'date-fns'; + +export const mapSingleRowToCSV = (row: any, templateName: string) => { + // set up callbacks to format specific fields + const fieldFormatMap = {}; + const defaultFormatter = (value) => { + return typeof value === 'string' ? '"' + value + '"' : value; + }; + switch (templateName) { + default: + fieldFormatMap['site_created_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['survey_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['treatment_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['monitoring_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['collection_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['inspection_date'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['last_surveyed_date'] = (value) => { + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['activity_date_time'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); + return date; + }; + fieldFormatMap['created_timestamp'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); + return date; + }; + fieldFormatMap['updated_timestamp'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); + return date; + }; + fieldFormatMap['date_entered'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['date_updated'] = (value) => { + if (value === null) return ''; + const date = format(parseISO(value), 'yyyy-MM-dd'); + return date; + }; + fieldFormatMap['jurisdictions'] = (value) => { + return '"' + value + '"'; + }; + fieldFormatMap['comment'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['comments'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['site_comments'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['site_location'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['bioagent_source'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['survey_comments'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['treatment_comments'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['monitoring_comments'] = (value) => { + return '"' + value.replace(/\"/g, '') + '"'; + }; + fieldFormatMap['jurisdiction'] = (value) => { + return '"' + value + '"'; + }; + fieldFormatMap['other_surveyors'] = (value) => { + return '"' + value + '"'; + }; + fieldFormatMap['other_applicators'] = (value) => { + return '"' + value + '"'; + }; + break; + } + + return Object.keys(row) + .map((fieldNameRaw: any) => { + const fieldName = fieldNameRaw.trim(); + const formatter = + typeof fieldFormatMap[fieldName] === 'function' ? fieldFormatMap[fieldName] : defaultFormatter; + const unformatted = + typeof row[fieldName] === 'string' ? row[fieldName].replace(/(\r\n|\n|\r)/gm, '') : row[fieldName]; + + try { + return formatter(unformatted); + } catch (e) { + return `Unformattable value ${unformatted}`; + } + }) + .join(',') + "\r\n"; +}; + export const mapSitesRowsToCSV = async (response: any, templateName: string) => { const headers = response.fields.map((fieldObj) => fieldObj?.name).join(',') + '\n'; @@ -12,61 +136,61 @@ export const mapSitesRowsToCSV = async (response: any, templateName: string) => default: fieldFormatMap['site_created_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['survey_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['treatment_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['monitoring_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['collection_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['inspection_date'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['last_surveyed_date'] = (value) => { - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['activity_date_time'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd HH:mm:ss'); + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); return date; }; fieldFormatMap['created_timestamp'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd HH:mm:ss'); + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); return date; }; fieldFormatMap['updated_timestamp'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd HH:mm:ss'); + const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); return date; }; fieldFormatMap['date_entered'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['date_updated'] = (value) => { if (value === null) return ''; - const date = format(value, 'yyyy-MM-dd'); + const date = format(parseISO(value), 'yyyy-MM-dd'); return date; }; fieldFormatMap['jurisdictions'] = (value) => { From e3dad7510ab19b8149f7aa94bf0bbca06f4a1d42 Mon Sep 17 00:00:00 2001 From: Robert Johnstone Date: Fri, 11 Aug 2023 15:44:19 +0800 Subject: [PATCH 2/2] All CSV reports now run in streaming mode --- api/src/app.ts | 7 +- api/src/paths/activities.ts | 187 ++++++++++------------- api/src/paths/points-of-interest-lean.ts | 2 +- api/src/paths/points-of-interest.ts | 173 +++++++++------------ api/src/utils/iapp-csv-utils.ts | 117 +++++++------- api/src/utils/iapp-json-utils.ts | 126 ++++++++++----- 6 files changed, 307 insertions(+), 305 deletions(-) diff --git a/api/src/app.ts b/api/src/app.ts index 1de975419..b4716a0d3 100644 --- a/api/src/app.ts +++ b/api/src/app.ts @@ -107,8 +107,13 @@ initialize({ message: 'unexpected error', error: error?.message + error?.stack || error }); + if (!res.headersSent) { + // streaming responses cannot alter headers after dispatch + res.status(error.status || error.code || 500).json(error); + } else { + + } - res.status(error.status || error.code || 500).json(error); } }); diff --git a/api/src/paths/activities.ts b/api/src/paths/activities.ts index f3a45c067..d180d1eb5 100644 --- a/api/src/paths/activities.ts +++ b/api/src/paths/activities.ts @@ -11,9 +11,8 @@ import { getLogger } from '../utils/logger'; import { InvasivesRequest } from '../utils/auth-utils'; import { createHash } from 'crypto'; import cacheService from '../utils/cache/cache-service'; -import { mapSingleRowToCSV, mapSitesRowsToCSV } from '../utils/iapp-csv-utils'; import { versionedKey } from "../utils/cache/cache-utils"; -import Cursor from 'pg-cursor'; +import { streamActivitiesResult, streamIAPPResult } from '../utils/iapp-json-utils'; const defaultLog = getLogger('activity'); const CACHENAME = "Activities - Fat"; @@ -137,47 +136,6 @@ DELETE.apiDoc = { } }; -function _sanitizeRow(isAuth, row) { - const sanitized = { ...row }; - - sanitized.activity_payload.created_by = null; - sanitized.activity_payload.updated_by = null; - sanitized.activity_payload.reviewed_by = null; - sanitized.activity_payload.user_role = []; - if (sanitized.activity_payload?.form_data?.activity_type_data) { - sanitized.activity_payload.form_data.activity_type_data.activity_persons = []; - } - return sanitized; -} - - -async function _handleStreamingCSVResponse(isAuth, response, cursor, CSVType) { - - response - .status(200) - .contentType('text/csv') - .set('Content-Disposition', 'attachment; filename="export.csv"'); - - - while (true) { - const rows = await cursor.read(10); - if (rows.length === 0) { - break; - } - - rows.forEach((row) => { - try { - const csvRow = mapSingleRowToCSV(_sanitizeRow(isAuth, row), CSVType); - response.write(csvRow); - } catch (e) { - defaultLog.error({ error: e }); - response.write('Error processing row\r\n'); - } - }); - } - response.end(); -} - /** * Fetches all activity records based on request search filter criteria. * @@ -223,92 +181,109 @@ function getActivitiesBySearchFilterCriteria(): RequestHandler { // server-side cache const cache = cacheService.getCache(CACHENAME); - // check the cache tag to see if, perhaps, the user already has the latest - try { - const cacheQueryResult = await connection.query( - `select updated_at - from cache_versions - where cache_name = $1`, - ['activity'] - ); - const cacheVersion = cacheQueryResult.rows[0].updated_at; - - // because we have parameters and user roles, the actual resource cache tag is - // tuple: (cacheVersion, parameters, roleName) - // hash it for brevity and to obscure the real modification date - - const cacheTagStr = versionedKey(`${CACHENAME} ${cacheVersion} ${JSON.stringify(criteria)} ${roleName}`); - - ETag = createHash('sha1').update(cacheTagStr).digest('hex'); - - // ok, see if we got a conditional request - const ifNoneMatch = req.header('If-None-Match'); - if (ifNoneMatch && ifNoneMatch === ETag) { - // great, we can shortcut this request. - connection.release(); - return res.status(304).send({}); //not-modified - } + if (!sanitizedSearchCriteria.isCSV) { - // we computed ok, so make sure we send it - responseCacheHeaders['ETag'] = ETag; - responseCacheHeaders['Cache-Control'] = 'must-revalidate, max-age=0'; + // check the cache tag to see if, perhaps, the user already has the latest + try { + const cacheQueryResult = await connection.query( + `select updated_at + from cache_versions + where cache_name = $1`, + ['activity'] + ); + const cacheVersion = cacheQueryResult.rows[0].updated_at; - // check server-side cache - const cachedResult = await cache.get(ETag); - if (cachedResult) { - // hit! send this one and save some db traffic - connection.release(); - return res.status(200).set(responseCacheHeaders).json(cachedResult); - } - } catch (e) { - const message = (e === undefined) ? 'undefined' : e.message; - defaultLog.warn( - { - message: 'caught an error while checking cache. this is odd but continuing with request as though no cache present.', - error: message - } - ); - } + // because we have parameters and user roles, the actual resource cache tag is + // tuple: (cacheVersion, parameters, roleName) + // hash it for brevity and to obscure the real modification date - try { - const sqlStatement: SQLStatement = getActivitiesSQL(sanitizedSearchCriteria, false, isAuth); + const cacheTagStr = versionedKey(`${CACHENAME} ${cacheVersion} ${JSON.stringify(criteria)} ${roleName}`); - if (!sqlStatement) { - return res - .status(500) - .json({ message: 'Unable to generate SQL statement', request: criteria, namespace: 'activities', code: 500 }); - } + ETag = createHash('sha1').update(cacheTagStr).digest('hex'); - if (sanitizedSearchCriteria.isCSV) { - const resultSetCursor = await connection.query(new Cursor(sqlStatement.text, sqlStatement.values)); + // ok, see if we got a conditional request + const ifNoneMatch = req.header('If-None-Match'); + if (ifNoneMatch && ifNoneMatch === ETag) { + // great, we can shortcut this request. + connection.release(); + return res.status(304).send({}); //not-modified + } + + // we computed ok, so make sure we send it + responseCacheHeaders['ETag'] = ETag; + responseCacheHeaders['Cache-Control'] = 'must-revalidate, max-age=0'; - return await _handleStreamingCSVResponse( - isAuth, - res, - resultSetCursor, - sanitizedSearchCriteria.CSVType + // check server-side cache + const cachedResult = await cache.get(ETag); + if (cachedResult) { + // hit! send this one and save some db traffic + connection.release(); + return res.status(200).set(responseCacheHeaders).json(cachedResult); + } + } catch (e) { + const message = (e === undefined) ? 'undefined' : e.message; + defaultLog.warn( + { + message: 'caught an error while checking cache. this is odd but continuing with request as though no cache present.', + error: message + } ); + } + } + try { + if (sanitizedSearchCriteria.isCSV) { + res.status(200) + await streamActivitiesResult(sanitizedSearchCriteria, res); } else { - const resultSet = await connection.query(sqlStatement.text, sqlStatement.values); + const sqlStatement: SQLStatement = getActivitiesSQL(sanitizedSearchCriteria, false, isAuth); + + if (!sqlStatement) { + return res + .status(500) + .json({ + message: 'Unable to generate SQL statement', + request: criteria, + namespace: 'activities', + code: 500 + }); + } + + // needs to be mutable + let response = await connection.query(sqlStatement.text, sqlStatement.values); + if (!isAuth) { + if (response.rows.length > 0) { + // remove sensitive data from json obj + for (var i in response.rows) { + response.rows[i].activity_payload.created_by = null; + response.rows[i].activity_payload.updated_by = null; + response.rows[i].activity_payload.reviewed_by = null; + response.rows[i].activity_payload.user_role = []; + if (response.rows[i].activity_payload.form_data.activity_type_data) { + response.rows[i].activity_payload.form_data.activity_type_data.activity_persons = []; + } + } + } + } const responseBody = { message: 'Got activities by search filter criteria', request: criteria, - result: resultSet.rows.map((r) => _sanitizeRow(isAuth, r)), - count: resultSet.rowCount, + result: response.rows, + count: response.rowCount, namespace: 'activities', code: 200 }; - if (ETag !== null) { + if (ETag !== null && responseBody.result.length < 200) { // save for later; await cache.put(ETag, responseBody); } + return res.status(200).set(responseCacheHeaders).json(responseBody); } } catch (error) { - defaultLog.error({ label: 'getActivitiesBySearchFilterCriteria', message: 'error', error }); + defaultLog.debug({ label: 'getActivitiesBySearchFilterCriteria', message: 'error', error }); return res.status(500).json({ message: 'Error getting activities by search filter criteria', error, diff --git a/api/src/paths/points-of-interest-lean.ts b/api/src/paths/points-of-interest-lean.ts index 87ca3917b..eb5f55ebf 100644 --- a/api/src/paths/points-of-interest-lean.ts +++ b/api/src/paths/points-of-interest-lean.ts @@ -3,7 +3,7 @@ import { RequestHandler } from 'express'; import { Operation } from 'express-openapi'; import { SQLStatement } from 'sql-template-strings'; -import { getIAPPsites, getSpeciesCodesFromIAPPDescriptionList, getSpeciesRef } from '../utils/iapp-json-utils'; +import { getSpeciesCodesFromIAPPDescriptionList, getSpeciesRef } from '../utils/iapp-json-utils'; import { getDBConnection } from '../database/db'; import { PointOfInterestSearchCriteria } from '../models/point-of-interest'; import { getPointsOfInterestLeanSQL } from '../queries/point-of-interest-queries'; diff --git a/api/src/paths/points-of-interest.ts b/api/src/paths/points-of-interest.ts index 9bd855173..fefc4fea7 100644 --- a/api/src/paths/points-of-interest.ts +++ b/api/src/paths/points-of-interest.ts @@ -1,20 +1,18 @@ 'use strict'; -import { RequestHandler, response } from 'express'; +import { RequestHandler } from 'express'; import { Operation } from 'express-openapi'; import { SQLStatement } from 'sql-template-strings'; -import { getIAPPsites } from '../utils/iapp-json-utils'; -import { ALL_ROLES, SEARCH_LIMIT_MAX, SEARCH_LIMIT_DEFAULT, SECURITY_ON } from '../constants/misc'; +import { streamIAPPResult } from '../utils/iapp-json-utils'; +import { ALL_ROLES, SECURITY_ON } from '../constants/misc'; import { getDBConnection } from '../database/db'; import { PointOfInterestSearchCriteria } from '../models/point-of-interest'; import { getPointsOfInterestSQL, getSpeciesMapSQL } from '../queries/point-of-interest-queries'; import { getLogger } from '../utils/logger'; -import cacheService from '../utils/cache/cache-service'; -import { createHash } from 'crypto'; import { versionedKey } from '../utils/cache/cache-utils'; +import { createHash } from 'crypto'; const defaultLog = getLogger('point-of-interest'); -const CACHENAME = 'POI - Fat'; export const GET: Operation = [getPointsOfInterestBySearchFilterCriteria()]; @@ -23,10 +21,10 @@ GET.apiDoc = { tags: ['point-of-interest'], security: SECURITY_ON ? [ - { - Bearer: ALL_ROLES - } - ] + { + Bearer: ALL_ROLES + } + ] : [], responses: { 200: { @@ -83,6 +81,7 @@ export const isIAPPrelated = (PointOfInterestSearchCriteria: any) => { */ function getPointsOfInterestBySearchFilterCriteria(): RequestHandler { return async (req, res) => { + const criteria = JSON.parse(req.query['query']); defaultLog.debug({ @@ -108,90 +107,68 @@ function getPointsOfInterestBySearchFilterCriteria(): RequestHandler { const sanitizedSearchCriteria = new PointOfInterestSearchCriteria(criteria); defaultLog.debug({ message: 'sanitizedSearchCriteria', sanitizedSearchCriteria }); - const connection = await getDBConnection(); - - if (!connection) { - return res.status(503).json({ - message: 'Database connection unavailable.', - request: criteria, - namespace: 'points-of-interest', - code: 503 - }); - } - // we'll send it later, overriding cache headers as appropriate const responseCacheHeaders = {}; let ETag = null; - // server-side cache - const cache = cacheService.getCache(CACHENAME); - // check the cache tag to see if, perhaps, the user already has the latest - try { - const cacheQueryResult = await connection.query( - `select updated_at - from cache_versions - where cache_name = $1`, - [isIAPPrelated(sanitizedSearchCriteria) ? 'iapp_site_summary' : 'activity'] - ); - const cacheVersion = cacheQueryResult.rows[0].updated_at; - - // because we have parameters and user roles, the actual resource cache tag is - // tuple: (cacheVersion, parameters) - // hash it for brevity and to obscure the real modification date - - const cacheTagStr = versionedKey(`${CACHENAME} ${cacheVersion} ${JSON.stringify(sanitizedSearchCriteria)}`); - - ETag = createHash('sha1').update(cacheTagStr).digest('hex'); - - // ok, see if we got a conditional request - const ifNoneMatch = req.header('If-None-Match'); - if (ifNoneMatch && ifNoneMatch == ETag) { - // great, we can shortcut this request. - connection.release(); - return res.status(304).send({}); //not-modified - } - // we computed ok, so make sure we send it - responseCacheHeaders['ETag'] = ETag; - responseCacheHeaders['Cache-Control'] = 'must-revalidate, max-age=0'; + // do not check cache for CSV files + if (!(isIAPPrelated(sanitizedSearchCriteria) && sanitizedSearchCriteria.isCSV)) { + const cacheCheckConnection = await getDBConnection(); - // check server-side cache - const cachedResult = await cache.get(ETag); - if (cachedResult) { - // hit! send this one and save some db traffic - connection.release(); - return res.status(200).set(responseCacheHeaders).json(cachedResult); + try { + const cacheQueryResult = await cacheCheckConnection.query( + `select updated_at + from cache_versions + where cache_name = $1`, + [isIAPPrelated(sanitizedSearchCriteria) ? 'iapp_site_summary' : 'activity'] + ); + const cacheVersion = cacheQueryResult.rows[0].updated_at; + + // because we have parameters and user roles, the actual resource cache tag is + // tuple: (cacheVersion, parameters) + // hash it for brevity and to obscure the real modification date + + const cacheTagStr = versionedKey(`POI ${cacheVersion} ${JSON.stringify(sanitizedSearchCriteria)}`); + + ETag = createHash('sha1').update(cacheTagStr).digest('hex'); + + // ok, see if we got a conditional request + const ifNoneMatch = req.header('If-None-Match'); + if (ifNoneMatch && ifNoneMatch == ETag) { + // great, we can shortcut this request. + return res.status(304).send({}); //not-modified + } + + // we computed ok, so make sure we send it + responseCacheHeaders['ETag'] = ETag; + responseCacheHeaders['Cache-Control'] = 'must-revalidate, max-age=0'; + + res.set(responseCacheHeaders); + + } finally { + cacheCheckConnection.release(); } - } catch (e) { - const message = e.message || e; - defaultLog.warn({ - message: - 'caught an error while checking cache. this is odd but continuing with request as though no cache present.', - error: message - }); } - try { - if (isIAPPrelated(sanitizedSearchCriteria)) { - const responseSurveyExtract = await getIAPPsites(sanitizedSearchCriteria); + if (isIAPPrelated(sanitizedSearchCriteria)) { + res.status(200); + await streamIAPPResult(sanitizedSearchCriteria, res); + } else { + const connection = await getDBConnection(); - const responseBody = { message: 'Got IAPP sites', result: responseSurveyExtract, code: 200 }; - if (ETag !== null) { - // save for later; - await cache.put(ETag, responseBody); - } - if (sanitizedSearchCriteria.isCSV) { - return res - .status(200) - .set(responseCacheHeaders) - .contentType('text/csv') - .set('Content-Disposition', 'attachment; filename="export.csv"') - .send((responseSurveyExtract as unknown) as string); - } else { - return res.status(200).set(responseCacheHeaders).json(responseBody); - } - } else { + + if (!connection) { + return res.status(503).json({ + message: 'Database connection unavailable.', + request: criteria, + namespace: 'points-of-interest', + code: 503 + }); + } + + try { const sqlStatement: SQLStatement = getPointsOfInterestSQL(sanitizedSearchCriteria); if (!sqlStatement) { @@ -220,25 +197,21 @@ function getPointsOfInterestBySearchFilterCriteria(): RequestHandler { code: 200 }; - if (ETag !== null) { - // save for later; - await cache.put(ETag, responseBody); - } + return res.status(200).json(responseBody); - return res.status(200).set(responseCacheHeaders).json(responseBody); + } catch (error) { + const message = error.message || error; + defaultLog.debug({ label: 'getPointsOfInterestBySearchFilterCriteria', message: 'error', error: message }); + return res.status(500).json({ + message: 'Failed to get points of interest by search filter criteria', + request: criteria, + error: error, + namespace: 'points-of-interest', + code: 500 + }); + } finally { + connection.release(); } - } catch (error) { - const message = error.message || error; - defaultLog.debug({ label: 'getPointsOfInterestBySearchFilterCriteria', message: 'error', error: message }); - return res.status(500).json({ - message: 'Failed to get points of interest by search filter criteria', - request: criteria, - error: error, - namespace: 'points-of-interest', - code: 500 - }); - } finally { - connection.release(); } }; } diff --git a/api/src/utils/iapp-csv-utils.ts b/api/src/utils/iapp-csv-utils.ts index 1a0299e94..94c8621e2 100644 --- a/api/src/utils/iapp-csv-utils.ts +++ b/api/src/utils/iapp-csv-utils.ts @@ -1,7 +1,13 @@ import { format, parseISO } from 'date-fns'; -export const mapSitesRowsToCSV = async (response: any, templateName: string) => { - const headers = response.fields.map((fieldObj) => fieldObj?.name).join(',') + '\n'; +// tunable -- how many rows to request/stream at a time. lower = more round trips to DB, but less node memory use. +const CURSOR_PAGE_SIZE = 100; + +export async function* generateSitesCSV(cursor: any, templateName: string) { + let page = await cursor.read(1); + // this is a private field on cursor but there's no better way to get the metadata. + // need to have read() at least once for it to be present. + yield cursor._result.fields.map((fieldObj) => fieldObj?.name).join(',') + '\n'; // set up callbacks to format specific fields const fieldFormatMap = {}; @@ -12,84 +18,72 @@ export const mapSitesRowsToCSV = async (response: any, templateName: string) => default: fieldFormatMap['site_created_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['survey_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['treatment_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['monitoring_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['collection_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; - } + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); + }; fieldFormatMap['inspection_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['last_surveyed_date'] = (value) => { if (value === null) { - return ''; + return ''; } - const justDate = typeof value === 'string'? value : value.toISOString() - const date = format(parseISO(justDate), 'yyyy-MM-dd'); - return date; + const justDate = typeof value === 'string' ? value : value.toISOString(); + return format(parseISO(justDate), 'yyyy-MM-dd'); }; fieldFormatMap['activity_date_time'] = (value) => { if (value === null) return ''; - const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); - return date; + return format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); }; fieldFormatMap['created_timestamp'] = (value) => { if (value === null) return ''; - const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); - return date; + return format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); }; fieldFormatMap['updated_timestamp'] = (value) => { if (value === null) return ''; - const date = format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); - return date; + return format(parseISO(value), 'yyyy-MM-dd HH:mm:ss'); }; fieldFormatMap['date_entered'] = (value) => { if (value === null) return ''; - const date = format(parseISO(value), 'yyyy-MM-dd'); - return date; + return format(parseISO(value), 'yyyy-MM-dd'); }; fieldFormatMap['date_updated'] = (value) => { if (value === null) return ''; - const date = format(parseISO(value), 'yyyy-MM-dd'); - return date; + return format(parseISO(value), 'yyyy-MM-dd'); }; fieldFormatMap['jurisdictions'] = (value) => { return '"' + value + '"'; @@ -129,23 +123,24 @@ export const mapSitesRowsToCSV = async (response: any, templateName: string) => }; break; } - const rows = response.rows.map((row) => { - return Object.keys(row) - .map((fieldNameRaw: any) => { - try { - const fieldName = fieldNameRaw.trim(); - const formatter = - typeof fieldFormatMap[fieldName] === 'function' ? fieldFormatMap[fieldName] : defaultFormatter; - const unformatted = - typeof row[fieldName] === 'string' ? row[fieldName].replace(/(\r\n|\n|\r)/gm, '') : row[fieldName]; - const formatted = formatter(unformatted); - return formatted; - } catch (e) { - return null; - } - }) - .join(','); - }); - const csv = headers + rows.join('\n'); - return csv; -}; + do { + for (const row of page) { + yield Object.keys(row) + .map((fieldNameRaw: any) => { + try { + const fieldName = fieldNameRaw.trim(); + const formatter = + typeof fieldFormatMap[fieldName] === 'function' ? fieldFormatMap[fieldName] : defaultFormatter; + const unformatted = + typeof row[fieldName] === 'string' ? row[fieldName].replace(/(\r\n|\n|\r)/gm, '') : row[fieldName]; + return formatter(unformatted); + } catch (e) { + return 'ERROR'; + } + }) + .join(','); + yield '\n'; + } + page = await cursor.read(CURSOR_PAGE_SIZE); + } while (page.length > 0); +} diff --git a/api/src/utils/iapp-json-utils.ts b/api/src/utils/iapp-json-utils.ts index 5661a6e3e..96bb12617 100644 --- a/api/src/utils/iapp-json-utils.ts +++ b/api/src/utils/iapp-json-utils.ts @@ -1,7 +1,6 @@ import { speciesRefSql } from '../queries/species_ref'; import { SQL, SQLStatement } from 'sql-template-strings'; import { getDBConnection } from '../database/db'; -import { PointOfInterestSearchCriteria } from '../models/point-of-interest'; import { getIappExtractFromDB, getSitesBasedOnSearchCriteriaSQL } from '../queries/iapp-queries'; import { getLogger } from './logger'; import { densityMap, distributionMap, mapAspect, mapSlope } from './iapp-payload/iapp-function-utils'; @@ -11,7 +10,9 @@ import { chemicalTreatmentJSON, mechanicalTreatmenntsJSON } from './iapp-payload/extracts-json-utils'; -import { mapSitesRowsToCSV } from './iapp-csv-utils'; +import { generateSitesCSV } from './iapp-csv-utils'; +import Cursor from 'pg-cursor'; +import { getActivitiesSQL } from '../queries/activity-queries'; const defaultLog = getLogger('point-of-interest'); @@ -354,27 +355,57 @@ const getIAPPjson = (row: any, extract: any, searchCriteria: any) => { } }; -export const getIAPPsites = async (searchCriteria: any) => { - let connection; - try { - connection = await getDBConnection(); - } catch (e) { - throw { - message: 'Error connecting to database', - code: 500, - namespace: 'iapp-json-utils' - }; - } +export async function streamActivitiesResult(searchCriteria: any, res: any) { + const connection = await getDBConnection(); - if (!connection) { + const sqlStatement: SQLStatement = getActivitiesSQL(searchCriteria, false, true); + + if (!sqlStatement) { throw { - code: 503, - message: 'Failed to establish database connection', + code: 400, + message: 'Failed to build SQL statement', namespace: 'iapp-json-utils' }; } try { + res.contentType('text/csv') + .setHeader('Content-Disposition', 'attachment; filename="export.csv"') + .setHeader('transfer-encoding', 'chunked'); + + + const cursor = await connection.query(new Cursor(sqlStatement.text, sqlStatement.values)); + + const generatedRows = generateSitesCSV(cursor, searchCriteria.CSVType); + for await (const row of generatedRows) { + res.write(row); + } + } finally { + res.end(); + connection.release(); + } +} + +export const streamIAPPResult = async (searchCriteria: any, res: any) => { + let connection; + try { + connection = await getDBConnection(); + } catch (e) { + throw { + message: 'Error connecting to database', + code: 500, + namespace: 'iapp-json-utils' + }; + } + + if (!connection) { + throw { + code: 503, + message: 'Failed to establish database connection', + namespace: 'iapp-json-utils' + }; + } + const sqlStatement: SQLStatement = getSitesBasedOnSearchCriteriaSQL(searchCriteria); if (!sqlStatement) { @@ -385,27 +416,50 @@ export const getIAPPsites = async (searchCriteria: any) => { }; } - const response = await connection.query(sqlStatement.text, sqlStatement.values); + try { + if (searchCriteria.isCSV) { + res.contentType('text/csv') + .setHeader('Content-Disposition', 'attachment; filename="export.csv"') + .setHeader('transfer-encoding', 'chunked'); - if (searchCriteria.isCSV && searchCriteria.isIAPP) { - var returnVal1 = response.rowCount > 0 ? await mapSitesRowsToCSV(response, searchCriteria.CSVType) : []; - return returnVal1; - } else { - var returnVal2 = response.rowCount > 0 ? await mapSitesRowsToJSON(response, searchCriteria) : []; + const cursor = await connection.query(new Cursor(sqlStatement.text, sqlStatement.values)); - return { - rows: returnVal2, - count: returnVal2.length - }; + const generatedRows = generateSitesCSV(cursor, searchCriteria.CSVType); + for await (const row of generatedRows) { + res.write(row); + } + + } else { + try { + const response = await connection.query(sqlStatement.text, sqlStatement.values); + var returnVal2 = response.rowCount > 0 ? await mapSitesRowsToJSON(response, searchCriteria) : []; + + res.setHeader('Content-Type', 'application/json; charset=utf-8'); + + res.write( + JSON.stringify({ + message: 'Got points of interest by search filter criteria', + request: searchCriteria, + result: { + rows: returnVal2 + }, + count: returnVal2.length, + namespace: 'points-of-interest', + code: 200 + }) + ); + } catch (error) { + defaultLog.debug({ label: 'getIAPPjson', message: 'error', error }); + throw { + code: 500, + message: 'Failed to get IAPP sites', + namespace: 'iapp-json-utils' + }; + } + } + } finally { + res.end(); + connection.release(); } - } catch (error) { - defaultLog.debug({ label: 'getIAPPjson', message: 'error', error }); - throw { - code: 500, - message: 'Failed to get IAPP sites', - namespace: 'iapp-json-utils' - }; - } finally { - connection.release(); } -}; +;