From 0d7ec580ac84124b69f320f4fe73c43921fece85 Mon Sep 17 00:00:00 2001
From: Yihong Wang
Date: Tue, 28 May 2024 13:20:55 -0700
Subject: [PATCH] Use UBI Python image

Use UBI Python image and have type hints for the Task data struct.

Signed-off-by: Yihong Wang
---
 docker/Dockerfile | 14 ++++++--------
 server/app.py     | 48 +++++++++++++++++++++++++++++++----------------
 2 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 373d109..93d7c80 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,22 +1,20 @@
 # Dockerfile
-FROM python:3.11-slim
+FROM registry.access.redhat.com/ubi9/python-311@sha256:fccda5088dd13d2a3f2659e4c904beb42fc164a0c909e765f01af31c58affae3
 
 ARG port=8080
 
-RUN apt-get update && \
-    apt-get install -y --no-install-recommends git curl gcc python3-dev && \
-    apt-get clean
-
-RUN useradd -m myuser
+USER root
+RUN useradd -m myuser -G 0 && chmod 755 /home/myuser
 COPY server /home/myuser/server
 RUN chown -R myuser:myuser /home/myuser/server && chmod a+rx /home/myuser/server
+RUN sed -i.bak 's/include-system-site-packages = false/include-system-site-packages = true/' /opt/app-root/pyvenv.cfg
 
 USER myuser
 WORKDIR /home/myuser
 RUN mkdir /home/myuser/hf_home && chmod og+rwx /home/myuser/hf_home
 RUN mkdir /home/myuser/output && chmod og+rwx /home/myuser/output
 RUN mkdir /home/myuser/.cache
-ENV PATH="/home/myuser/.local/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+ENV PATH="/opt/app-root/bin:/opt/app-root/src/.local/bin/:/opt/app-root/src/bin:/home/myuser/.local/bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
 
 RUN pip install --no-cache-dir --user --upgrade ibm-generative-ai[lm-eval]
 RUN pip install --no-cache-dir --user -r server/requirements.txt
@@ -28,7 +26,7 @@ RUN git clone https://github.com/EleutherAI/lm-evaluation-harness.git && \
     git apply /home/myuser/server/patch/models.patch && pip install --no-cache-dir --user -e .[unitxt]
 
 ENV FLASK_PORT=8080
-ENV PYTHONPATH=/home/myuser/.local/lib/python3.11/site-packages:/home/myuser/lm-evaluation-harness:/home/myuser:/home/myuser/server
+ENV PYTHONPATH=/opt/app-root/src/.local/lib/python3.11/site-packages:/home/myuser/lm-evaluation-harness:/home/myuser:/home/myuser/server
 ENV HF_HOME=/home/myuser/hf_home
 
 EXPOSE 8080
diff --git a/server/app.py b/server/app.py
index aafb6ae..c013c18 100644
--- a/server/app.py
+++ b/server/app.py
@@ -5,6 +5,7 @@
 import json
 import os
 import fnmatch
+from typing import TypedDict, Dict, List, Literal
 
 from flask import Flask, jsonify, request
 from flask_cors import CORS
@@ -14,12 +15,21 @@
 _CANCEL_KEY = 'cancel'
 _STATUS_KEY = 'status'
 _TASK_ID_KEY = 'task_id'
-_STATUS_ERROR = 'error'
+_ARGS_KEY = 'args'
+_ERROR_KEY = _STATUS_ERROR = 'error'
 _STATUS_RUNNING = 'running'
 _STATUS_COMPLETE = 'complete'
 _STATUS_CANCEALLED = 'cancelled'
 
 
+class _Task(TypedDict):
+    """ The data struct for the Task object """
+    status: Literal['running', 'error', 'cancelled', 'complete']
+    args: List[str]
+    cancel: bool
+    error: str
+
+
 def create_app(test_config=None):
     """Create the Flask application."""
     app = Flask(__name__, static_url_path='/doc')
@@ -32,11 +42,14 @@ def create_app(test_config=None):
 
     # A dictionary to store job status
     # This will be moved to the CRD or storage
-    _jobs = {}
+    _jobs: Dict[str, _Task] = {}
 
     @app.route('/submit_job', methods=['POST'])
     def submit_job():
         """ Submit a job and spawn a subprocess to run the job """
+        if request.json is None:
+            return jsonify({_ERROR_KEY: 'The body is not application/json'}), 415
+
         task_id = str(uuid.uuid4())
         args = [
             ('--model', request.json.get('model')),
@@ -67,11 +80,15 @@ def submit_job():
                         '--log_samples', '--trust_remote_code', '--show_config']
 
         filtered_args = [(arg, value) for arg, value in args if value is not None]
-        flat_and_filtered_args = [item if sublist[0] not in novalue_args else sublist[0]
-                                  for sublist in filtered_args for item in sublist]
+        flat_and_filtered_args: List[str] = [item if sublist[0] not in novalue_args else sublist[0]
+                                             for sublist in filtered_args for item in sublist]
+
+        _jobs[task_id] = {
+            _STATUS_KEY: _STATUS_RUNNING,
+            _ARGS_KEY: flat_and_filtered_args,
+            _CANCEL_KEY: False,
+            _ERROR_KEY: ''}
 
-        _jobs[task_id] = {_STATUS_KEY: _STATUS_RUNNING,
-                          'args': flat_and_filtered_args}
         threading.Thread(target=_background_task, args=(task_id,)).start()
 
         return jsonify({_TASK_ID_KEY: task_id})
@@ -88,9 +105,11 @@ def poll_job():
     def job_results():
         """ Get the results of a job. Return the results as a JSON object """
         task_id = request.args.get(_TASK_ID_KEY)
+        if task_id not in _jobs:
+            return jsonify({_ERROR_KEY: 'The specified job does not exist'}), 404
 
         def _handle_error():
-            return jsonify({_STATUS_ERROR: _jobs[task_id][_STATUS_ERROR]})
+            return jsonify({_ERROR_KEY: _jobs[task_id][_ERROR_KEY]})
 
         def _handle_running():
             return jsonify({_STATUS_KEY: 'The job is still running'})
@@ -107,7 +126,7 @@ def _handle_complete():
                     result = json.load(f)
                 return jsonify(result)
 
-            return jsonify({_STATUS_ERROR: 'Job completed but no result found'})
+            return jsonify({_ERROR_KEY: 'Job completed but no result found'})
 
         handlers = {
             _STATUS_ERROR: _handle_error,
@@ -116,13 +135,10 @@ def _handle_complete():
             _STATUS_COMPLETE: _handle_complete,
         }
 
-        if task_id not in _jobs:
-            return jsonify({_STATUS_ERROR: 'The specified job does not exist'}), 404
-
         if _jobs[task_id][_STATUS_KEY] in handlers:
             return handlers[_jobs[task_id][_STATUS_KEY]]()
 
-        return jsonify({_STATUS_ERROR: f"unknown state: {_jobs[task_id][_STATUS_KEY]}"})
+        return jsonify({_ERROR_KEY: f"unknown state: {_jobs[task_id][_STATUS_KEY]}"})
 
     @app.route('/list_jobs', methods=['GET'])
     def list_jobs():
@@ -139,7 +155,7 @@ def cancel_job():
         task_id = request.args.get(_TASK_ID_KEY)
 
         if task_id not in _jobs:
-            return jsonify({_STATUS_ERROR: 'The specified job does not exist'}), 404
+            return jsonify({_ERROR_KEY: 'The specified job does not exist'}), 404
 
         _jobs[task_id][_CANCEL_KEY] = True
         return jsonify(
@@ -148,7 +164,7 @@ def cancel_job():
         )
 
     def _background_task(task_id):
-        flat_and_filtered_args = _jobs[task_id]['args']
+        flat_and_filtered_args = _jobs[task_id][_ARGS_KEY]
         os.makedirs(f"{_OUTPUT_PATH}/{task_id}")
         cmd = ['python', '-m', 'lm_eval'] + flat_and_filtered_args + \
             ['--output_path', f"{_OUTPUT_PATH}/{task_id}"]
@@ -185,7 +201,7 @@ def _background_task(task_id):
             ) as err_out:
 
                 _jobs[task_id][_STATUS_KEY] = _STATUS_ERROR
-                _jobs[task_id][_STATUS_ERROR] = err_out.read()
+                _jobs[task_id][_ERROR_KEY] = err_out.read()
         else:
             _jobs[task_id][_STATUS_KEY] = _STATUS_COMPLETE
 
@@ -202,4 +218,4 @@ def _find_result(pattern, path):
 if __name__ == '__main__':
     service_app = create_app()
     service_app.run(debug=True, host='0.0.0.0',
-                    port=os.getenv("FLASK_PORT", default="8080"))
+                    port=int(os.getenv("FLASK_PORT", default="8080")))
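
Note (not part of the patch itself): a minimal sketch of how the new _Task TypedDict is meant to constrain entries in the _jobs dictionary, assuming a static type checker such as mypy is run over server/app.py. The task id and the argument values below are made up for illustration; only the keys and the Literal status values come from the patch.

from typing import Dict, List, Literal, TypedDict


class _Task(TypedDict):
    """Shape of one entry in _jobs, mirroring the class added in server/app.py."""
    status: Literal['running', 'error', 'cancelled', 'complete']
    args: List[str]
    cancel: bool
    error: str


_jobs: Dict[str, _Task] = {}

# A well-formed entry, matching how submit_job() now populates _jobs[task_id]
# with the 'status', 'args', 'cancel', and 'error' keys:
_jobs['example-task-id'] = {
    'status': 'running',
    'args': ['--model', 'hf', '--output_path', '/tmp/example'],
    'cancel': False,
    'error': '',
}

# With the Dict[str, _Task] annotation in place, a type checker can reject
# misspelled keys and status values outside the Literal set, e.g.:
#   _jobs['example-task-id']['stauts'] = 'running'   # unknown key
#   _jobs['example-task-id']['status'] = 'done'      # not an allowed Literal value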