Forbid deleting backends with active instances or volumes (#2131)
* Log delete operations

* Forbid deleting backends with active instances or volumes

* Test backends deletion with active resources

* Fix log message
r4victor authored Dec 23, 2024
1 parent 0101ba0 commit 63add29
Showing 10 changed files with 247 additions and 7 deletions.
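
The change is visible at the API level: deleting a backend that still has active instances or volumes is now rejected with a client error. Below is a minimal client-side sketch, not part of the commit; the endpoint path and request payload come from the tests at the end of this diff, while the server URL and Bearer-token header are illustrative assumptions.

import httpx

# Hypothetical server URL and admin token; endpoint and payload match the tests below.
resp = httpx.post(
    "http://localhost:3000/api/project/main/backends/delete",
    headers={"Authorization": "Bearer <admin-token>"},
    json={"backends_names": ["aws"]},
)
# 400 while the backend still has active instances or volumes, 200 once it does not.
print(resp.status_code)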
7 changes: 7 additions & 0 deletions src/dstack/_internal/core/models/instances.py
@@ -154,3 +154,10 @@ def is_available(self) -> bool:
self.IDLE,
self.BUSY,
)

def is_active(self) -> bool:
return self not in self.finished_statuses()

@classmethod
def finished_statuses(cls) -> List["InstanceStatus"]:
return [cls.TERMINATING, cls.TERMINATED]
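
For reference, a tiny illustration (not part of the commit) of what the new helpers return, following the definitions added above:

from dstack._internal.core.models.instances import InstanceStatus

# IDLE is not in finished_statuses(), so it counts as active.
assert InstanceStatus.IDLE.is_active()
# TERMINATING and TERMINATED are the finished statuses, so they are not active.
assert not InstanceStatus.TERMINATED.is_active()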
7 changes: 7 additions & 0 deletions src/dstack/_internal/core/models/volumes.py
@@ -19,6 +19,13 @@ class VolumeStatus(str, Enum):
ACTIVE = "active"
FAILED = "failed"

def is_active(self) -> bool:
return self not in self.finished_statuses()

@classmethod
def finished_statuses(cls) -> List["VolumeStatus"]:
return [cls.FAILED]


class VolumeConfiguration(CoreModel):
type: Literal["volume"] = "volume"
5 changes: 3 additions & 2 deletions src/dstack/_internal/server/routers/backends.py
@@ -21,6 +21,7 @@
)
from dstack._internal.server.security.permissions import Authenticated, ProjectAdmin
from dstack._internal.server.services import backends
from dstack._internal.server.services.backends import handlers as backends_handlers
from dstack._internal.server.services.config import (
ServerConfigManager,
create_backend_config_yaml,
@@ -87,8 +88,8 @@ async def delete_backends(
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectAdmin()),
):
_, project = user_project
await backends.delete_backends(
session=session, project=project, backends_types=body.backends_names
await backends_handlers.delete_backends_safe(
session=session, project=project, backends_types=body.backends_names, error=True
)
if settings.SERVER_CONFIG_ENABLED:
await ServerConfigManager().sync_config(session=session)
15 changes: 12 additions & 3 deletions src/dstack/_internal/server/services/backends/__init__.py
@@ -1,6 +1,6 @@
import asyncio
import heapq
from typing import Callable, Coroutine, Dict, Iterable, List, Optional, Tuple, Type, Union
from typing import Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union
from uuid import UUID

from sqlalchemy import delete, update
@@ -229,16 +229,25 @@ async def get_config_info(
async def delete_backends(
session: AsyncSession,
project: ProjectModel,
backends_types: Iterable[BackendType],
backends_types: List[BackendType],
):
if BackendType.DSTACK in backends_types:
raise ServerClientError("Cannot delete dstack backend")
current_backends_types = set(b.type for b in project.backends)
deleted_backends_types = current_backends_types.intersection(backends_types)
if len(deleted_backends_types) == 0:
return
await session.execute(
delete(BackendModel).where(
BackendModel.type.in_(backends_types),
BackendModel.type.in_(deleted_backends_types),
BackendModel.project_id == project.id,
)
)
logger.info(
"Deleted backends %s in project %s",
[b.value for b in deleted_backends_types],
project.name,
)


BackendTuple = Tuple[BackendModel, Backend]
98 changes: 98 additions & 0 deletions src/dstack/_internal/server/services/backends/handlers.py
@@ -0,0 +1,98 @@
from typing import List

from sqlalchemy.ext.asyncio import AsyncSession

from dstack._internal.core.errors import ServerClientError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.server.models import ProjectModel
from dstack._internal.server.services.backends import delete_backends
from dstack._internal.server.services.fleets import list_project_fleet_models
from dstack._internal.server.services.volumes import list_project_volumes
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)


async def delete_backends_safe(
session: AsyncSession,
project: ProjectModel,
backends_types: List[BackendType],
error: bool = True,
):
try:
await _check_active_instances(
session=session,
project=project,
backends_types=backends_types,
error=error,
)
await _check_active_volumes(
session=session,
project=project,
backends_types=backends_types,
error=error,
)
except ServerClientError as e:
if error:
raise
logger.warning("%s", e.msg)
await delete_backends(
session=session,
project=project,
backends_types=backends_types,
)


async def _check_active_instances(
session: AsyncSession,
project: ProjectModel,
backends_types: List[BackendType],
error: bool,
):
fleet_models = await list_project_fleet_models(
session=session,
project=project,
)
for fleet_model in fleet_models:
for instance in fleet_model.instances:
if instance.status.is_active() and instance.backend in backends_types:
if error:
msg = (
f"Backend {instance.backend.value} has active instances."
" Delete instances before deleting the backend."
)
else:
msg = (
f"Backend {instance.backend.value} has active instances."
" The backend will be deleted but instances may be left hanging."
)
raise ServerClientError(msg)


async def _check_active_volumes(
session: AsyncSession,
project: ProjectModel,
backends_types: List[BackendType],
error: bool,
):
volume_models = await list_project_volumes(
session=session,
project=project,
)
for volume_model in volume_models:
if (
volume_model.status.is_active()
and volume_model.provisioning_data is not None
and volume_model.provisioning_data.backend in backends_types
):
if error:
msg = (
f"Backend {volume_model.provisioning_data.backend.value} has active volumes."
" Delete volumes before deleting the backend."
)
else:
msg = (
f"Backend {volume_model.provisioning_data.backend.value} has active volumes."
" The backend will be deleted but volumes may be left hanging."
)
raise ServerClientError(msg)
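
A short sketch of how the new handler is called in this commit's two modes: the HTTP route passes error=True so the ServerClientError propagates to the client, while the server-config sync passes error=False so the conflict is only logged and the deletion proceeds. The function name and backend type below are illustrative:

from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.server.services.backends.handlers import delete_backends_safe

async def drop_aws_backend(session, project):
    # error=True: refuse to delete and raise ServerClientError if active resources exist
    # (how the /backends/delete route calls it).
    await delete_backends_safe(
        session=session, project=project, backends_types=[BackendType.AWS], error=True
    )
    # With error=False the handler would only log a warning about active resources and
    # delete anyway (how the server-config sync calls it).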
8 changes: 6 additions & 2 deletions src/dstack/_internal/server/services/config.py
@@ -29,6 +29,7 @@
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import encryption as encryption_services
from dstack._internal.server.services import projects as projects_services
from dstack._internal.server.services.backends.handlers import delete_backends_safe
from dstack._internal.server.services.encryption import AnyEncryptionKeyConfig
from dstack._internal.server.services.permissions import (
DefaultPermissions,
@@ -595,8 +596,11 @@ async def _apply_project_config(
)
except Exception as e:
logger.warning("Failed to configure backend %s: %s", config_info.type, e)
await backends_services.delete_backends(
session=session, project=project, backends_types=backends_to_delete
await delete_backends_safe(
session=session,
project=project,
backends_types=list(backends_to_delete),
error=False,
)

async def _init_config(
5 changes: 5 additions & 0 deletions src/dstack/_internal/server/services/repos.py
@@ -29,6 +29,9 @@
)
from dstack._internal.server.services.storage import get_default_storage
from dstack._internal.utils.common import run_async
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)


async def list_repos(
@@ -170,6 +173,7 @@ async def delete_repos(
delete(RepoModel).where(RepoModel.project_id == project.id, RepoModel.name.in_(repos_ids))
)
await session.commit()
logger.info("Deleted repos %s in project %s", repos_ids, project.name)


async def get_repo_creds(
@@ -263,6 +267,7 @@ async def delete_repo_creds(
)
)
await session.commit()
logger.info("Deleted repo creds for repo %s user %s", repo.name, user.name)


async def upload_code(
1 change: 1 addition & 0 deletions src/dstack/_internal/server/services/users.py
@@ -146,6 +146,7 @@ async def delete_users(
):
await session.execute(delete(UserModel).where(UserModel.name.in_(usernames)))
await session.commit()
logger.info("Deleted users %s by user %s", usernames, user.name)


async def get_user_model_by_name(
3 changes: 3 additions & 0 deletions src/dstack/_internal/server/testing/common.py
@@ -578,6 +578,7 @@ async def create_volume(
else None,
instances=[],
deleted_at=deleted_at,
deleted=True if deleted_at else False,
)
session.add(vm)
await session.commit()
@@ -641,8 +642,10 @@ def get_volume_provisioning_data(
size_gb: int = 100,
availability_zone: Optional[str] = None,
backend_data: Optional[str] = None,
backend: Optional[BackendType] = None,
) -> VolumeProvisioningData:
return VolumeProvisioningData(
backend=backend,
volume_id=volume_id,
size_gb=size_gb,
availability_zone=availability_zone,
105 changes: 105 additions & 0 deletions src/tests/_internal/server/routers/test_backends.py
@@ -1,4 +1,5 @@
import json
from datetime import datetime, timezone
from operator import itemgetter
from unittest.mock import Mock, patch

@@ -11,14 +12,21 @@
from dstack._internal.core.backends.oci import region as oci_region
from dstack._internal.core.errors import BackendAuthError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.core.models.users import GlobalRole, ProjectRole
from dstack._internal.core.models.volumes import VolumeStatus
from dstack._internal.server.models import BackendModel
from dstack._internal.server.services.projects import add_project_member
from dstack._internal.server.testing.common import (
create_backend,
create_fleet,
create_instance,
create_pool,
create_project,
create_user,
create_volume,
get_auth_headers,
get_volume_provisioning_data,
)

FAKE_OCI_CLIENT_CREDS = {
@@ -1189,6 +1197,103 @@ async def test_deletes_backends(self, test_db, session: AsyncSession, client: AsyncClient):
res = await session.execute(select(BackendModel))
assert len(res.scalars().all()) == 0

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_returns_400_if_backend_has_active_instances(
self, test_db, session: AsyncSession, client: AsyncClient
):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.ADMIN
)
backend = await create_backend(session=session, project_id=project.id)
pool = await create_pool(session=session, project=project)
fleet = await create_fleet(session=session, project=project)
instance1 = await create_instance(
session=session,
project=project,
pool=pool,
status=InstanceStatus.TERMINATED,
backend=backend.type,
)
instance2 = await create_instance(
session=session,
project=project,
pool=pool,
status=InstanceStatus.IDLE,
backend=backend.type,
)
fleet.instances.append(instance1)
fleet.instances.append(instance2)
await session.commit()
response = await client.post(
f"/api/project/{project.name}/backends/delete",
headers=get_auth_headers(user.token),
json={"backends_names": [backend.type.value]},
)
assert response.status_code == 400
res = await session.execute(select(BackendModel))
assert len(res.scalars().all()) == 1
fleet.instances.pop()
await session.commit()
response = await client.post(
f"/api/project/{project.name}/backends/delete",
headers=get_auth_headers(user.token),
json={"backends_names": [backend.type.value]},
)
assert response.status_code == 200
res = await session.execute(select(BackendModel))
assert len(res.scalars().all()) == 0

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_returns_400_if_backend_has_active_volumes(
self, test_db, session: AsyncSession, client: AsyncClient
):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.ADMIN
)
backend = await create_backend(session=session, project_id=project.id)
await create_volume(
session=session,
project=project,
user=user,
backend=backend.type,
volume_provisioning_data=get_volume_provisioning_data(backend=backend.type),
status=VolumeStatus.ACTIVE,
deleted_at=datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc),
)
volume2 = await create_volume(
session=session,
project=project,
user=user,
backend=backend.type,
volume_provisioning_data=get_volume_provisioning_data(backend=backend.type),
status=VolumeStatus.ACTIVE,
)
await session.commit()
response = await client.post(
f"/api/project/{project.name}/backends/delete",
headers=get_auth_headers(user.token),
json={"backends_names": [backend.type.value]},
)
assert response.status_code == 400
res = await session.execute(select(BackendModel))
assert len(res.scalars().all()) == 1
await session.delete(volume2)
await session.commit()
response = await client.post(
f"/api/project/{project.name}/backends/delete",
headers=get_auth_headers(user.token),
json={"backends_names": [backend.type.value]},
)
assert response.status_code == 200
res = await session.execute(select(BackendModel))
assert len(res.scalars().all()) == 0


class TestGetConfigInfo:
@pytest.mark.asyncio