Skip to content

Commit

Permalink
Kill the children
Browse files Browse the repository at this point in the history
  • Loading branch information
Psy-Fer committed Mar 24, 2021
1 parent 0af0f3d commit b3e551d
Showing 1 changed file with 52 additions and 2 deletions.
54 changes: 52 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#from src.job import Job
import src.queue as q
import os
import signal
import base64
from celery import Celery
import subprocess
Expand All @@ -18,7 +19,8 @@
import gzip
import glob
import argparse

import redis
import traceback

class MyParser(argparse.ArgumentParser):
def error(self, message):
Expand Down Expand Up @@ -51,6 +53,10 @@ def error(self, message):
#Create a System object with a queue of length maximum_queue_size
qSys = System(max_queue_size)

if fnmatch.fnmatch(sys.argv[0], "*celery"):
worker_port = int(sys.argv[5].split(":")[2].split("/")[0])
red = redis.StrictRedis(host='localhost', port=worker_port, db=0)

#Global variable for base filepath
#initialised as /user/data
config_file = os.path.dirname(os.path.realpath(__file__))+'/config.init'
Expand Down Expand Up @@ -125,14 +131,27 @@ def check_override(output_folder, override_data):
@celery.task(bind=True)
def executeJob(self, job_name, gather_cmd, demult_cmd, min_cmd):
logger.info("In celery task, executing job...")
logger.info("executing job_name: {}".format(job_name))

# group ID to kill children
# {"job_name": #####}
Anakin = {}

self.update_state(state='PROGRESS', meta={'current':10, 'status':'Beginning execution'})

commands = [gather_cmd, demult_cmd, min_cmd]
for i, cmd in enumerate(commands):
po = subprocess.Popen(cmd, shell=True,
po = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

Anakin[job_name] = po.pid
rval = json.dumps(Anakin)
red.set(str(job_name), rval)
# k = ["{}: {}".format(key, Anakin[key]) for key in Anakin.keys()]
# sys.stderr.write(",".join(k))
# sys.stderr.write("\n")

stdout, stderr = po.communicate()
#self.update_state(state='PROGRESS')
po.wait()
Expand All @@ -156,6 +175,34 @@ def executeJob(self, job_name, gather_cmd, demult_cmd, min_cmd):
self.update_state(state='FINISHED', meta={'current': 100, 'status': 'Finishing', 'result': returnCode}) #Don't know if this is actually used
return {'current': 100, 'total': 100, 'status': 'Task completed!', 'result': returnCode}

@celery.task(bind=True)
def killJob(self, job_name):
logger.info("In celery task, executing job...")
logger.info("killing job_name: {}".format(job_name))
pidss = red.get(str(job_name))
Anakin = json.loads(pidss)
if not Anakin:
sys.stderr.write("ANAKIN EMPTY!!!\n")
else:
# k = ["{}: {}".format(key, Anakin[key]) for key in Anakin.keys()]
# sys.stderr.write(",".join(k))
try:
# k = ["{}: {}".format(key, Anakin[key]) for key in Anakin.keys()]
# sys.stderr.write(",".join(k))
group_pid = Anakin[job_name]
sys.stderr.write("killing PID: {}\n".format(group_pid))
os.killpg(group_pid, signal.SIGTERM)
except:
traceback.print_exc()
sys.stderr.write("killJob FAILED - 1")
sys.stderr.write("\n")
return 1
sys.stderr.write("killJob SUCCESS - 0")
sys.stderr.write("\n")
return 0
sys.stderr.write("killJob FAILED (ANAKIN EMPTY) - 1")
sys.stderr.write("\n")
return 1

@app.route('/task/<job_name>', methods = ['POST'])
def task(job_name):
Expand Down Expand Up @@ -548,6 +595,7 @@ def error(job_name):
barcode_type = job.barcode_type
# abort existing job
task = job.task_id
blank = killJob.apply_async(args=[job_name])
celery.control.revoke(task, terminate=True, signal='SIGKILL')
qSys.removeQueuedJob(job_name)

Expand Down Expand Up @@ -737,6 +785,7 @@ def progress(job_name):
def abort(job_name):
job = qSys.getJobByName(job_name)
task = job.task_id
blank = killJob.apply_async(args=[job_name])
celery.control.revoke(task,terminate=True, signal='SIGKILL')

qSys.removeQueuedJob(job_name)
Expand All @@ -746,6 +795,7 @@ def abort(job_name):
def abort_delete(job_name):
job = qSys.getJobByName(job_name)
task = job.task_id
blank = killJob.apply_async(args=[job_name])
celery.control.revoke(task,terminate=True, signal='SIGKILL')
os.system('rm -r ' + job.output_folder)

Expand Down

0 comments on commit b3e551d

Please sign in to comment.