diff --git a/vllm/v1/attention/backends/hpu_attn.py b/vllm/v1/attention/backends/hpu_attn.py index 397fb37442e36..dadcc6380f137 100644 --- a/vllm/v1/attention/backends/hpu_attn.py +++ b/vllm/v1/attention/backends/hpu_attn.py @@ -174,6 +174,7 @@ def forward( block_indices = attn_metadata.block_indices block_offsets = attn_metadata.block_offsets if attn_metadata.is_prompt: + import pdb; pdb.set_trace() key = key.unflatten(0, (block_indices.size(0), -1)) value = value.unflatten(0, (block_indices.size(0), -1)) if kv_cache is not None: diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py index 38f1c03a4d3ac..1e40223380a17 100644 --- a/vllm/v1/core/kv_cache_manager.py +++ b/vllm/v1/core/kv_cache_manager.py @@ -24,7 +24,7 @@ def __init__( self.block_size = block_size self.num_gpu_blocks = num_gpu_blocks self.sliding_window = sliding_window - self.enable_caching = enable_caching + self.enable_caching = False # NOTE(woosuk): To avoid frequent block allocation, we preallocate some # blocks for each request. For example, when a request reaches the end # of its block table, we preallocate N blocks in advance. This way, we diff --git a/vllm/v1/executor/hpu_executor.py b/vllm/v1/executor/hpu_executor.py index 0eab194d9618c..14d2fda6ee775 100644 --- a/vllm/v1/executor/hpu_executor.py +++ b/vllm/v1/executor/hpu_executor.py @@ -25,7 +25,7 @@ def __init__(self, vllm_config: VllmConfig) -> None: self.observability_config = vllm_config.observability_config self.worker = self._create_worker() - self.worker.init_device() + self.worker.initialize() self.worker.load_model() def _create_worker( @@ -62,7 +62,7 @@ def initialize_cache(self, num_hpu_blocks: int) -> None: logger.info("# HPU blocks: %d", num_hpu_blocks) from vllm_hpu_extension.profiler import HabanaMemoryProfiler with HabanaMemoryProfiler() as cache_init_m: - self.worker.initialize_cache(num_hpu_blocks, 0) + self.worker.initialize_cache(num_hpu_blocks) msg = f"init_cache_engine took {cache_init_m.get_summary_string()}" logger.info(msg) diff --git a/vllm/v1/worker/hpu_model_runner.py b/vllm/v1/worker/hpu_model_runner.py index 3fbd5b2fe70b2..905c56efa9211 100755 --- a/vllm/v1/worker/hpu_model_runner.py +++ b/vllm/v1/worker/hpu_model_runner.py @@ -22,7 +22,7 @@ from vllm.plugins import set_compilation_config from vllm.sampling_params import SamplingParams, SamplingType from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, cdiv, is_fake_hpu, - is_pin_memory_available) + is_pin_memory_available, make_tensor_with_pad) from vllm.v1.attention.backends.hpu_attn import HPUAttentionBackendV1, HPUAttentionMetadata from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.sample.metadata import SamplingMetadata @@ -134,9 +134,11 @@ def _update_metadata(self, attn_metadata, batch_size, seq_len, device, def forward(self, *args, **kwargs): kwargs = kwargs.copy() - selected_token_indices = kwargs.pop('selected_token_indices') - if 'warmup_mode' in kwargs: - kwargs.pop('warmup_mode') + if 'selected_token_indices' in kwargs: + kwargs.pop('selected_token_indices') +# selected_token_indices = kwargs.pop('selected_token_indices') +# if 'warmup_mode' in kwargs: +# kwargs.pop('warmup_mode') input_ids = kwargs['input_ids'] kwargs['attn_metadata'] = self._update_metadata( kwargs['attn_metadata'], input_ids.size(0), input_ids.size(1), @@ -144,7 +146,7 @@ def forward(self, *args, **kwargs): #LoraMask.setLoraMask(kwargs.pop('lora_mask')) hidden_states = self.model(*args, **kwargs) hidden_states = hidden_states.view(-1, hidden_states.shape[-1]) - 
hidden_states = hidden_states.index_select(0, selected_token_indices) +# hidden_states = hidden_states.index_select(0, selected_token_indices) return hidden_states def compute_logits(self, *args, **kwargs): @@ -162,7 +164,7 @@ def sampler(self): def _maybe_wrap_in_hpu_graph(*args, **kwargs): return htorch.hpu.wrap_in_hpu_graph( HpuModelAdapter(*args, **kwargs), disable_tensor_cache=True - ) if htorch.utils.internal.is_lazy() else HpuModelAdapter(*args, **kwargs) + ) if False and htorch.utils.internal.is_lazy() else HpuModelAdapter(*args, **kwargs) def subtuple(obj: object, typename: str, @@ -429,10 +431,16 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): num_scheduled_tokens = np.array(num_scheduled_tokens, dtype=np.int32) assert max_num_scheduled_tokens > 0 + # NOTE(kzawora): In prefills, when prefix caching is enabled (on by + # default), num_computed_tokens_cpu might be non-zero even for first + # batch. This is a new behavior in V1 - KVs can be cached even within + # the same iteration seq_lens = (self.input_batch.num_computed_tokens_cpu[:num_reqs] + num_scheduled_tokens) - context_lens = self.input_batch.num_output_tokens_cpu[:num_reqs] + self.input_batch.num_prompt_tokens_cpu[:num_reqs] + context_lens = self.input_batch.num_computed_tokens_cpu[:num_reqs] max_seq_len = seq_lens.max() + num_blocks = math.ceil(max_seq_len / self.block_size) # NOTE(kzawora): it seems like block table is overallocated by 1, but I don't know why. Maybe add +1 here + max_blocked_seq_len = num_blocks * self.block_size # Get request indices. # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2] @@ -441,11 +449,13 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): # Get batched arange. # E.g., [2, 5, 3] -> [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] - arange_matrix = np.tile(np.arange(max_num_scheduled_tokens), + arange_matrix = np.tile(np.arange(max_blocked_seq_len), (num_reqs, 1)) mask = arange_matrix < num_scheduled_tokens[:, np.newaxis] arange = arange_matrix[mask] - + arange_matrix[~mask] = 0 + + # Get positions. positions = torch.empty((total_num_scheduled_tokens, ), dtype=torch.int32, @@ -473,16 +483,23 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): out=input_ids) # Calculate the slot mapping. - block_numbers = self.input_batch.block_table_cpu_tensor.flatten()[ - token_indices // self.block_size] + #block_numbers = self.input_batch.block_table_cpu_tensor.flatten()[ + # token_indices // self.block_size] + block_numbers = self.input_batch.block_table_cpu_tensor block_offsets = token_indices % self.block_size - slot_mapping = torch.empty((total_num_scheduled_tokens, ), - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) - torch.add(block_numbers * self.block_size, - block_offsets, - out=slot_mapping) + + block_table = self.input_batch.block_table_cpu_tensor[:num_reqs,:num_blocks] + import pdb; pdb.set_trace() + + slot_mapping = torch.add(torch.repeat_interleave(torch.mul(block_table, self.block_size), self.block_size, dim=1), torch.from_numpy(arange_matrix)) + slot_mapping.masked_fill_(torch.from_numpy(~mask), 0) + #slot_mapping = torch.empty((num_reqs,max_seq_len), + # dtype=torch.int32, + # device="cpu", + # pin_memory=self.pin_memory) +# torch.add(block_numbers * self.block_size, +# block_offsets, +# out=slot_mapping) # Prepare the attention metadata. 
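For reference, a minimal sketch of the padded slot-mapping idea the hunk above works towards: each token's slot is block_number * block_size + its offset inside the block, computed for the whole [num_reqs, padded_seq_len] grid at once and zeroed on padding. The tensor names and sample numbers below are illustrative assumptions, not the PR's exact code.

import numpy as np
import torch

block_size = 4
block_table = torch.tensor([[7, 2, 5],      # hypothetical block table:
                            [1, 9, 0]])     # 2 requests x 3 blocks
num_reqs, num_blocks = block_table.shape
padded_seq_len = num_blocks * block_size

token_pos = np.tile(np.arange(padded_seq_len), (num_reqs, 1))
seq_lens = np.array([6, 9])                 # real tokens per request
valid = token_pos < seq_lens[:, np.newaxis]

# slot = block_number * block_size + offset within the block
block_numbers = torch.repeat_interleave(block_table, block_size, dim=1)
offsets = torch.from_numpy(token_pos % block_size)
slot_mapping = block_numbers * block_size + offsets
slot_mapping.masked_fill_(torch.from_numpy(~valid), 0)   # padding -> slot 0
print(slot_mapping)   # row 0: [28, 29, 30, 31, 8, 9, 0, 0, 0, 0, 0, 0]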
query_start_loc = torch.empty((num_reqs + 1, ), @@ -502,13 +519,17 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): seq_start_loc_np[0] = 0 np.cumsum(seq_lens, out=seq_start_loc_np[1:]) - import pdb; pdb.set_trace() # NOTE(kzawora): this is probably dumb - input_ids = self.input_batch.token_ids_cpu[:num_reqs,:max_seq_len] + input_ids = self.input_batch.token_ids_cpu[:num_reqs,:max_blocked_seq_len] positions = [list(range(context_len, seq_len)) for context_len, seq_len in zip(context_lens,seq_lens)] # idk what to do here self.input_ids[:num_reqs,:max_seq_len].copy_(torch.from_numpy(input_ids), non_blocking=True) - self.positions[:num_reqs,:max_seq_len].copy_(torch.from_numpy(positions), + positions = make_tensor_with_pad(positions, + max_len=max_blocked_seq_len, + pad=0, + dtype=torch.long, + device=self.device) + self.positions[:num_reqs,:max_seq_len].copy_(positions, non_blocking=True) seq_lens_tensor = torch.empty((num_reqs, ), dtype=torch.int32, @@ -525,14 +546,13 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): query_start_loc = query_start_loc.to(self.device, non_blocking=True) seq_start_loc = seq_start_loc.to(self.device, non_blocking=True) slot_mapping = slot_mapping.to(self.device, non_blocking=True).long() - seq_lens = self.seq_lens.to(self.device, non_blocking=True) - - import pdb; pdb.set_trace() - attn_metadata = None + seq_lens_tensor = seq_lens_tensor.to(self.device, non_blocking=True) + context_lens_tensor = context_lens_tensor.to(self.device, non_blocking=True) prefix_block_list_tensor = [] # FIXME(kzawora) block_indices, block_offsets = precompute_indices_and_offsets( - self.block_size, slot_mapping, True) + self.block_size, slot_mapping, is_prompt) + import pdb; pdb.set_trace() attn_metadata = HPUAttentionMetadata( is_prompt=is_prompt, block_list=prefix_block_list_tensor, @@ -590,22 +610,22 @@ def execute_model( self._update_states(scheduler_output) attn_metadata, logits_indices = self._prepare_inputs(scheduler_output) num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens - if (self.use_cuda_graph - and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1]): - # Use piecewise CUDA graphs. - # Add padding to the batch size. - num_input_tokens = self._get_padded_batch_size( - num_scheduled_tokens) - else: - # Eager mode. - num_input_tokens = num_scheduled_tokens + batch_size = self.input_batch.num_reqs + seq_lens = (self.input_batch.num_computed_tokens_cpu[:batch_size] + + num_scheduled_tokens) + max_seq_len = seq_lens.max() + num_blocks = math.ceil(max_seq_len / self.block_size) # NOTE(kzawora): it seems like block table is overallocated by 1, but I don't know why. 
Maybe add +1 here + max_blocked_seq_len = num_blocks * self.block_size + seq_len = num_scheduled_tokens + trimmed_attn_metadata = trim_attn_metadata(attn_metadata) with set_forward_context(attn_metadata): - hidden_states = self.model( - input_ids=self.input_ids[:num_input_tokens], - positions=self.positions[:num_input_tokens], + import pdb; pdb.set_trace() + hidden_states = self.model.forward( + input_ids=self.input_ids[:batch_size,:max_blocked_seq_len], + positions=self.positions[:batch_size,:max_blocked_seq_len], kv_caches=self.kv_caches, - attn_metadata=None, + attn_metadata=trimmed_attn_metadata, ) hidden_states = hidden_states[:num_scheduled_tokens] hidden_states = hidden_states[logits_indices] @@ -739,6 +759,7 @@ def _dummy_run( ) selected_token_indices = torch.arange(0, seq_len*batch_size, device=self.device) trimmed_attn_metadata = trim_attn_metadata(attn_metadata) + return # bypass dummy run for now # Dummy run. self.model.forward(input_ids=input_ids, positions=position_ids, @@ -813,13 +834,14 @@ def initialize_kv_cache(self, num_blocks: int) -> None: if self.device != 'hpu' and not is_fake_hpu() \ and self.dtype == torch.float8_e4m3fn: dtype = torch.uint8 - for _ in range(self.num_attention_layers): + for _ in range(self.num_attn_layers): key_cache = torch.zeros(kv_cache_shape, dtype=dtype, device=self.device) value_cache = torch.zeros(kv_cache_shape, dtype=dtype, device=self.device) kv_layer = (key_cache, value_cache) self.kv_caches.append(kv_layer) + htorch.hpu.synchronize() def _get_padded_batch_size(self, batch_size: int) -> Optional[int]: # TODO: Optimize this? @@ -870,7 +892,7 @@ def __init__( self.token_ids_cpu = np.empty((max_num_reqs, max_model_len), dtype=np.int32) - self.num_computed_tokens_cpu = np.empty(max_num_reqs, dtype=np.int32) + self.num_computed_tokens_cpu = np.zeros(max_num_reqs, dtype=np.int32) self.num_output_tokens_cpu = np.empty(max_num_reqs, dtype=np.int32) self.num_prompt_tokens_cpu = np.empty(max_num_reqs, dtype=np.int32) diff --git a/vllm/v1/worker/hpu_worker.py b/vllm/v1/worker/hpu_worker.py index 2cc33bdb51bce..03c0f9cdf21b7 100644 --- a/vllm/v1/worker/hpu_worker.py +++ b/vllm/v1/worker/hpu_worker.py @@ -1,39 +1,28 @@ -############################################################################### -# Copyright (C) 2024 Habana Labs, Ltd. 
an Intel Company -############################################################################### - +"""A GPU worker class.""" import gc import os -from typing import List, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Optional, Tuple -import habana_frameworks.torch as htorch # noqa:F401 import torch import torch.distributed -from vllm_hpu_extension.profiler import HabanaMemoryProfiler, format_bytes -import vllm.envs as envs -from vllm.config import ParallelConfig, SpeculativeConfig, VllmConfig +from vllm.config import CacheConfig, ModelConfig, ParallelConfig, VllmConfig from vllm.distributed import (ensure_model_parallel_initialized, init_distributed_environment) from vllm.logger import init_logger from vllm.model_executor import set_random_seed -from vllm.utils import hpu_backend_string, hpu_device_string, is_fake_hpu -from vllm.v1.core.scheduler import SchedulerOutput +from vllm.utils import STR_DTYPE_TO_TORCH_DTYPE, get_dtype_size, is_fake_hpu from vllm.v1.outputs import ModelRunnerOutput -from vllm.worker.cache_engine import CacheEngine from vllm.v1.worker.hpu_model_runner import HPUModelRunner -from vllm.worker.model_runner_base import ModelRunnerBase logger = init_logger(__name__) +from vllm_hpu_extension.profiler import HabanaMemoryProfiler, format_bytes +if TYPE_CHECKING: + from vllm.v1.core.scheduler import SchedulerOutput -class HPUWorker: - """A worker class that executes (a partition of) the model on a HPU. - Each worker is associated with a single HPU. The worker is responsible for - maintaining the KV cache and executing the model on the HPU. In case of - distributed inference, each worker is assigned a partition of the model. - """ +class HPUWorker: def __init__( self, @@ -41,10 +30,9 @@ def __init__( local_rank: int, rank: int, distributed_init_method: str, - is_driver_worker: bool = False, - speculative_config: Optional[SpeculativeConfig] = None, - model_runner_cls: Optional[Type[ModelRunnerBase]] = None, - ) -> None: + ): + + # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config) self.vllm_config = vllm_config self.model_config = vllm_config.model_config self.cache_config = vllm_config.cache_config @@ -56,82 +44,27 @@ def __init__( self.speculative_config = vllm_config.speculative_config self.prompt_adapter_config = vllm_config.prompt_adapter_config self.observability_config = vllm_config.observability_config - self.parallel_config.rank = rank + self.local_rank = local_rank self.rank = rank self.distributed_init_method = distributed_init_method - self.is_driver_worker = is_driver_worker - if self.is_driver_worker: - assert self.rank == 0, "The driver worker must have rank 0." if self.model_config.trust_remote_code: # note: lazy import to avoid importing torch before initializing from vllm.utils import init_cached_hf_modules init_cached_hf_modules() - self.model_runner: HPUModelRunner = HPUModelRunner( - vllm_config=vllm_config) - # Uninitialized cache engine. Will be initialized by - # initialize_cache. - self.cache_engine: List[HPUCacheEngine] - # Initialize gpu_cache as embedding models don't initialize kv_caches - self.hpu_cache: Optional[List[List[torch.tensor]]] = None - # Torch profiler. Enabled and configured through env vars: - # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace - if envs.VLLM_TORCH_PROFILER_DIR: - torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR - logger.info("Profiling enabled. 
Traces will be saved to: %s", - torch_profiler_trace_dir) - self.profiler = torch.profiler.profile( - activities=[ - torch.profiler.ProfilerActivity.CPU, - torch.profiler.ProfilerActivity.HPU, - ], - with_stack=True, - on_trace_ready=torch.profiler.tensorboard_trace_handler( - torch_profiler_trace_dir, use_gzip=True)) - else: - self.profiler = None - - def start_profile(self): - if self.profiler is None: - raise RuntimeError("Profiler is not enabled.") - self.profiler.start() - - def stop_profile(self): - if self.profiler is None: - raise RuntimeError("Profiler is not enabled.") - self.profiler.stop() - - def _set_env_vars(self): - local_rank = self.local_rank - if self.parallel_config.world_size == 1: - local_rank = -1 - import os - os.environ["LOCAL_RANK"] = str(local_rank) - os.environ["ID"] = str(local_rank) - os.environ["WORLD_SIZE"] = str(self.parallel_config.world_size) - os.environ["RANK"] = str(self.rank) - - def init_device(self) -> None: - if self.device_config.device.type == "hpu": - self.device = torch.device("hpu") - torch.hpu.set_device(self.device) - elif self.device_config.device_type == "cpu": - self.device = torch.device("cpu") - else: - raise RuntimeError( - f"Not support device type: {self.device_config.device}") + self.model_runner = HPUModelRunner(vllm_config) + + def initialize(self): # Initialize the distributed environment. - if self.model_config.quantization == 'inc': - self._set_env_vars() init_worker_distributed_environment(self.parallel_config, self.rank, self.distributed_init_method, self.local_rank) # Set random seed. set_random_seed(self.model_config.seed) - def load_model(self): + def load_model(self) -> None: self.model_runner.load_model() @torch.inference_mode() @@ -153,7 +86,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: # Execute a forward pass with dummy inputs to profile the memory usage # of the model. if is_fake_hpu(): - cache_block_size = self.get_cache_block_size_bytes() + cache_block_size = _get_cache_block_size(self.cache_config, self.model_config, self.parallel_config) fake_hpu_cache_alloc = 4 * 2**30 # take 4 GiB flat on fake hpu return fake_hpu_cache_alloc // cache_block_size, 0 with HabanaMemoryProfiler() as m: @@ -166,7 +99,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: # recipes we will use the extra memory for graphs/blocks free_hpu_memory = torch.hpu.mem_get_info()[0] - cache_block_size = self.get_cache_block_size_bytes() + cache_block_size = _get_cache_block_size(self.cache_config, self.model_config, self.parallel_config) graph_reserved_mem = (float( os.environ.get('VLLM_GRAPH_RESERVED_MEM', '0.1')) if not self.model_config.enforce_eager else 0) @@ -187,90 +120,47 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: f"{format_bytes(cache_size_bytes)} reserved for KV cache") logger.info(msg) num_hpu_blocks = int(cache_size_bytes // cache_block_size) - num_cpu_blocks = int(self.cache_config.swap_space_bytes // - cache_block_size) num_hpu_blocks = max(num_hpu_blocks, 0) - num_cpu_blocks = max(num_cpu_blocks, 0) gc.collect() - return num_hpu_blocks, num_cpu_blocks - - def initialize_cache(self, num_gpu_blocks: int, - num_cpu_blocks: int) -> None: - """Allocate GPU and CPU KV cache with the specified number of blocks. - - This also warms up the model, which may record CUDA graphs. 
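As a rough illustration of the block budgeting in determine_num_available_blocks above: the free HPU memory is scaled by gpu_memory_utilization and by the headroom left after VLLM_GRAPH_RESERVED_MEM, then divided by the per-block KV-cache footprint. The constants and the exact formula below are assumptions for intuition only, not the PR's code.

import os

free_hpu_memory = 96 * 2**30          # e.g. 96 GiB reported free after profiling the model
gpu_memory_utilization = 0.9
graph_reserved_mem = float(os.environ.get('VLLM_GRAPH_RESERVED_MEM', '0.1'))
cache_block_size = 2 * 2**20          # bytes per KV block, e.g. from _get_cache_block_size()

usable_cache_bytes = (free_hpu_memory * gpu_memory_utilization
                      * (1 - graph_reserved_mem))
num_hpu_blocks = max(int(usable_cache_bytes // cache_block_size), 0)
print(num_hpu_blocks)                 # roughly 40k blocks with these made-up numbers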
- """ - raise_if_cache_size_invalid(num_gpu_blocks, - self.cache_config.block_size, - self.model_config.max_model_len) - - self.cache_config.num_gpu_blocks = num_gpu_blocks - self.cache_config.num_cpu_blocks = num_cpu_blocks - - with HabanaMemoryProfiler() as m: - self._init_cache_engine() - torch.hpu.synchronize() - msg = ("Initializing cache engine " - f"took {m.get_summary_string()}") - logger.info(msg) - self._warm_up_model() - - def _init_cache_engine(self): - assert self.cache_config.num_gpu_blocks is not None - self.cache_engine = [ - HPUCacheEngine(self.cache_config, self.model_config, - self.parallel_config, self.device_config) - for _ in range(self.parallel_config.pipeline_parallel_size) - ] - self.hpu_cache = [ - self.cache_engine[ve].gpu_cache - for ve in range(self.parallel_config.pipeline_parallel_size) - ] - - def _warm_up_model(self) -> None: - # NOTE(kzawora): We should use virtual engine index here - # for pipeline parallelism. Using 0 for now. - assert self.hpu_cache is not None - logger.error("Warmup is disabled for v1 HPU worker") - #self.model_runner.warmup_model(self.hpu_cache[0]) + return num_hpu_blocks, 0 + + + def initialize_cache(self, num_gpu_blocks: int) -> None: + """Allocate GPU and CPU KV cache with the specified number of blocks.""" + if num_gpu_blocks <= 0: + raise ValueError("No available memory for the cache blocks. " + "Try increasing `gpu_memory_utilization` when " + "initializing the engine.") + + max_seq_len = self.cache_config.block_size * num_gpu_blocks + max_model_len = self.model_config.max_model_len + if max_model_len > max_seq_len: + raise ValueError( + f"The model's max seq len ({max_model_len}) " + "is larger than the maximum number of tokens that can be " + f"stored in KV cache ({max_seq_len}). Try increasing " + "`gpu_memory_utilization` or decreasing `max_model_len` when " + "initializing the engine.") + + self.model_runner.initialize_kv_cache(num_gpu_blocks) + + def compile_or_warm_up_model(self) -> None: + if not self.model_config.enforce_eager: + self.model_runner.capture_model() # Reset the seed to ensure that the random state is not affected by # the model initialization and profiling. set_random_seed(self.model_config.seed) - def finish_measurements(self): - self.model_runner.finish_measurements() - - @property - def kv_cache(self) -> Optional[List[List[torch.Tensor]]]: - return self.hpu_cache - - + @torch.inference_mode() def execute_model( self, scheduler_output: "SchedulerOutput", ) -> ModelRunnerOutput: output = self.model_runner.execute_model(scheduler_output) + # TODO(woosuk): Send the output to the engine process. return output - def shutdown_inc(self): - self.model_runner.shutdown_inc() - - @property - def max_model_len(self) -> int: - return self.model_config.max_model_len - - @property - def vocab_size(self) -> int: - return self.model_runner.vocab_size - - def get_cache_block_size_bytes(self) -> int: - """Get the size of the KV cache block size in bytes. 
- """ - return HPUCacheEngine.get_cache_block_size(self.cache_config, - self.model_config, - self.parallel_config) - def init_worker_distributed_environment( parallel_config: ParallelConfig, @@ -279,81 +169,30 @@ def init_worker_distributed_environment( local_rank: int = -1, ) -> None: """Initialize the distributed environment.""" - backend = hpu_backend_string() - init_distributed_environment(parallel_config.world_size, - rank, - distributed_init_method, - local_rank, - backend=backend) + init_distributed_environment(parallel_config.world_size, rank, + distributed_init_method, local_rank, backend='hccl') ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size) - if torch.distributed.is_initialized(): - torch_world_size = torch.distributed.get_world_size() - if torch_world_size != parallel_config.world_size: - raise RuntimeError( - "torch.distributed is already initialized but the torch world " - "size does not match parallel_config.world_size " - f"({torch_world_size} vs. {parallel_config.world_size}).") - elif not distributed_init_method: - raise ValueError( - "distributed_init_method must be set if torch.distributed " - "is not already initialized") - else: - backend = hpu_backend_string() - torch.distributed.init_process_group( - backend=backend, - world_size=parallel_config.world_size, - rank=rank, - init_method=distributed_init_method, - ) - - # A small all_reduce for warmup & checking conformance. - device = hpu_device_string() - dummy_tensor_hpu = torch.ones(1).to(device) - torch.distributed.all_reduce(dummy_tensor_hpu) - assert dummy_tensor_hpu.item() == parallel_config.world_size - ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, - parallel_config.pipeline_parallel_size) - - -def raise_if_cache_size_invalid(num_gpu_blocks, block_size, - max_model_len) -> None: - if num_gpu_blocks <= 0: - raise ValueError("No available memory for the cache blocks. " - "Try increasing `gpu_memory_utilization` when " - "initializing the engine.") - max_seq_len = block_size * num_gpu_blocks - if max_model_len > max_seq_len: - raise ValueError( - f"The model's max seq len ({max_model_len}) " - "is larger than the maximum number of tokens that can be " - f"stored in KV cache ({max_seq_len}). 
Try increasing "
-            "`gpu_memory_utilization` or decreasing `max_model_len` when "
-            "initializing the engine.")
-class HPUCacheEngine(CacheEngine):
-
-    def _allocate_kv_cache(
-        self,
-        num_blocks: int,
-        device: str,
-    ) -> List[Tuple[torch.Tensor, torch.Tensor]]:
-        """Allocates KV cache on the specified device."""
-        kv_cache_shape = self.attn_backend.get_kv_cache_shape(
-            num_blocks, self.block_size, self.num_kv_heads, self.head_size)
-        kv_cache: List[Tuple[torch.Tensor, torch.Tensor]] = []
-        dtype = self.dtype
-        if device != 'hpu' and not is_fake_hpu() \
-            and self.dtype == torch.float8_e4m3fn:
-            dtype = torch.uint8
-        for _ in range(self.num_attention_layers):
-            key_cache = torch.zeros(kv_cache_shape, dtype=dtype, device=device)
-            value_cache = torch.zeros(kv_cache_shape,
-                                      dtype=dtype,
-                                      device=device)
-            kv_layer = (key_cache, value_cache)
-            kv_cache.append(kv_layer)
-        return kv_cache
+def _get_cache_block_size(
+    cache_config: CacheConfig,
+    model_config: ModelConfig,
+    parallel_config: ParallelConfig,
+) -> int:
+    head_size = model_config.get_head_size()
+    num_heads = model_config.get_num_kv_heads(parallel_config)
+    num_attention_layers = model_config.get_num_attention_layers(
+        parallel_config)
+
+    key_cache_block = cache_config.block_size * num_heads * head_size
+    value_cache_block = key_cache_block
+    total = num_attention_layers * (key_cache_block + value_cache_block)
+    if cache_config.cache_dtype == "auto":
+        dtype = model_config.dtype
+    else:
+        dtype = STR_DTYPE_TO_TORCH_DTYPE[cache_config.cache_dtype]
+    dtype_size = get_dtype_size(dtype)
+    return dtype_size * total
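For intuition, a back-of-the-envelope evaluation of the new _get_cache_block_size() formula; the model dimensions below are made up and not taken from this PR.

block_size = 128              # tokens per KV-cache block
num_kv_heads = 8
head_size = 128
num_attention_layers = 32
dtype_size = 2                # bytes per element, e.g. bfloat16

key_cache_block = block_size * num_kv_heads * head_size    # K elements per layer per block
value_cache_block = key_cache_block                        # V mirrors K
total_elems = num_attention_layers * (key_cache_block + value_cache_block)
print(dtype_size * total_elems)                            # 16777216 bytes = 16 MiB per block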