From e9a7162b17325d303d49b83ab56cb81f7a0d3b70 Mon Sep 17 00:00:00 2001
From: Zakor
Date: Thu, 18 Jul 2024 13:47:16 +0200
Subject: [PATCH 1/2] AsyncLLMEngine improvements

- Add an option to only send back the first token(s) and the final result
- Replaced execute_model_async with execute_model. According to rpd trace
  results, this has lower CPU overhead
---
 vllm/engine/async_llm_engine.py | 32 ++++++++++++++++++++++++++------
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index db4d2849b3f0e..eec310125c9a2 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -50,18 +50,28 @@ def _raise_exception_on_finish(
 
 class AsyncStream:
     """A stream of RequestOutputs or EmbeddingRequestOutputs for a request
-    that can be iterated over asynchronously."""
+    that can be iterated over asynchronously.
+    Args:
+        first_item_only: Only emit the first item and the finished item to the queue.
+    """
 
-    def __init__(self, request_id: str) -> None:
+    def __init__(self, request_id: str, first_item_only: bool = False) -> None:
         self.request_id = request_id
         self._queue: asyncio.Queue = asyncio.Queue()
         self._finished = False
+        self._first_item_only = first_item_only
+        self._first_item = first_item_only
 
     def put(self, item: Union[RequestOutput, EmbeddingRequestOutput,
                               Exception]) -> None:
         if self._finished:
             return
-        self._queue.put_nowait(item)
+        if self._first_item_only:
+            if self._first_item or isinstance(item, Exception) or item.finished:
+                self._first_item = False
+                self._queue.put_nowait(item)
+        else:
+            self._queue.put_nowait(item)
 
     def finish(self) -> None:
         self._queue.put_nowait(StopAsyncIteration())
@@ -135,14 +145,16 @@ def process_exception(self,
         logger.info("Finished request %s.", request_id)
         self.abort_request(request_id)
 
-    def add_request(self, request_id: str,
+    def add_request(self,
+                    request_id: str,
+                    first_token_only: bool = False,
                     **engine_add_request_kwargs) -> AsyncStream:
         """Add a request to be sent to the engine on the next background
         loop iteration."""
         if request_id in self._request_streams:
             raise KeyError(f"Request {request_id} already exists.")
 
-        stream = AsyncStream(request_id)
+        stream = AsyncStream(request_id, first_token_only)
         self._new_requests.put_nowait((stream, {
             "request_id": request_id,
             **engine_add_request_kwargs
@@ -223,7 +235,7 @@ async def step_async(
                 num_lookahead_slots=scheduler_outputs.num_lookahead_slots,
                 running_queue_size=scheduler_outputs.running_queue_size,
             )
-            output = await self.model_executor.execute_model_async(
+            output = self.model_executor.execute_model(
                 execute_model_req)
         else:
             output = []
@@ -534,6 +546,7 @@ async def add_request(
         params: Union[SamplingParams, PoolingParams],
         arrival_time: Optional[float] = None,
         lora_request: Optional[LoRARequest] = None,
+        first_token_only: bool = False,
     ) -> AsyncStream:
         if self.log_requests:
             if isinstance(inputs, str):
@@ -587,6 +600,7 @@ async def add_request(
             params=params,
             arrival_time=arrival_time,
             lora_request=lora_request,
+            first_token_only=first_token_only,
         )
 
         return stream
@@ -597,6 +611,7 @@ async def generate(
         sampling_params: SamplingParams,
         request_id: str,
         lora_request: Optional[LoRARequest] = None,
+        first_token_only: bool = False,
     ) -> AsyncIterator[RequestOutput]:
         """Generate outputs for a request.
 
@@ -611,6 +626,7 @@ async def generate(
             sampling_params: The sampling parameters of the request.
             request_id: The unique id of the request.
             lora_request: LoRA request to use for generation, if any.
+            first_token_only: Only return the first token(s) and the final output.
 
         Yields:
             The output `RequestOutput` objects from the LLMEngine
@@ -663,6 +679,7 @@ async def generate(
                 request_id,
                 inputs,
                 sampling_params,
+                first_token_only=first_token_only,
                 lora_request=lora_request,
         ):
             yield LLMEngine.validate_output(output, RequestOutput)
@@ -737,6 +754,7 @@ async def encode(
                 request_id,
                 inputs,
                 pooling_params,
+                first_token_only=False,
                 lora_request=lora_request,
         ):
             yield LLMEngine.validate_output(output, EmbeddingRequestOutput)
@@ -748,6 +766,7 @@ async def _process_request(
         params: Union[SamplingParams, PoolingParams],
         *,
         lora_request: Optional[LoRARequest] = None,
+        first_token_only: bool = False,
     ) -> AsyncIterator[Union[RequestOutput, EmbeddingRequestOutput]]:
         """Common logic to process requests with SamplingParams or
         PoolingParams."""
@@ -759,6 +778,7 @@ async def _process_request(
             params,
             arrival_time=arrival_time,
             lora_request=lora_request,
+            first_token_only=first_token_only,
         )
 
         try:

From aedf43ec3d6ae05fa8a741671d6f467ac27ff557 Mon Sep 17 00:00:00 2001
From: Zakor
Date: Thu, 18 Jul 2024 13:48:40 +0200
Subject: [PATCH 2/2] LLM changes to provide a callback to insert requests

---
 vllm/core/scheduler.py    |  3 +++
 vllm/engine/llm_engine.py |  4 ++++
 vllm/entrypoints/llm.py   | 27 +++++++++++++++++++++++++++
 3 files changed, 34 insertions(+)

diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index c4e738e8f59cc..f65b69bffd248 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -365,6 +365,9 @@ def has_unfinished_seqs(self) -> bool:
     def get_num_unfinished_seq_groups(self) -> int:
         return len(self.waiting) + len(self.running) + len(self.swapped)
 
+    def get_num_waiting_seq_groups(self) -> int:
+        return len(self.waiting)
+
     def _schedule_running(
         self,
         running_queue: deque,
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 11b800afb38a1..b32d0cdf46cfc 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -644,6 +644,10 @@ def get_num_unfinished_requests(self) -> int:
         """Gets the number of unfinished requests."""
         return self.scheduler.get_num_unfinished_seq_groups()
 
+    def get_num_waiting_requests(self) -> int:
+        """Gets the number of waiting requests."""
+        return self.scheduler.get_num_waiting_seq_groups()
+
     def has_unfinished_requests(self) -> bool:
         """Returns True if there are unfinished requests."""
         return self.scheduler.has_unfinished_seqs()
diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py
index bbb5d31f0606a..a7ced98763930 100644
--- a/vllm/entrypoints/llm.py
+++ b/vllm/entrypoints/llm.py
@@ -117,6 +117,7 @@ def __init__(
         max_context_len_to_capture: Optional[int] = None,
         max_seq_len_to_capture: int = 32768,
         disable_custom_all_reduce: bool = False,
+        sample_injection_cb=None,
         **kwargs,
     ) -> None:
         if "disable_log_stats" not in kwargs:
@@ -145,6 +146,9 @@ def __init__(
             engine_args, usage_context=UsageContext.LLM_CLASS)
         self.request_counter = Counter()
+        self.sample_injection_cb = sample_injection_cb
+
+
 
     def get_tokenizer(
             self) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
         return self.llm_engine.tokenizer.tokenizer
@@ -549,6 +553,12 @@ def _run_engine(
         outputs: List[Union[RequestOutput, EmbeddingRequestOutput]] = []
         total_toks = 0
         while self.llm_engine.has_unfinished_requests():
+            if self.sample_injection_cb:
+                num_added_requests = self.sample_injection_cb()
+                if num_added_requests < 0:
+                    self.sample_injection_cb = None
+                elif use_tqdm:
+                    pbar.total += num_added_requests
             step_outputs = self.llm_engine.step()
             for output in step_outputs:
                 if output.finished:
@@ -567,3 +577,20 @@ def _run_engine(
         # This is necessary because some requests may be finished earlier than
         # its previous requests.
         return sorted(outputs, key=lambda x: int(x.request_id))
+
+    def add_prompt_token_ids(
+            self,
+            prompt_token_ids: Optional[Union[List[int], List[List[int]]]] = None,
+            sampling_params: Optional[Union[SamplingParams,
+                                            Sequence[SamplingParams]]] = None):
+        inputs = self._convert_v1_inputs(
+            prompts=None,
+            prompt_token_ids=prompt_token_ids,
+            multi_modal_data=None,
+        )
+
+        self._validate_and_add_requests(
+            inputs=inputs,
+            params=sampling_params,
+            lora_request=None,
+        )
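
Usage sketch (not part of the patch): the snippets below show one way the two additions can be driven, inferred only from the signatures and _run_engine logic added above. Model names, prompts, and token ids are placeholders.

With first_token_only=True, AsyncLLMEngine.generate() yields the first RequestOutput and then only the finished one, so intermediate tokens are not streamed back:

    import asyncio

    from vllm import SamplingParams
    from vllm.engine.arg_utils import AsyncEngineArgs
    from vllm.engine.async_llm_engine import AsyncLLMEngine

    async def main():
        engine = AsyncLLMEngine.from_engine_args(
            AsyncEngineArgs(model="facebook/opt-125m"))  # placeholder model
        params = SamplingParams(max_tokens=64)
        # Only the first token(s) and the final output are put on the stream.
        async for output in engine.generate("Hello, my name is",
                                            params,
                                            request_id="req-0",
                                            first_token_only=True):
            print(output.finished, output.outputs[0].text)

    asyncio.run(main())

The sample injection callback is polled once per engine step inside LLM._run_engine(); per the code above it should return the number of requests it just added (e.g. via add_prompt_token_ids()), or a negative value to stop being called:

    from vllm import LLM, SamplingParams

    params = SamplingParams(max_tokens=32)
    pending = [[1, 2, 3, 4], [5, 6, 7, 8]]  # placeholder token ids

    def inject_samples() -> int:
        # Return the number of requests added; a negative value disables
        # further callbacks (see the _run_engine hunk above).
        if not pending:
            return -1
        llm.add_prompt_token_ids(prompt_token_ids=pending.pop(),
                                 sampling_params=params)
        return 1

    llm = LLM(model="facebook/opt-125m",  # placeholder model
              sample_injection_cb=inject_samples)
    outputs = llm.generate(prompt_token_ids=[[9, 10, 11, 12]],
                           sampling_params=params)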