Support volumes for TPUs #2144

Merged · 5 commits · Dec 26, 2024

3 changes: 2 additions & 1 deletion docs/docs/reference/server/config.yml.md
@@ -455,8 +455,9 @@ gcloud projects list --format="json(projectId)"

```
tpu.nodes.create
tpu.nodes.delete
tpu.nodes.get
tpu.nodes.update
tpu.nodes.delete
tpu.operations.get
tpu.operations.list
```
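
If you prefer to script the setup instead of using `gcloud`, a minimal sketch with the `google-cloud-iam` admin client could bundle the permissions above into a custom role. The role ID, title, and project ID below are placeholders and not part of this change:

```python
from google.cloud import iam_admin_v1


def create_dstack_tpu_role(project_id: str) -> iam_admin_v1.Role:
    """Create a custom role bundling the TPU permissions listed above."""
    client = iam_admin_v1.IAMClient()
    request = iam_admin_v1.CreateRoleRequest(
        parent=f"projects/{project_id}",
        role_id="dstackTpu",  # placeholder role ID
        role=iam_admin_v1.Role(
            title="dstack TPU",
            included_permissions=[
                "tpu.nodes.create",
                "tpu.nodes.get",
                "tpu.nodes.update",
                "tpu.nodes.delete",
                "tpu.operations.get",
                "tpu.operations.list",
            ],
        ),
    )
    return client.create_role(request=request)
```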
7 changes: 4 additions & 3 deletions examples/accelerators/amd/README.md
@@ -1,11 +1,12 @@
# AMD

If you're using the `runpod` backend or have set up an [SSH fleets](https://dstack.ai/docs/concepts/fleets#ssh-fleets)
with on-prem AMD GPUs, you can use AMD GPUs.
`dstack` supports running dev environments, tasks, and services on AMD GPUs.
You can do that by setting up an [SSH fleet](https://dstack.ai/docs/concepts/fleets#ssh-fleets)
with on-prem AMD GPUs or configuring a backend that offers AMD GPUs such as the `runpod` backend.

## Deployment

You can use any serving framework, such as TGI and vLLM. Here's an example of a [service](https://dstack.ai/docs/services) that deploys
Most serving frameworks, including vLLM and TGI, have AMD support. Here's an example of a [service](https://dstack.ai/docs/services) that deploys
Llama 3.1 70B in FP16 using [TGI :material-arrow-top-right-thin:{ .external }](https://huggingface.co/docs/text-generation-inference/en/installation_amd){:target="_blank"} and [vLLM :material-arrow-top-right-thin:{ .external }](https://docs.vllm.ai/en/latest/getting_started/amd-installation.html){:target="_blank"}.

=== "TGI"
27 changes: 17 additions & 10 deletions examples/accelerators/tpu/README.md
@@ -1,17 +1,24 @@
# TPU

If you're using the `gcp` backend, you can use TPUs. Just specify the TPU version and the number of cores
(separated by a dash), in the `gpu` property under `resources`.

> Currently, maximum 8 TPU cores can be specified, so the maximum supported values are `v2-8`, `v3-8`, `v4-8`, `v5litepod-8`,
> and `v5e-8`. Multi-host TPU support, allowing for larger numbers of cores, is coming soon.
If you've configured the `gcp` backend in `dstack`, you can run dev environments, tasks, and services on [TPUs](https://cloud.google.com/tpu/docs/intro-to-tpu).
Choose a TPU instance by specifying the TPU version and the number of cores (e.g. `v5litepod-8`) in the `gpu` property under `resources`,
or request TPUs by specifying `tpu` as `vendor` ([see examples](https://dstack.ai/docs/guides/protips/#gpu)).

Below are a few examples of using TPUs for deployment and fine-tuning.

!!! info "Multi-host TPUs"
Currently, `dstack` supports only single-host TPUs, which means that
the maximum supported number of cores is `8` (e.g. `v2-8`, `v3-8`, `v5litepod-8`, `v5p-8`, `v6e-8`).
Multi-host TPU support is on the roadmap.

!!! info "TPU storage"
By default, each TPU VM contains a 100GB boot disk and its size cannot be changed.
If you need more storage, attach additional disks using [Volumes](https://dstack.ai/docs/concepts/volumes/).

## Deployment

You can use any serving framework, such as vLLM, TGI. Here's an example of a [service](https://dstack.ai/docs/services) that deploys
Llama 3.1 8B using
Many serving frameworks, including vLLM and TGI, have TPU support.
Here's an example of a [service](https://dstack.ai/docs/services) that deploys Llama 3.1 8B using
[Optimum TPU :material-arrow-top-right-thin:{ .external }](https://github.com/huggingface/optimum-tpu){:target="_blank"}
and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm-project/vllm){:target="_blank"}.

@@ -40,7 +47,7 @@ and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm-
```
</div>

Note, for `Optimum TPU` by default `MAX_INPUT_TOKEN` is set to 4095, consequently we must set `MAX_BATCH_PREFILL_TOKENS` to 4095.
Note that Optimum TPU sets `MAX_INPUT_TOKEN` to 4095 by default, so `MAX_BATCH_PREFILL_TOKENS` must also be set to 4095.

??? info "Docker image"
The official Docker image `huggingface/optimum-tpu:latest` doesn’t support Llama 3.1-8B.
@@ -92,7 +99,7 @@ and [vLLM :material-arrow-top-right-thin:{ .external }](https://github.com/vllm-

### Memory requirements

Below are the approximate memory requirements for serving LLMs with their corresponding TPUs.
Below are the approximate memory requirements for serving LLMs with the minimal required TPU configuration:

| Model size | bfloat16 | TPU | int8 | TPU |
|------------|----------|--------------|-------|----------------|
@@ -152,7 +159,7 @@ resources:

### Memory requirements

Below are the approximate memory requirements for fine-tuning LLMs with their corresponding TPUs.
Below are the approximate memory requirements for fine-tuning LLMs with the minimal required TPU configuration:

| Model size | LoRA | TPU |
|------------|-------|--------------|
5 changes: 2 additions & 3 deletions runner/internal/shim/backends/gcp.go
@@ -15,12 +15,11 @@ func NewGCPBackend() *GCPBackend {
}

// GetRealDeviceName resolves device names according to https://cloud.google.com/compute/docs/disks/disk-symlinks
// The server registers device name as pd-{volumeID}
func (e *GCPBackend) GetRealDeviceName(volumeID, deviceName string) (string, error) {
// Try resolving first partition or external volumes
realDeviceName, err := os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-pd-%s-part1", volumeID))
realDeviceName, err := os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-%s-part1", deviceName))
if err != nil {
realDeviceName, err = os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-pd-%s", volumeID))
realDeviceName, err = os.Readlink(fmt.Sprintf("/dev/disk/by-id/google-%s", deviceName))
if err != nil {
return "", fmt.Errorf("failed to resolve symlink for volume %s: %w", volumeID, err)
}
165 changes: 132 additions & 33 deletions src/dstack/_internal/core/backends/gcp/compute.py
@@ -81,11 +81,12 @@ def __init__(self, config: GCPConfig):
def get_offers(
self, requirements: Optional[Requirements] = None
) -> List[InstanceOfferWithAvailability]:
regions = get_or_error(self.config.regions)
offers = get_catalog_offers(
backend=BackendType.GCP,
requirements=requirements,
configurable_disk_size=CONFIGURABLE_DISK_SIZE,
extra_filter=_supported_instances_and_zones(self.config.regions),
extra_filter=_supported_instances_and_zones(regions),
)
quotas: Dict[str, Dict[str, float]] = defaultdict(dict)
for region in self.regions_client.list(project=self.config.project_id):
@@ -180,14 +181,17 @@ def create_instance(
}
labels = {k: v for k, v in labels.items() if gcp_resources.is_valid_label_value(v)}
labels = merge_tags(tags=labels, backend_tags=self.config.tags)
tpu = (
is_tpu = (
_is_tpu(instance_offer.instance.resources.gpus[0].name)
if instance_offer.instance.resources.gpus
else False
)
if tpu:
if is_tpu:
instance_id = f"tpu-{instance_config.instance_name}"
startup_script = _get_tpu_startup_script(authorized_keys)
# GCP does not allow attaching disks while a TPU is being created,
# so we need to attach the disks at creation time.
data_disks = _get_tpu_data_disks(self.config.project_id, instance_config.volumes)
for zone in zones:
tpu_node = gcp_resources.create_tpu_node_struct(
instance_name=instance_offer.instance.name,
@@ -199,6 +203,7 @@
subnetwork=subnetwork,
allocate_public_ip=allocate_public_ip,
service_account=self.config.vm_service_account,
data_disks=data_disks,
)
create_node_request = tpu_v2.CreateNodeRequest(
parent=f"projects/{self.config.project_id}/locations/{zone}",
@@ -233,7 +238,7 @@
username="ubuntu",
ssh_proxy=None,
dockerized=True,
backend_data=json.dumps({"is_tpu": tpu, "zone": zone}),
backend_data=json.dumps({"is_tpu": is_tpu, "zone": zone}),
)
raise NoCapacityError()
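
For context, here is a minimal standalone sketch of the underlying `google-cloud-tpu` call that `create_instance` now issues for TPUs, with the data disk included in the create request itself. The project, zone, disk name, accelerator type, node ID, and runtime version are illustrative placeholders, not values from this diff:

```python
from google.cloud import tpu_v2

project_id = "my-project"     # placeholder
zone = "us-central2-b"        # placeholder
volume_id = "dstack-volume-1" # placeholder

node = tpu_v2.Node(
    accelerator_type="v5litepod-8",
    runtime_version="tpu-ubuntu2204-base",  # illustrative runtime version
)
# Disks must be listed at creation time because GCP rejects attach requests
# while the TPU node is still being created.
node.data_disks.append(
    tpu_v2.AttachedDisk(
        source_disk=f"projects/{project_id}/zones/{zone}/disks/{volume_id}",
        mode=tpu_v2.AttachedDisk.DiskMode.READ_WRITE,
    )
)

client = tpu_v2.TpuClient()
operation = client.create_node(
    request=tpu_v2.CreateNodeRequest(
        parent=f"projects/{project_id}/locations/{zone}",
        node_id="tpu-my-instance",  # placeholder
        node=node,
    )
)
operation.result()  # block until the TPU VM is provisioned
```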

@@ -312,7 +317,7 @@ def update_provisioning_data(

if is_tpu:
node_request = tpu_v2.GetNodeRequest(
name=f"projects/dstack/locations/{zone}/nodes/{provisioning_data.instance_id}",
name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{provisioning_data.instance_id}",
)
try:
instance = self.tpu_client.get_node(request=node_request)
@@ -371,6 +376,7 @@ def run_job(
SSHKey(public=project_ssh_public_key.strip()),
],
user=run.user,
volumes=volumes,
)
if len(volumes) > 0:
volume = volumes[0]
@@ -486,7 +492,7 @@ def register_volume(self, volume: Volume) -> VolumeProvisioningData:
disk_type=gcp_resources.full_resource_name_to_name(disk.type_),
).json(),
)
raise ComputeError("Persistent disk %s not found", volume.configuration.volume_id)
raise ComputeError(f"Persistent disk {volume.configuration.volume_id} not found")

def create_volume(self, volume: Volume) -> VolumeProvisioningData:
zone = gcp_resources.get_availability_zone(
@@ -557,50 +563,125 @@ def delete_volume(self, volume: Volume):
logger.debug("Deleted persistent disk for volume %s", volume.name)

def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentData:
logger.debug(
"Attaching persistent disk for volume %s to instance %s",
volume.volume_id,
instance_id,
)
zone = get_or_error(volume.provisioning_data).availability_zone
try:
disk = self.disk_client.get(
project=self.config.project_id,
zone=zone,
disk=volume.volume_id,
)

disk_url = disk.self_link

attached_disk = compute_v1.AttachedDisk()
attached_disk.source = disk_url
attached_disk.auto_delete = False
attached_disk.device_name = f"pd-{volume.volume_id}"
# This method has no way of knowing whether the instance is a TPU or a VM,
# so we first check whether a TPU with this name exists
try:
get_node_request = tpu_v2.GetNodeRequest(
name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{instance_id}",
)
tpu_node = self.tpu_client.get_node(get_node_request)
except google.api_core.exceptions.NotFound:
tpu_node = None

if tpu_node is not None:
# The Python API for attaching a disk to a TPU is not documented,
# so we follow the code from the gcloud CLI:
# https://github.com/twistedpair/google-cloud-sdk/blob/26ab5a281d56b384cc25750f3279a27afe5b499f/google-cloud-sdk/lib/googlecloudsdk/command_lib/compute/tpus/tpu_vm/util.py#L113
source_disk = (
f"projects/{self.config.project_id}/zones/{zone}/disks/{volume.volume_id}"
)
# create_instance() has already attached the disks
# if the TPU was provisioned on run submission via run_job()
for i, disk in enumerate(tpu_node.data_disks, start=1):
if disk.source_disk == source_disk:
device_name = f"persistent-disk-{i}"
logger.debug(
"Persistent disk for volume %s is already attached to instance %s",
volume.volume_id,
instance_id,
)
return VolumeAttachmentData(device_name=device_name)
attached_disk = tpu_v2.AttachedDisk(
source_disk=source_disk,
mode=tpu_v2.AttachedDisk.DiskMode.READ_WRITE,
)
tpu_node.data_disks.append(attached_disk)
# Cannot set device name for TPUs, so use default naming
device_name = f"persistent-disk-{len(tpu_node.data_disks)}"
update_node_request = tpu_v2.UpdateNodeRequest(
node=tpu_node,
update_mask="dataDisks",
)
operation = self.tpu_client.update_node(update_node_request)
gcp_resources.wait_for_operation(operation, "persistent disk attachment")
else:
attached_disk = compute_v1.AttachedDisk()
attached_disk.source = disk_url
attached_disk.auto_delete = False
attached_disk.device_name = f"pd-{volume.volume_id}"
device_name = attached_disk.device_name

logger.debug(
"Attaching persistent disk for volume %s to instance %s",
volume.volume_id,
instance_id,
)
operation = self.instances_client.attach_disk(
project=self.config.project_id,
zone=zone,
instance=instance_id,
attached_disk_resource=attached_disk,
)
gcp_resources.wait_for_extended_operation(operation, "persistent disk attachment")
operation = self.instances_client.attach_disk(
project=self.config.project_id,
zone=zone,
instance=instance_id,
attached_disk_resource=attached_disk,
)
gcp_resources.wait_for_extended_operation(operation, "persistent disk attachment")
except google.api_core.exceptions.NotFound:
raise ComputeError("Persistent disk not found")
raise ComputeError("Persistent disk or instance not found")
logger.debug(
"Attached persistent disk for volume %s to instance %s", volume.volume_id, instance_id
)
return VolumeAttachmentData(
device_name=attached_disk.device_name,
)
return VolumeAttachmentData(device_name=device_name)
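
The `device_name` returned here is what the runner later resolves to a real block device via the `/dev/disk/by-id/google-*` symlinks (see the `gcp.go` change above). For illustration only, a rough Python equivalent of that runner-side resolution, not part of this diff:

```python
import os

def resolve_device(device_name: str) -> str:
    # For VM volumes device_name is "pd-<volume_id>"; for TPU data disks it is
    # the GCP default "persistent-disk-<N>". Try the first partition, then the
    # whole disk, mirroring GetRealDeviceName in runner/internal/shim/backends/gcp.go.
    for candidate in (f"{device_name}-part1", device_name):
        link = f"/dev/disk/by-id/google-{candidate}"
        try:
            target = os.readlink(link)  # e.g. "../../sdb1"
        except OSError:
            continue
        return os.path.normpath(os.path.join(os.path.dirname(link), target))
    raise FileNotFoundError(f"no /dev/disk/by-id symlink for {device_name}")
```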

def detach_volume(self, volume: Volume, instance_id: str):
operation = self.instances_client.detach_disk(
project=self.config.project_id,
zone=get_or_error(volume.provisioning_data).availability_zone,
instance=instance_id,
device_name=get_or_error(volume.attachment_data).device_name,
logger.debug(
"Detaching persistent disk for volume %s from instance %s",
volume.volume_id,
instance_id,
)
zone = get_or_error(volume.provisioning_data).availability_zone
# This method has no way of knowing whether the instance is a TPU or a VM,
# so we first check whether a TPU with this name exists
try:
get_node_request = tpu_v2.GetNodeRequest(
name=f"projects/{self.config.project_id}/locations/{zone}/nodes/{instance_id}",
)
tpu_node = self.tpu_client.get_node(get_node_request)
except google.api_core.exceptions.NotFound:
tpu_node = None

if tpu_node is not None:
source_disk = (
f"projects/{self.config.project_id}/zones/{zone}/disks/{volume.volume_id}"
)
tpu_node.data_disks = [
disk for disk in tpu_node.data_disks if disk.source_disk != source_disk
]
update_node_request = tpu_v2.UpdateNodeRequest(
node=tpu_node,
update_mask="dataDisks",
)
operation = self.tpu_client.update_node(update_node_request)
gcp_resources.wait_for_operation(operation, "persistent disk detachment")
else:
operation = self.instances_client.detach_disk(
project=self.config.project_id,
zone=get_or_error(volume.provisioning_data).availability_zone,
instance=instance_id,
device_name=get_or_error(volume.attachment_data).device_name,
)
gcp_resources.wait_for_extended_operation(operation, "persistent disk detachment")
logger.debug(
"Detached persistent disk for volume %s from instance %s",
volume.volume_id,
instance_id,
)
gcp_resources.wait_for_extended_operation(operation, "persistent disk detachment")


def _get_vpc_subnet(
@@ -729,3 +810,21 @@ def _get_volume_price(size: int) -> float:
# https://cloud.google.com/compute/disks-image-pricing#persistentdisk
# The price is different in different regions. Take max across supported regions.
return size * 0.12


def _get_tpu_data_disks(
project_id: str, volumes: Optional[List[Volume]]
) -> List[tpu_v2.AttachedDisk]:
if volumes is None:
return []
return [_get_tpu_data_disk_for_volume(project_id, volume) for volume in volumes]


def _get_tpu_data_disk_for_volume(project_id: str, volume: Volume) -> tpu_v2.AttachedDisk:
zone = get_or_error(volume.provisioning_data).availability_zone
source_disk = f"projects/{project_id}/zones/{zone}/disks/{volume.volume_id}"
attached_disk = tpu_v2.AttachedDisk(
source_disk=source_disk,
mode=tpu_v2.AttachedDisk.DiskMode.READ_WRITE,
)
return attached_disk
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/backends/gcp/resources.py
@@ -369,6 +369,7 @@ def create_tpu_node_struct(
subnetwork: Optional[str] = None,
allocate_public_ip: bool = True,
service_account: Optional[str] = None,
data_disks: Optional[List[tpu_v2.AttachedDisk]] = None,
) -> tpu_v2.Node:
node = tpu_v2.Node()
if spot:
@@ -388,6 +389,9 @@
email=service_account,
scope=["https://www.googleapis.com/auth/cloud-platform"],
)
if data_disks is not None:
for disk in data_disks:
node.data_disks.append(disk)
return node


2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/instances.py
@@ -7,6 +7,7 @@
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import CoreModel
from dstack._internal.core.models.envs import Env
from dstack._internal.core.models.volumes import Volume
from dstack._internal.utils.common import pretty_resources


@@ -103,6 +104,7 @@ class InstanceConfiguration(CoreModel):
availability_zone: Optional[str] = None
placement_group_name: Optional[str] = None
reservation: Optional[str] = None
volumes: Optional[List[Volume]] = None

def get_public_keys(self) -> List[str]:
return [ssh_key.public.strip() for ssh_key in self.ssh_keys]