diff --git a/CHANGELOG.md b/CHANGELOG.md index b89ef854..d4013b42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,7 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed -- refactor: instrumentations +- refactor: instrumentation - `is_worker_enabled` status check moved from `VerificationFailed` to `Failed` - refactor: static attributes for telemetry - refactor: aws setup for Event Bridge diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 2c791a27..181d64f4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -347,11 +347,12 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { let verification_status = job_handler.verify_job(config.clone(), &mut job).await?; tracing::Span::current().record("verification_status", format!("{:?}", &verification_status)); - let attributes = [ + let mut attributes = vec![ KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), KeyValue::new("operation_type", "verify_job"), KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)), ]; + let mut operation_job_status: Option = None; match verification_status { JobVerificationStatus::Verified => { @@ -380,12 +381,13 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed"); JobError::Other(OtherError(e)) })?; + operation_job_status = Some(JobStatus::Completed); } JobVerificationStatus::Rejected(e) => { tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected"); - let mut new_job = job.clone(); - new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e); - new_job.status = JobStatus::VerificationFailed; + let mut new_job_metadata = job.metadata.clone(); + new_job_metadata.insert(JOB_METADATA_ERROR.to_string(), e); + operation_job_status = Some(JobStatus::VerificationFailed); let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; @@ -402,7 +404,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { &job, JobItemUpdates::new() .update_status(JobStatus::VerificationFailed) - .update_metadata(new_job.metadata) + .update_metadata(new_job_metadata) .build(), ) .await @@ -437,32 +439,39 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationTimeout"); JobError::Other(OtherError(e)) })?; - return Ok(()); - } - let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; + operation_job_status = Some(JobStatus::VerificationTimeout); + } else { + let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; + + config + .database() + .update_job(&job, JobItemUpdates::new().update_metadata(metadata).build()) + .await + .map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata"); + JobError::Other(OtherError(e)) + })?; - config.database().update_job(&job, JobItemUpdates::new().update_metadata(metadata).build()).await.map_err( - |e| { - tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata"); + tracing::debug!(job_id = ?id, "Adding job back to verification queue"); + add_job_to_verification_queue( + job.id, + &job.job_type, + Duration::from_secs(job_handler.verification_polling_delay_seconds()), + config.clone(), + ) + .await + .map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue"); JobError::Other(OtherError(e)) - }, - )?; - - tracing::debug!(job_id = ?id, "Adding job back to verification queue"); - add_job_to_verification_queue( - job.id, - &job.job_type, - Duration::from_secs(job_handler.verification_polling_delay_seconds()), - config.clone(), - ) - .await - .map_err(|e| { - tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue"); - JobError::Other(OtherError(e)) - })?; + })?; + } } }; + if let Some(job_status) = operation_job_status { + attributes.push(KeyValue::new("operation_job_status", format!("{}", job_status))); + } + tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);