From 4f2ae67b4be90ecdcc3f08fe53e4eee41d7c21b0 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 26 Dec 2024 14:33:05 +0500 Subject: [PATCH] Support volumes for TPUs (#2144) * Fix GCP GetRealDeviceName to use deviceName * Support volumes for TPUs * Fix TPU volume attachment when provisioning on run submission * Mention volumes in TPU example * Fix error message --- docs/docs/reference/server/config.yml.md | 3 +- examples/accelerators/amd/README.md | 7 +- examples/accelerators/tpu/README.md | 27 +-- runner/internal/shim/backends/gcp.go | 5 +- .../_internal/core/backends/gcp/compute.py | 165 ++++++++++++++---- .../_internal/core/backends/gcp/resources.py | 4 + src/dstack/_internal/core/models/instances.py | 2 + 7 files changed, 163 insertions(+), 50 deletions(-) diff --git a/docs/docs/reference/server/config.yml.md b/docs/docs/reference/server/config.yml.md index d1f51168f..20916b5ed 100644 --- a/docs/docs/reference/server/config.yml.md +++ b/docs/docs/reference/server/config.yml.md @@ -455,8 +455,9 @@ gcloud projects list --format="json(projectId)" ``` tpu.nodes.create - tpu.nodes.delete tpu.nodes.get + tpu.nodes.update + tpu.nodes.delete tpu.operations.get tpu.operations.list ``` diff --git a/examples/accelerators/amd/README.md b/examples/accelerators/amd/README.md index d8c13265f..bb0dd67db 100644 --- a/examples/accelerators/amd/README.md +++ b/examples/accelerators/amd/README.md @@ -1,11 +1,12 @@ # AMD -If you're using the `runpod` backend or have set up an [SSH fleets](https://dstack.ai/docs/concepts/fleets#ssh-fleets) -with on-prem AMD GPUs, you can use AMD GPUs. +`dstack` supports running dev environments, tasks, and services on AMD GPUs. +You can do that by setting up an [SSH fleet](https://dstack.ai/docs/concepts/fleets#ssh-fleets) +with on-prem AMD GPUs or configuring a backend that offers AMD GPUs such as the `runpod` backend. ## Deployment -You can use any serving framework, such as TGI and vLLM. Here's an example of a [service](https://dstack.ai/docs/services) that deploys +Most serving frameworks including vLLM and TGI have AMD support. Here's an example of a [service](https://dstack.ai/docs/services) that deploys Llama 3.1 70B in FP16 using [TGI :material-arrow-top-right-thin:{ .external }](https://huggingface.co/docs/text-generation-inference/en/installation_amd){:target="_blank"} and [vLLM :material-arrow-top-right-thin:{ .external }](https://docs.vllm.ai/en/latest/getting_started/amd-installation.html){:target="_blank"}. === "TGI" diff --git a/examples/accelerators/tpu/README.md b/examples/accelerators/tpu/README.md index 788ba72a7..41c33b12a 100644 --- a/examples/accelerators/tpu/README.md +++ b/examples/accelerators/tpu/README.md @@ -1,17 +1,24 @@ # TPU -If you're using the `gcp` backend, you can use TPUs. Just specify the TPU version and the number of cores -(separated by a dash), in the `gpu` property under `resources`. - -> Currently, maximum 8 TPU cores can be specified, so the maximum supported values are `v2-8`, `v3-8`, `v4-8`, `v5litepod-8`, -> and `v5e-8`. Multi-host TPU support, allowing for larger numbers of cores, is coming soon. +If you've configured the `gcp` backend in `dstack`, you can run dev environments, tasks, and services on [TPUs](https://cloud.google.com/tpu/docs/intro-to-tpu). +Choose a TPU instance by specifying the TPU version and the number of cores (e.g. `v5litepod-8`) in the `gpu` property under `resources`, +or request TPUs by specifying `tpu` as `vendor` ([see examples](https://dstack.ai/docs/guides/protips/#gpu)). 
Below are a few examples on using TPUs for deployment and fine-tuning. +!!! info "Multi-host TPUs" + Currently, `dstack` supports only single-host TPUs, which means that + the maximum supported number of cores is `8` (e.g. `v2-8`, `v3-8`, `v5litepod-8`, `v5p-8`, `v6e-8`). + Multi-host TPU support is on the roadmap. + +!!! info "TPU storage" + By default, each TPU VM contains a 100GB boot disk and its size cannot be changed. + If you need more storage, attach additional disks using [Volumes](https://dstack.ai/docs/concepts/volumes/). + ## Deployment -You can use any serving framework, such as vLLM, TGI. Here's an example of a [service](https://dstack.ai/docs/services) that deploys -Llama 3.1 8B using +Many serving frameworks including vLLM and TGI have TPU support. +Here's an example of a [service](https://dstack.ai/docs/services) that deploys Llama 3.1 8B using [Optimum TPU :material-arrow-top-right-thin:{ .external }](https://github.com/huggingface/optimum-tpu){:target="_blank"} and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm-project/vllm){:target="_blank"}. @@ -40,7 +47,7 @@ and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm- ``` - Note, for `Optimum TPU` by default `MAX_INPUT_TOKEN` is set to 4095, consequently we must set `MAX_BATCH_PREFILL_TOKENS` to 4095. + Note that for Optimum TPU `MAX_INPUT_TOKEN` is set to 4095 by default. We must also set `MAX_BATCH_PREFILL_TOKENS` to 4095. ??? info "Docker image" The official Docker image `huggingface/optimum-tpu:latest` doesn’t support Llama 3.1-8B. @@ -92,7 +99,7 @@ and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm- ### Memory requirements -Below are the approximate memory requirements for serving LLMs with their corresponding TPUs. +Below are the approximate memory requirements for serving LLMs with the minimal required TPU configuration: | Model size | bfloat16 | TPU | int8 | TPU | |------------|----------|--------------|-------|----------------| @@ -152,7 +159,7 @@ resources: ### Memory requirements -Below are the approximate memory requirements for fine-tuning LLMs with their corresponding TPUs. 
+Below are the approximate memory requirements for fine-tuning LLMs with the minimal required TPU configuration: | Model size | LoRA | TPU | |------------|-------|--------------| diff --git a/runner/internal/shim/backends/gcp.go b/runner/internal/shim/backends/gcp.go index f82eec60a..b0724bd5b 100644 --- a/runner/internal/shim/backends/gcp.go +++ b/runner/internal/shim/backends/gcp.go @@ -15,12 +15,11 @@ func NewGCPBackend() *GCPBackend { } // GetRealDeviceName resolves device names according to https://cloud.google.com/compute/docs/disks/disk-symlinks -// The server registers device name as pd-{volumeID} func (e *GCPBackend) GetRealDeviceName(volumeID, deviceName string) (string, error) { // Try resolving first partition or external volumes - realDeviceName, err := os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-pd-%s-part1", volumeID)) + realDeviceName, err := os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-%s-part1", deviceName)) if err != nil { - realDeviceName, err = os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-pd-%s", volumeID)) + realDeviceName, err = os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-%s", deviceName)) if err != nil { return "", fmt.Errorf("failed to resolve symlink for volume %s: %w", volumeID, err) } diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 8adbd9fca..af360e2ed 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -81,11 +81,12 @@ def __init__(self, config: GCPConfig): def get_offers( self, requirements: Optional[Requirements] = None ) -> List[InstanceOfferWithAvailability]: + regions = get_or_error(self.config.regions) offers = get_catalog_offers( backend=BackendType.GCP, requirements=requirements, configurable_disk_size=CONFIGURABLE_DISK_SIZE, - extra_filter=_supported_instances_and_zones(self.config.regions), + extra_filter=_supported_instances_and_zones(regions), ) quotas: Dict[str, Dict[str, float]] = defaultdict(dict) for region in self.regions_client.list(project=self.config.project_id): @@ -180,14 +181,17 @@ def create_instance( } labels = {k: v for k, v in labels.items() if gcp_resources.is_valid_label_value(v)} labels = merge_tags(tags=labels, backend_tags=self.config.tags) - tpu = ( + is_tpu = ( _is_tpu(instance_offer.instance.resources.gpus[0].name) if instance_offer.instance.resources.gpus else False ) - if tpu: + if is_tpu: instance_id = f"tpu-{instance_config.instance_name}" startup_script = _get_tpu_startup_script(authorized_keys) + # GCP does not allow attaching disks while TPUs is creating, + # so we need to attach the disks on creation. 
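+            # instance_config.volumes is converted into tpu_v2.AttachedDisk entries
+            # and handed to create_tpu_node_struct() below, so the disks are part of
+            # the node from the start; attach_volume() later detects these
+            # already-attached disks and returns their default device names
+            # (persistent-disk-1, persistent-disk-2, ...).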
+ data_disks = _get_tpu_data_disks(self.config.project_id, instance_config.volumes) for zone in zones: tpu_node = gcp_resources.create_tpu_node_struct( instance_name=instance_offer.instance.name, @@ -199,6 +203,7 @@ def create_instance( subnetwork=subnetwork, allocate_public_ip=allocate_public_ip, service_account=self.config.vm_service_account, + data_disks=data_disks, ) create_node_request = tpu_v2.CreateNodeRequest( parent=f"projects/{self.config.project_id}/locations/{zone}", @@ -233,7 +238,7 @@ def create_instance( username="ubuntu", ssh_proxy=None, dockerized=True, - backend_data=json.dumps({"is_tpu": tpu, "zone": zone}), + backend_data=json.dumps({"is_tpu": is_tpu, "zone": zone}), ) raise NoCapacityError() @@ -312,7 +317,7 @@ def update_provisioning_data( if is_tpu: node_request = tpu_v2.GetNodeRequest( - name=f"projects/dstack/locations/{zone}/nodes/{provisioning_data.instance_id}", + name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{provisioning_data.instance_id}", ) try: instance = self.tpu_client.get_node(request=node_request) @@ -371,6 +376,7 @@ def run_job( SSHKey(public=project_ssh_public_key.strip()), ], user=run.user, + volumes=volumes, ) if len(volumes) > 0: volume = volumes[0] @@ -486,7 +492,7 @@ def register_volume(self, volume: Volume) -> VolumeProvisioningData: disk_type=gcp_resources.full_resource_name_to_name(disk.type_), ).json(), ) - raise ComputeError("Persistent disk %s not found", volume.configuration.volume_id) + raise ComputeError(f"Persistent disk {volume.configuration.volume_id} not found") def create_volume(self, volume: Volume) -> VolumeProvisioningData: zone = gcp_resources.get_availability_zone( @@ -557,6 +563,11 @@ def delete_volume(self, volume: Volume): logger.debug("Deleted persistent disk for volume %s", volume.name) def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentData: + logger.debug( + "Attaching persistent disk for volume %s to instance %s", + volume.volume_id, + instance_id, + ) zone = get_or_error(volume.provisioning_data).availability_zone try: disk = self.disk_client.get( @@ -564,43 +575,113 @@ def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentDat zone=zone, disk=volume.volume_id, ) - disk_url = disk.self_link - attached_disk = compute_v1.AttachedDisk() - attached_disk.source = disk_url - attached_disk.auto_delete = False - attached_disk.device_name = f"pd-{volume.volume_id}" + # This method has no information if the instance is a TPU or a VM, + # so we first try to see if there is a TPU with such name + try: + get_node_request = tpu_v2.GetNodeRequest( + name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{instance_id}", + ) + tpu_node = self.tpu_client.get_node(get_node_request) + except google.api_core.exceptions.NotFound: + tpu_node = None + + if tpu_node is not None: + # Python API to attach a disk to a TPU is not documented, + # so we follow the code from the gcloud CLI: + # https://github.com/twistedpair/google-cloud-sdk/blob/26ab5a281d56b384cc25750f3279a27afe5b499f/google-cloud-sdk/lib/googlecloudsdk/command_lib/compute/tpus/tpu_vm/util.py#L113 + source_disk = ( + f"projects/{self.config.project_id}/zones/{zone}/disks/{volume.volume_id}" + ) + # create_instance() has already attached the disks + # if the TPU is provisioned on the run submission via run_job() + for i, disk in enumerate(tpu_node.data_disks, start=1): + if disk.source_disk == source_disk: + device_name = f"persistent-disk-{i}" + logger.debug( + "Persistent disk for volume %s is already 
attached to instance %s", + volume.volume_id, + instance_id, + ) + return VolumeAttachmentData(device_name=device_name) + attached_disk = tpu_v2.AttachedDisk( + source_disk=source_disk, + mode=tpu_v2.AttachedDisk.DiskMode.READ_WRITE, + ) + tpu_node.data_disks.append(attached_disk) + # Cannot set device name for TPUs, so use default naming + device_name = f"persistent-disk-{len(tpu_node.data_disks)}" + update_node_request = tpu_v2.UpdateNodeRequest( + node=tpu_node, + update_mask="dataDisks", + ) + operation = self.tpu_client.update_node(update_node_request) + gcp_resources.wait_for_operation(operation, "persistent disk attachment") + else: + attached_disk = compute_v1.AttachedDisk() + attached_disk.source = disk_url + attached_disk.auto_delete = False + attached_disk.device_name = f"pd-{volume.volume_id}" + device_name = attached_disk.device_name - logger.debug( - "Attaching persistent disk for volume %s to instance %s", - volume.volume_id, - instance_id, - ) - operation = self.instances_client.attach_disk( - project=self.config.project_id, - zone=zone, - instance=instance_id, - attached_disk_resource=attached_disk, - ) - gcp_resources.wait_for_extended_operation(operation, "persistent disk attachment") + operation = self.instances_client.attach_disk( + project=self.config.project_id, + zone=zone, + instance=instance_id, + attached_disk_resource=attached_disk, + ) + gcp_resources.wait_for_extended_operation(operation, "persistent disk attachment") except google.api_core.exceptions.NotFound: - raise ComputeError("Persistent disk not found") + raise ComputeError("Persistent disk or instance not found") logger.debug( "Attached persistent disk for volume %s to instance %s", volume.volume_id, instance_id ) - return VolumeAttachmentData( - device_name=attached_disk.device_name, - ) + return VolumeAttachmentData(device_name=device_name) def detach_volume(self, volume: Volume, instance_id: str): - operation = self.instances_client.detach_disk( - project=self.config.project_id, - zone=get_or_error(volume.provisioning_data).availability_zone, - instance=instance_id, - device_name=get_or_error(volume.attachment_data).device_name, + logger.debug( + "Detaching persistent disk for volume %s from instance %s", + volume.volume_id, + instance_id, + ) + zone = get_or_error(volume.provisioning_data).availability_zone + # This method has no information if the instance is a TPU or a VM, + # so we first try to see if there is a TPU with such name + try: + get_node_request = tpu_v2.GetNodeRequest( + name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{instance_id}", + ) + tpu_node = self.tpu_client.get_node(get_node_request) + except google.api_core.exceptions.NotFound: + tpu_node = None + + if tpu_node is not None: + source_disk = ( + f"projects/{self.config.project_id}/zones/{zone}/disks/{volume.volume_id}" + ) + tpu_node.data_disks = [ + disk for disk in tpu_node.data_disks if disk.source_disk != source_disk + ] + update_node_request = tpu_v2.UpdateNodeRequest( + node=tpu_node, + update_mask="dataDisks", + ) + operation = self.tpu_client.update_node(update_node_request) + gcp_resources.wait_for_operation(operation, "persistent disk detachment") + else: + operation = self.instances_client.detach_disk( + project=self.config.project_id, + zone=get_or_error(volume.provisioning_data).availability_zone, + instance=instance_id, + device_name=get_or_error(volume.attachment_data).device_name, + ) + gcp_resources.wait_for_extended_operation(operation, "persistent disk detachment") + logger.debug( + 
"Detached persistent disk for volume %s from instance %s", + volume.volume_id, + instance_id, ) - gcp_resources.wait_for_extended_operation(operation, "persistent disk detachment") def _get_vpc_subnet( @@ -729,3 +810,21 @@ def _get_volume_price(size: int) -> float: # https://cloud.google.com/compute/disks-image-pricing#persistentdisk # The price is different in different regions. Take max across supported regions. return size * 0.12 + + +def _get_tpu_data_disks( + project_id: str, volumes: Optional[List[Volume]] +) -> List[tpu_v2.AttachedDisk]: + if volumes is None: + return [] + return [_get_tpu_data_disk_for_volume(project_id, volume) for volume in volumes] + + +def _get_tpu_data_disk_for_volume(project_id: str, volume: Volume) -> tpu_v2.AttachedDisk: + zone = get_or_error(volume.provisioning_data).availability_zone + source_disk = f"projects/{project_id}/zones/{zone}/disks/{volume.volume_id}" + attached_disk = tpu_v2.AttachedDisk( + source_disk=source_disk, + mode=tpu_v2.AttachedDisk.DiskMode.READ_WRITE, + ) + return attached_disk diff --git a/src/dstack/_internal/core/backends/gcp/resources.py b/src/dstack/_internal/core/backends/gcp/resources.py index de50aa023..dde92931d 100644 --- a/src/dstack/_internal/core/backends/gcp/resources.py +++ b/src/dstack/_internal/core/backends/gcp/resources.py @@ -369,6 +369,7 @@ def create_tpu_node_struct( subnetwork: Optional[str] = None, allocate_public_ip: bool = True, service_account: Optional[str] = None, + data_disks: Optional[List[tpu_v2.AttachedDisk]] = None, ) -> tpu_v2.Node: node = tpu_v2.Node() if spot: @@ -388,6 +389,9 @@ def create_tpu_node_struct( email=service_account, scope=["https://www.googleapis.com/auth/cloud-platform"], ) + if data_disks is not None: + for disk in data_disks: + node.data_disks.append(disk) return node diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 8f118d750..3dec65aaa 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -7,6 +7,7 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreModel from dstack._internal.core.models.envs import Env +from dstack._internal.core.models.volumes import Volume from dstack._internal.utils.common import pretty_resources @@ -103,6 +104,7 @@ class InstanceConfiguration(CoreModel): availability_zone: Optional[str] = None placement_group_name: Optional[str] = None reservation: Optional[str] = None + volumes: Optional[List[Volume]] = None def get_public_keys(self) -> List[str]: return [ssh_key.public.strip() for ssh_key in self.ssh_keys]