Skip to content

Commit

Permalink
Merge pull request #44 from VEuPathDB/issue-33
Browse files Browse the repository at this point in the history
Allow for other campuses to claim expired jobs.
  • Loading branch information
Foxcapades authored Apr 28, 2023
2 parents 713776a + d0f0ced commit af3a71e
Showing 1 changed file with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.veupathdb.lib.compute.platform.intern.s3.S3
import org.veupathdb.lib.compute.platform.intern.ws.ScratchSpaces
import org.veupathdb.lib.compute.platform.job.AsyncJob
import org.veupathdb.lib.compute.platform.job.JobFileReference
import org.veupathdb.lib.compute.platform.job.JobStatus
import org.veupathdb.lib.compute.platform.job.JobSubmission
import org.veupathdb.lib.compute.platform.model.JobReference
import org.veupathdb.lib.hash_id.HashID
Expand Down Expand Up @@ -130,8 +131,8 @@ object AsyncPlatform {
* @throws IllegalArgumentException If the given [queue] value is not a valid
* queue ID/name.
*
* @throws IllegalStateException If the ID of the given job already exists and
* belongs to another instance of this service.
* @throws IllegalStateException If the ID of the given job already exists,
* belongs to another instance of this service, and is not expired.
*/
@JvmStatic
fun submitJob(queue: String, job: JobSubmission) {
Expand All @@ -142,18 +143,18 @@ object AsyncPlatform {
throw IllegalArgumentException("Attempted to submit a job to nonexistent queue '$queue'.")

// Lookup the job to see if it already exists.
val exists = getJob(job.jobID)
val existingJob = getJob(job.jobID)
// If it does exist
?.also {
// And it is not owned by this service instance
if (!it.owned)
// And it is not owned by this service instance AND the job is not
// expired, bail here.
if (!it.owned && it.status != JobStatus.Expired)
// Throw an exception
throw IllegalStateException("Attempted to submit a job that would overwrite an existing job owned by another campus (${job.jobID})")
throw IllegalStateException("Attempted to submit a job that would overwrite an existing, non-expired job owned by another campus (${job.jobID})")
}
.let { it != null }

// If the job already exists
if (exists)
if (existingJob != null && existingJob.owned)
// Reset the job status to queued and update the queue name
QueueDB.markJobAsQueued(job.jobID, queue)
// Else, if the job does not already exist
Expand Down Expand Up @@ -188,6 +189,21 @@ object AsyncPlatform {
QueueDB.getJob(jobID)?.also {
// It does...
Log.debug("Job found in the managed database")

val s3Job = S3.getJob(jobID)

// If the status as determined by looking at S3 does not align with the
// status we last knew in our internal database, then another campus has
// claimed ownership of the job.
if (s3Job != null && s3Job.status != it.status) {
// Delete our DB record for the job and return the S3 instance instead.
QueueDB.deleteJob(jobID)
return s3Job
}

// The statuses did align, so we (this service instance) presumably still
// own the job.

// update it's last accessed date
QueueDB.updateJobLastAccessed(jobID)
// and return it
Expand Down

0 comments on commit af3a71e

Please sign in to comment.