From 54240970c03bcdd196df55eab944af891a203244 Mon Sep 17 00:00:00 2001
From: Nir David <124874956+nirda7@users.noreply.github.com>
Date: Mon, 9 Dec 2024 15:36:27 +0200
Subject: [PATCH] fix hpu destructors flow and remove finish_measurements
 (#379)

Fix the HPU-related destructor flow, avoiding multiple calls to the same
shutdown function.
Remove the use of the finish_measurements function.
---
 requirements-hpu.txt              |  2 +-
 vllm/engine/llm_engine.py         |  3 ---
 vllm/entrypoints/llm.py           |  4 ----
 vllm/executor/hpu_executor.py     |  8 ++------
 vllm/executor/ray_hpu_executor.py | 18 ++++++++----------
 vllm/worker/hpu_model_runner.py   | 18 ++++--------------
 vllm/worker/hpu_worker.py         |  3 ---
 7 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/requirements-hpu.txt b/requirements-hpu.txt
index 09f8ed73f0d0b..59a97edbc3fbf 100644
--- a/requirements-hpu.txt
+++ b/requirements-hpu.txt
@@ -8,4 +8,4 @@ pandas
 tabulate
 setuptools>=61
 setuptools-scm>=8
-vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@ecdf38e
+vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@48d0303
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 8dd639b456179..7ce610937c1a7 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -1325,9 +1325,6 @@ def _advance_to_next_step(
             else:
                 seq.append_token_id(sample.output_token, sample.logprobs)
 
-    def finish_measurements(self):
-        self.model_executor.finish_measurements()
-
     def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
         """Performs one decoding iteration and returns newly generated results.
 
diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py
index 143ae29e673a3..86b0b6893f1d9 100644
--- a/vllm/entrypoints/llm.py
+++ b/vllm/entrypoints/llm.py
@@ -236,10 +236,6 @@ def set_tokenizer(self, tokenizer: AnyTokenizer) -> None:
         else:
             tokenizer_group.tokenizer = get_cached_tokenizer(tokenizer)
 
-    def finish_measurements(self):
-        assert not envs.VLLM_USE_V1, "INC does not support vLLM V1"
-        self.llm_engine.finish_measurements()  # type: ignore[attr-defined]
-
     @overload  # LEGACY: single (prompt + optional token ids)
     def generate(
         self,
diff --git a/vllm/executor/hpu_executor.py b/vllm/executor/hpu_executor.py
index e82cc10d0e9f0..e6b48178c7c8e 100644
--- a/vllm/executor/hpu_executor.py
+++ b/vllm/executor/hpu_executor.py
@@ -94,9 +94,6 @@ def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks) -> None:
         msg = f"init_cache_engine took {cache_init_m.get_summary_string()}"
         logger.info(msg)
 
-    def finish_measurements(self):
-        self.driver_worker.finish_measurements()
-
     def execute_model(
         self,
         execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
@@ -200,9 +197,8 @@ def start_profile(self) -> None:
     def stop_profile(self) -> None:
         self.driver_worker.stop_profile()
 
-    def shutdown(self) -> None:
-        if hasattr(self.driver_worker, 'shutdown_inc'):
-            self.driver_worker.shutdown_inc()
+    def shutdown_inc(self) -> None:
+        self.driver_worker.shutdown_inc()
 
 
 class HPUExecutorAsync(HPUExecutor, ExecutorAsyncBase):
diff --git a/vllm/executor/ray_hpu_executor.py b/vllm/executor/ray_hpu_executor.py
index 9eb8b89b244af..c1c41313b4e0a 100644
--- a/vllm/executor/ray_hpu_executor.py
+++ b/vllm/executor/ray_hpu_executor.py
@@ -70,9 +70,13 @@ def _init_executor(self) -> None:
         self.output_decoder = msgspec.msgpack.Decoder(
             Optional[List[SamplerOutput]])
 
+        self.terminate_ray = True
+
     def shutdown(self) -> None:
-        for worker in self.workers:
-            worker.__ray_terminate__.remote()
+        if getattr(self, 'terminate_ray', False):
+            for worker in self.workers:
+                worker.__ray_terminate__.remote()
+            self.terminate_ray = False
         if hasattr(self, "forward_dag") and self.forward_dag is not None:
             self.forward_dag.teardown()
             import ray
@@ -80,8 +84,8 @@ def shutdown(self) -> None:
                 ray.kill(worker)
             self.forward_dag = None
 
-    def finish_measurements(self):
-        self._run_workers("finish_measurements")
+    def shutdown_inc(self):
+        self._run_workers("shutdown_inc")
 
     def _get_worker_module_and_class(
             self
@@ -480,9 +484,6 @@ def _compiled_ray_dag(self, enable_asyncio: bool):
 
         return forward_dag.experimental_compile(enable_asyncio=enable_asyncio)
 
-    def __del__(self):
-        self.shutdown()
-
 
 class RayHPUExecutorAsync(RayHPUExecutor, DistributedGPUExecutorAsync):
 
@@ -553,6 +554,3 @@ async def _start_worker_execution_loop(self):
             for worker in self.non_driver_workers
         ]
         return await asyncio.gather(*coros)
-
-    def __del__(self):
-        self.shutdown()
diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py
index d53e870a114ca..3c7b5090e6233 100755
--- a/vllm/worker/hpu_model_runner.py
+++ b/vllm/worker/hpu_model_runner.py
@@ -1835,10 +1835,6 @@ def prepare_model_input(
             is_prompt=is_prompt,
             virtual_engine=virtual_engine)
 
-    def finish_measurements(self):
-        from neural_compressor.torch.quantization import finalize_calibration
-        finalize_calibration(self.model.model)
-
     def _check_config(self, batch_size, seq_len, is_prompt, warmup_mode):
         cfg = (batch_size, seq_len, is_prompt)
         seen = cfg in self.seen_configs
@@ -2206,18 +2202,12 @@ def _make_decode_output(
         return SamplerOutput(sampler_outputs)
 
     def shutdown_inc(self):
-        can_finalize_inc = False
-        from contextlib import suppress
-        with suppress(AttributeError):
-            can_finalize_inc = (self.model_config.quantization == 'inc') and \
-                               (self.model.model is not None) and \
-                               self.inc_initialized_successfully and \
-                               not getattr(self, "_is_inc_finalized", False)
+        can_finalize_inc = (self.model_config.quantization == 'inc') and \
+                           (self.model.model is not None) and \
+                           self.inc_initialized_successfully and \
+                           not getattr(self, "_is_inc_finalized", False)
         if can_finalize_inc:
             from neural_compressor.torch.quantization import (
                 finalize_calibration)
             finalize_calibration(self.model.model)
             self._is_inc_finalized = True
-
-    def __del__(self):
-        self.shutdown_inc()
diff --git a/vllm/worker/hpu_worker.py b/vllm/worker/hpu_worker.py
index 1004af0eca40a..1fb5e4d30afff 100644
--- a/vllm/worker/hpu_worker.py
+++ b/vllm/worker/hpu_worker.py
@@ -255,9 +255,6 @@ def _warm_up_model(self) -> None:
         # the model initialization and profiling.
         set_random_seed(self.model_config.seed)
 
-    def finish_measurements(self):
-        self.model_runner.finish_measurements()
-
     @property
     def do_metadata_broadcast(self) -> bool:
        return self.parallel_config.tensor_parallel_size > 1
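
Note: the core of this change is making teardown idempotent. RayHPUExecutor.shutdown()
and HPUModelRunner.shutdown_inc() now guard their bodies with a flag
(terminate_ray / _is_inc_finalized), and the __del__ hooks that could trigger a
second shutdown during garbage collection are removed. A minimal, self-contained
sketch of that guard-flag pattern follows; the class and names below are
hypothetical and for illustration only, not part of vLLM:

    class HpuTeardownExample:
        def __init__(self) -> None:
            # Set once at construction; cleared after the first successful
            # teardown so any later call becomes a no-op.
            self.terminate_ray = True

        def shutdown(self) -> None:
            if getattr(self, 'terminate_ray', False):
                # The real executor terminates its Ray workers here.
                print("terminating workers")
                self.terminate_ray = False

    ex = HpuTeardownExample()
    ex.shutdown()  # performs the teardown
    ex.shutdown()  # second call is a no-op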