Skip to content

Commit

Permalink
update: added operation_job_status (#195)
Browse files Browse the repository at this point in the history
* update: added operation_job_status
  • Loading branch information
heemankv authored Dec 19, 2024
1 parent 9e67649 commit 07c38a3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- refactor: instrumentations
- refactor: instrumentation
- `is_worker_enabled` status check moved from `VerificationFailed` to `Failed`
- refactor: static attributes for telemetry
- refactor: aws setup for Event Bridge
Expand Down
61 changes: 35 additions & 26 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,12 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let verification_status = job_handler.verify_job(config.clone(), &mut job).await?;
tracing::Span::current().record("verification_status", format!("{:?}", &verification_status));

let attributes = [
let mut attributes = vec![
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "verify_job"),
KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)),
];
let mut operation_job_status: Option<JobStatus> = None;

match verification_status {
JobVerificationStatus::Verified => {
Expand Down Expand Up @@ -380,12 +381,13 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed");
JobError::Other(OtherError(e))
})?;
operation_job_status = Some(JobStatus::Completed);
}
JobVerificationStatus::Rejected(e) => {
tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected");
let mut new_job = job.clone();
new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e);
new_job.status = JobStatus::VerificationFailed;
let mut new_job_metadata = job.metadata.clone();
new_job_metadata.insert(JOB_METADATA_ERROR.to_string(), e);
operation_job_status = Some(JobStatus::VerificationFailed);

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
Expand All @@ -402,7 +404,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.update_metadata(new_job_metadata)
.build(),
)
.await
Expand Down Expand Up @@ -437,32 +439,39 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationTimeout");
JobError::Other(OtherError(e))
})?;
return Ok(());
}
let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?;
operation_job_status = Some(JobStatus::VerificationTimeout);
} else {
let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?;

config
.database()
.update_job(&job, JobItemUpdates::new().update_metadata(metadata).build())
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata");
JobError::Other(OtherError(e))
})?;

config.database().update_job(&job, JobItemUpdates::new().update_metadata(metadata).build()).await.map_err(
|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata");
tracing::debug!(job_id = ?id, "Adding job back to verification queue");
add_job_to_verification_queue(
job.id,
&job.job_type,
Duration::from_secs(job_handler.verification_polling_delay_seconds()),
config.clone(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue");
JobError::Other(OtherError(e))
},
)?;

tracing::debug!(job_id = ?id, "Adding job back to verification queue");
add_job_to_verification_queue(
job.id,
&job.job_type,
Duration::from_secs(job_handler.verification_polling_delay_seconds()),
config.clone(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue");
JobError::Other(OtherError(e))
})?;
})?;
}
}
};

if let Some(job_status) = operation_job_status {
attributes.push(KeyValue::new("operation_job_status", format!("{}", job_status)));
}

tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
Expand Down

0 comments on commit 07c38a3

Please sign in to comment.