terminate jobs after completion (#33)
* terminate jobs after completion

* fix

* fix logging

* imagePullSecrets

* image pull policy

* add filtering pod

* fix imagePullSecrets

* fix imagePullSecrets

* fix imagePullSecrets

* fix imagePullSecrets

* add expiry

* update expiry in Readme

* fix storage env

* Issue-#44: Update IPFS related urls (#45)

* Update operator.yml

Co-authored-by: Akshay <[email protected]>
alexcos20 and akshay-ap authored Jan 17, 2022
1 parent 7fb8aef commit 177ca71
Showing 6 changed files with 220 additions and 87 deletions.
28 changes: 21 additions & 7 deletions README.md
@@ -94,7 +94,6 @@ The following resources need attention:
| `IPFS_TYPE` | IPFS library to use. 'CLUSTER' to use ipfs-cluster, 'CLIENT' to use ipfs-client (default) |
| `IPFS_OUTPUT`, `IPFS_ADMINLOGS` | IPFS gateway to upload the output data (algorithm logs & algorithm output) and admin logs (logs from pod-configure & pod-publish)|
| `IPFS_OUTPUT_PREFIX`, `IPFS_ADMINLOGS_PREFIX` | Prefix used for the results files (see below) |
| `IPFS_EXPIRY_TIME` | Default expiry time for ipfs (see https://github.com/ipfs/ipfs-cluster/blob/dbca14e83295158558234e867477ce07a523b81b/CHANGELOG.md#rest-api-2_), with an expected value in Go's time format, e.g. 12h (optional) |
| `IPFS_API_KEY`, `IPFS_API_CLIENT` | IPFS API Key and Client ID for authentication purposes (optional) |
| `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION` | S3 credentials for the logs and output buckets. |
| `AWS_BUCKET_OUTPUT` | Bucket that will hold the output data (algorithm logs & algorithm output). |
@@ -104,6 +103,9 @@ The following resources need attention:
| `NOTIFY_STOP_URL` | URL to call when a new job ends. |
| `SERVICE_ACCOUNT` | K8 service account to run pods (same as the one used in deployment). Defaults to db-operator|
| `NODE_SELECTOR` | K8 node selector (if defined) |
| `PULL_SECRET` | imagePullSecrets name (if defined) (see https://kubernetes.io/docs/concepts/containers/images/#referring-to-an-imagepullsecrets-on-a-pod)|
| `PULL_POLICY` | imagePullPolicy (if defined) (see https://kubernetes.io/docs/concepts/configuration/overview/#container-images)|
| `FILTERING_CONTAINER` | Image to run as the filtering pod (if defined)|
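
As a rough illustration of where `PULL_SECRET` and `PULL_POLICY` could end up, the sketch below patches a plain dictionary shaped like a Kubernetes pod spec. The helper name `apply_pull_settings`, the secret name `regcred`, and the example image are hypothetical; only the field names `imagePullSecrets` and `imagePullPolicy` come from the Kubernetes API, and the operator's actual code may do this differently.

```python
import copy


def apply_pull_settings(pod_spec, pull_secret=None, pull_policy=None):
    """Return a copy of pod_spec with image pull settings applied.

    Hypothetical helper for illustration, not taken from the operator source.
    """
    spec = copy.deepcopy(pod_spec)
    if pull_secret:
        # imagePullSecrets is a pod-level list of {"name": <secret name>}.
        spec["imagePullSecrets"] = [{"name": pull_secret}]
    if pull_policy:
        # imagePullPolicy is set on each container.
        for container in spec.get("containers", []):
            container["imagePullPolicy"] = pull_policy
    return spec


example = apply_pull_settings(
    {"containers": [{"name": "algorithm", "image": "python:3.9"}]},
    pull_secret="regcred",
    pull_policy="IfNotPresent",
)
print(example)
```

When neither value is set, the spec is returned unchanged, which matches the "(if defined)" wording in the table.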



@@ -113,12 +115,16 @@ The following resources need attention:

## Usage of IPFS_OUTPUT and IPFS_OUTPUT_PREFIX (IPFS_ADMINLOGS/IPFS_ADMINLOGS_PREFIX)
This will allow you to have the following scenarios:
1. IPFS_OUTPUT=ipfs.oceanprotocol.com:5001 , IPFS_OUTPUT_PREFIX=ipfs.oceanprotocol.com:8080/ipfs/

Port 5001 will be used to call addFile, but the result will look like "ipfs.oceanprotocol.com:8080/ipfs/HASH"
2. IPFS_OUTPUT=ipfs.oceanprotocol.com:5001 , IPFS_OUTPUT_PREFIX=ipfs://

Port 5001 will be used to call addFile, but the result will look like "ipfs://HASH" (this hides your IPFS deployment)
1. - `IPFS_OUTPUT`=http://ipfs.oceanprotocol.com:5001
- `IPFS_OUTPUT_PREFIX`=http://ipfs.oceanprotocol.com:8080/ipfs/

Port 5001 will be used to call addFile, but the result will look like `http://ipfs.oceanprotocol.com:8080/ipfs/HASH`

2. - `IPFS_OUTPUT`=http://ipfs.oceanprotocol.com:5001
- `IPFS_OUTPUT_PREFIX`=ipfs://

Port 5001 will be used to call addFile, but the result will look like `ipfs://HASH` (this hides your IPFS deployment)

3. IPFS_EXPIRY_TIME = the default expiry time. "0" = unlimited
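
The scenarios above can be sketched in a few lines, assuming the prefix is simply concatenated with the hash returned by the upload (which is what the examples suggest, but is not confirmed here). The function name `build_result_url` is hypothetical and `HASH` stands in for a real content hash.

```python
def build_result_url(prefix, content_hash):
    """Join an output prefix and a content hash into the published URL."""
    return f"{prefix}{content_hash}"


# Scenario 1: expose results through an HTTP gateway.
print(build_result_url("http://ipfs.oceanprotocol.com:8080/ipfs/", "HASH"))
# Scenario 2: publish a gateway-independent ipfs:// URI.
print(build_result_url("ipfs://", "HASH"))
```

Under this assumption the upload endpoint (`IPFS_OUTPUT`) never appears in the result, so it can stay private while the prefix decides what consumers see.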

## Usage of NOTIFY_START_URL and NOTIFY_STOP_URL
@@ -128,6 +134,9 @@ The following resources need attention:
- secret: Secret value (exported to algo pod as secret env)
- DID: Array of input DIDs

## Storage Expiry
Op-engine passes an ENV variable called STORAGE_EXPIRY to pod-publishing (the variable is defined in op-service and passed through from there).

## Usage of NODE_SELECTOR
If defined, all pods are going to contain the following selectors in the specs:
```
@@ -188,6 +197,11 @@ reclaimPolicy: Retain
For more information, please visit https://kubernetes.io/docs/concepts/storage/storage-classes/
## Usage of FILTERING_CONTAINER
After an algorithm job is done, you can run your own custom image to analyze the output folder.
That image could, for example, detect data leaks and overwrite the output folder if needed.
The value uses the usual Docker image notation.
## Customizing job templates
10 changes: 5 additions & 5 deletions kubernetes/operator.yml
@@ -38,11 +38,11 @@ spec:
- name: IPFS_OUTPUT
value: http://youripfsserver:5001
- name: IPFS_OUTPUT_PREFIX
value: http://youripfsserver:5001/ipfs/
value: http://youripfsserver:8080/ipfs/
- name: IPFS_ADMINLOGS
value: http://youradminipfsserver:5001
- name: IPFS_ADMINLOGS_PREFIX
value: http://youradminipfsserver:5001/ipfs/
value: http://youradminipfsserver:8080/ipfs/
- name: IPFS_EXPIRY_TIME
value: "3600"
- name: IPFS_API_KEY
@@ -54,13 +54,13 @@ spec:
- name: LOG_LEVEL
value: DEBUG
- name: POD_CONFIGURATION_CONTAINER
value: oceanprotocol/pod-configuration:v1.0.1
value: oceanprotocol/pod-configuration:v1.0.10
- name: NOTIFY_START_URL
value: http://yourserver/
- name: NOTIFY_STOP_URL
value: http://yourserver/
- name: POD_PUBLISH_CONTAINER
value: oceanprotocol/pod-publishing:v1.0.0
value: oceanprotocol/pod-publishing:v1.0.1
- name: POSTGRES_DB
valueFrom:
configMapKeyRef:
@@ -86,7 +86,7 @@ spec:
configMapKeyRef:
key: POSTGRES_PORT
name: postgres-config
image: oceanprotocol/operator-engine:v1.0.1
image: oceanprotocol/operator-engine:v1.0.2
imagePullPolicy: Always
name: ocean-compute-operator
resources: {}
4 changes: 4 additions & 0 deletions operator_engine/constants.py
@@ -90,6 +90,10 @@ class OperatorConfig:
OPERATOR_PRIVATE_KEY = getenv('OPERATOR_PRIVATE_KEY',None)
NODE_SELECTOR = getenv('NODE_SELECTOR',None)
SERVICE_ACCOUNT = getenv('SERVICE_ACCOUNT','db-operator')
NODE_SELECTOR = getenv('NODE_SELECTOR',None)
PULL_SECRET = getenv('PULL_SECRET',None)
PULL_POLICY = getenv('PULL_POLICY','Always')
FILTERING_CONTAINER = getenv('FILTERING_CONTAINER',None)

class VolumeConfig:
VOLUME_SIZE = getenv('VOLUME_SIZE', '2Gi')
83 changes: 61 additions & 22 deletions operator_engine/operator_main.py
@@ -12,50 +12,60 @@

from resources import create_configmap_workflow, notify_start, notify_stop

logging.basicConfig(format='%(asctime)s %(message)s')
logger = logging.getLogger('ocean-operator')
#logger.setLevel(OperatorConfig.LOG_LEVEL)
logger.setLevel(logging.DEBUG)

kubernetes.config.load_incluster_config()
#current_namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
#start the sql thread


def handle_new_job(jobId,logger):
api = kubernetes.client.BatchV1Api()
sql_body=get_sql_job_workflow(jobId,logger)
if sql_body is None:
logging.error(f'Sql workflow is empty for {jobId}')
logger.error(f'Sql workflow is empty for {jobId}')
return
body = json.loads(sql_body)
if not isinstance(body,dict):
logging.error(f'Error loading dict workflow for {jobId}')
logger.error(f'Error loading dict workflow for {jobId}')
return

namespace = body['metadata']['namespace']
#check if we already have a jobid
sqlstatus=get_sql_job_status(body['metadata']['name'],logging)
sqlstatus=get_sql_job_status(body['metadata']['name'],logger)
if sqlstatus>10:
logging.error(f"Creating workflow failed, already in db!!!")
logger.error(f"Creating workflow failed, already in db!!!")
return {'message': "Creating workflow failed, already in db"}

notify_start(body, logger)
update_sql_job_status(body['metadata']['name'],20,logger)
# Configmap for workflow
logging.debug(f"Job: {jobId} Creating config map")
logger.info(f"Job: {jobId} Creating config map")
create_configmap_workflow(body, logger)

# Volume
logging.debug(f"Job: {jobId} Creating volumes")
logger.info(f"Job: {jobId} Creating volumes")
create_all_pvc(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources'])


# Configure pod
logging.error(f"Job: {jobId} Start conf pod")
logger.info(f"Job: {jobId} Start conf pod")
create_configure_job(body, logger)
# Wait configure pod to finish
while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-configure-job",logger):
logging.error(f"Job: {jobId} Waiting for configure pod to finish")
while not wait_finish_job(namespace, f"{body['metadata']['name']}-configure-job",logger):
logger.debug(f"Job: {jobId} Waiting for configure pod to finish")
time.sleep(5.0)
#we should check for a timeout

# Terminate configure job
if OperatorConfig.DEBUG_NO_CLEANUP is None:
try:
name=body['metadata']['name']+"-configure-job"
logger.info(f"Removing job {name}")
api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove configure job\n")
sqlstatus=get_sql_job_status(body['metadata']['name'],logger)
# Run the algo if status == 30, else configure failed..
if sqlstatus==30:
@@ -64,46 +74,75 @@ def handle_new_job(jobId,logger):
create_algorithm_job(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources'])
starttime=int(time.time())
# Wait for the algorithm pod to finish
while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-algorithm-job",logger):
while not wait_finish_job(namespace, f"{body['metadata']['name']}-algorithm-job",logger):
duration=int(time.time())-starttime
shouldstop=False
logging.debug(f"Job: {jobId} Waiting for algorithm pod to finish, {duration} seconds of running so far")
logger.debug(f"Job: {jobId} Waiting for algorithm pod to finish, {duration} seconds of running so far")
#Check if algo is taking too long
if 'maxtime' in body['spec']['metadata']['stages'][0]['compute']:
if isinstance(body['spec']['metadata']['stages'][0]['compute']['maxtime'], int):
if duration>body['spec']['metadata']['stages'][0]['compute']['maxtime']:
logging.info("Algo is taking too long. Kill IT!")
logger.info("Algo is taking too long. Kill IT!")
shouldstop=True
update_sql_job_istimeout(body['metadata']['name'],logger)
#Check if stop was requested
if check_sql_stop_requested(body['metadata']['name'],logger) is True:
logging.info(f"Job: {jobId} Algo has a stop request. Kill IT!")
logger.info(f"Job: {jobId} Algo has a stop request. Kill IT!")
shouldstop=True
#Stop it if needed
if shouldstop is True:
stop_specific_job(body['metadata']['namespace'],body['metadata']['name']+"-algorithm-job",logger)
stop_specific_job(namespace,body['metadata']['name']+"-algorithm-job",logger)
break
time.sleep(5.0)
else:
logging.info(f"Job: {jobId} Configure failed, algo was skipped")
logger.info(f"Job: {jobId} Configure failed, algo was skipped")
# Terminate algorithm job
if OperatorConfig.DEBUG_NO_CLEANUP is None:
try:
name=body['metadata']['name']+"-algorithm-job"
logger.info(f"Removing job {name}")
api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove algorithm job\n")

# Filtering pod
if OperatorConfig.FILTERING_CONTAINER:
update_sql_job_status(body['metadata']['name'],50,logger)
create_filter_job(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources'])
while not wait_finish_job(namespace, f"{body['metadata']['name']}-filter-job",logger):
logger.debug(f"Job: {jobId} Waiting for filtering pod to finish")
time.sleep(5.0)
if OperatorConfig.DEBUG_NO_CLEANUP is None:
try:
name=body['metadata']['name']+"-filter-job"
logger.info(f"Removing job {name}")
api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove filter job\n")
# Publish job
# Update status only if algo was run
if sqlstatus==30:
update_sql_job_status(body['metadata']['name'],60,logger)
create_publish_job(body, logger)
# Wait for the publish pod to finish
while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-publish-job",logger):
logging.error(f"Job: {jobId} Waiting for publish pod to finish")
while not wait_finish_job(namespace, f"{body['metadata']['name']}-publish-job",logger):
logger.debug(f"Job: {jobId} Waiting for publish pod to finish")
time.sleep(5.0)
#we should check for a timeout

# Terminate publish job
if OperatorConfig.DEBUG_NO_CLEANUP is None:
try:
name=body['metadata']['name']+"-publish-job"
logger.info(f"Removing job {name}")
api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove publish job\n")

if sqlstatus==30:
update_sql_job_status(body['metadata']['name'],70,logger)
update_sql_job_datefinished(body['metadata']['name'],logger)
logging.info(f"Job: {jobId} Finished")
cleanup_job(body['metadata']['namespace'], jobId, logger)
logger.info(f"Job: {jobId} Finished")
cleanup_job(namespace, jobId, logger)
notify_stop(body, logger)
return {'message': "Creating workflow finished"}
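
The four "Terminate ... job" blocks in this change repeat the same delete call, so they could be folded into one helper. The sketch below shows one way to do that; `remove_job` and `FakeBatchApi` are hypothetical names, the namespace and workflow name are made up, and the fake API only stands in for `kubernetes.client.BatchV1Api` so the example runs without a cluster. The diff catches `ApiException`; this sketch catches `Exception` only to stay self-contained.

```python
import logging


def remove_job(api, namespace, name, logger):
    """Delete a finished job; log and continue if the deletion fails.

    `api` is expected to behave like kubernetes.client.BatchV1Api.
    Returns True when the delete call succeeded, False otherwise.
    """
    logger.info(f"Removing job {name}")
    try:
        api.delete_namespaced_job(
            namespace=namespace,
            name=name,
            propagation_policy="Foreground",
            grace_period_seconds=1,
        )
        return True
    except Exception as e:  # assumption: real code would catch ApiException
        logger.warning(f"Failed to remove job {name}: {e}")
        return False


class FakeBatchApi:
    """Stand-in for BatchV1Api that records delete requests."""

    def __init__(self):
        self.deleted = []

    def delete_namespaced_job(self, namespace, name, propagation_policy,
                              grace_period_seconds):
        self.deleted.append((namespace, name, propagation_policy))


fake_api = FakeBatchApi()
sketch_logger = logging.getLogger("cleanup-sketch")
for suffix in ("configure-job", "algorithm-job", "publish-job"):
    remove_job(fake_api, "ocean-compute", f"my-workflow-{suffix}", sketch_logger)
print(fake_api.deleted)
```

Building the job name inside the log message also avoids the copy-paste risk of each block carrying its own hard-coded warning text.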
