Skip to content

Commit

Permalink
bulkCreateTelemetryInBatches
Browse files Browse the repository at this point in the history
  • Loading branch information
MacQSL committed Dec 19, 2024
1 parent 2dcaa78 commit 878088f
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions api/src/services/telemetry-services/telemetry-vendor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import { ApiPaginationOptions } from '../../zod-schema/pagination';
import { DBService } from '../db-service';
import { TelemetryDeploymentService } from './telemetry-deployment-service';

const TELEMETRY_BATCH_SIZE = 500;

/**
* A service class for working with telemetry vendor data.
*
Expand Down Expand Up @@ -241,29 +239,42 @@ export class TelemetryVendorService extends DBService {

/**
* Bulk create a telemetry in batches.
*
* Note: This is to prevent the SQL cap error when inserting a large number of telemetry records. > 700
* Note: This is to prevent SQL maximum query size error.
*
* @async
* @param {number} surveyId - The survey ID
* @param {CreateManualTelemetry[]} telemetry - The telemetry to create
* @returns {*} {Promise<void>}
*/
async bulkCreateTelemetryInBatches(surveyId: number, telemetry: CreateManualTelemetry[]): Promise<void> {
const batchSize = 500; // Max telemetry records to insert in a single query
const concurrent = 10; // Max concurrent queries

const deploymentIds = [...new Set(telemetry.map((record) => record.deployment_id))];
const deployments = await this.deploymentService.getDeploymentsForSurvey(surveyId, deploymentIds);

if (deployments.length !== deploymentIds.length) {
throw new ApiGeneralError('Failed to bulk create manual telemetry', [
'TelemetryVendorService->bulkCreateManualTelemetryInBatches',
'survey missing reference to one or more deployment IDs'
]);
}

// Split the teletry into batches to prevent SQL cap error
const telemetryBatches = chunk(telemetry, TELEMETRY_BATCH_SIZE);
const telemetryBatches = chunk(telemetry, batchSize);

// Create the async task processor
const telemetryProcessor = async (telemetryBatch: CreateManualTelemetry[]): Promise<void> => {
return this.bulkCreateManualTelemetry(surveyId, telemetryBatch);
};

// Process the telemetry in batches
const queueResult = await taskQueue(telemetryBatches, telemetryProcessor, 10);
const queueResult = await taskQueue(telemetryBatches, telemetryProcessor, concurrent);

// Check for any errors in the batch processing
const batchErrors = queueResult.filter((result) => result.error);
if (batchErrors.length) {
throw new ApiGeneralError('Failed to batch import telemetry.', [batchErrors.map((task) => task.error)]);
throw new ApiGeneralError('Failed to bulk create manual telemetry', [batchErrors.map((task) => task.error)]);
}
}
}

0 comments on commit 878088f

Please sign in to comment.