Skip to content

Commit

Permalink
Refactor idle instance termination (#2188)
Browse the repository at this point in the history
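This commit removes code duplication between
`process_instances._terminate` and
`process_instances._maybe_terminate_idle_instance`.
The idle check, renamed to `_mark_terminating_if_idle_duration_expired`,
now only sets the instance status to `TERMINATING` with the reason
"Idle timeout"; the backend termination call stays in `_terminate`.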
  • Loading branch information
jvstme authored Jan 15, 2025
1 parent cbe58a3 commit d1f8cfe
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 51 deletions.
34 changes: 9 additions & 25 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@
fleet_model_to_fleet,
get_create_instance_offers,
)
from dstack._internal.server.services.jobs import (
terminate_job_provisioning_data_instance,
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.placement import (
get_fleet_placement_groups,
Expand Down Expand Up @@ -160,9 +157,7 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
and instance.termination_policy == TerminationPolicy.DESTROY_AFTER_IDLE
and instance.job_id is None
):
# terminates the instance and sets instance.status to TERMINATED (along with other fields)
# if termination_idle_time is reached, noop otherwise
await _maybe_terminate_idle_instance(instance)
await _mark_terminating_if_idle_duration_expired(instance)
if instance.status == InstanceStatus.PENDING:
if instance.remote_connection_info is not None:
await _add_remote(instance)
Expand All @@ -184,34 +179,20 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel):
await session.commit()


async def _maybe_terminate_idle_instance(instance: InstanceModel):
current_time = get_current_datetime()
async def _mark_terminating_if_idle_duration_expired(instance: InstanceModel):
idle_duration = _get_instance_idle_duration(instance)
idle_seconds = instance.termination_idle_time
delta = datetime.timedelta(seconds=idle_seconds)
if idle_duration > delta:
jpd = get_instance_provisioning_data(instance)
if jpd is None:
logger.error(
"Failed to terminate idle instance %s. provisioning_data is None.",
instance.name,
)
else:
await terminate_job_provisioning_data_instance(
project=instance.project, job_provisioning_data=jpd
)
instance.deleted = True
instance.deleted_at = current_time
instance.finished_at = current_time
instance.status = InstanceStatus.TERMINATED
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Idle timeout"
logger.info(
"Instance %s terminated by termination policy: idle time %ss",
"Instance %s idle duration expired: idle time %ss. Terminating",
instance.name,
str(idle_duration.seconds),
extra={
"instance_name": instance.name,
"instance_status": InstanceStatus.TERMINATED.value,
"instance_status": instance.status.value,
},
)

Expand Down Expand Up @@ -792,9 +773,12 @@ async def _terminate(instance: InstanceModel) -> None:
)
if backend is None:
logger.error(
"Failed to terminate instance %s. Backend not available.", instance.name
"Failed to terminate instance %s. Backend %s not available.",
instance.name,
jpd.backend,
)
else:
logger.debug("Terminating runner instance %s", jpd.hostname)
try:
await run_async(
backend.compute().terminate_instance,
Expand Down
22 changes: 0 additions & 22 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,6 @@ def get_job_provisioning_data(job_model: JobModel) -> Optional[JobProvisioningDa
return JobProvisioningData.__response__.parse_raw(job_model.job_provisioning_data)


async def terminate_job_provisioning_data_instance(
project: ProjectModel, job_provisioning_data: JobProvisioningData
):
backend = await get_project_backend_by_type(
project=project,
backend_type=job_provisioning_data.backend,
)
if backend is None:
logger.error(
"Failed to terminate the instance. "
f"Backend {job_provisioning_data.backend} is not configured in project {project.name}."
)
return
logger.debug("Terminating runner instance %s", job_provisioning_data.hostname)
await run_async(
backend.compute().terminate_instance,
job_provisioning_data.instance_id,
job_provisioning_data.region,
job_provisioning_data.backend_data,
)


def delay_job_instance_termination(job_model: JobModel):
job_model.remove_at = get_current_datetime() + datetime.timedelta(seconds=15)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession):
instance.termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE
instance.last_job_processed_at = get_current_datetime() + dt.timedelta(minutes=-19)
await session.commit()
with patch(
"dstack._internal.server.background.tasks.process_instances.terminate_job_provisioning_data_instance"
):
await process_instances()
await process_instances()
await session.refresh(instance)
assert instance is not None
assert instance.status == InstanceStatus.TERMINATED
Expand Down

0 comments on commit d1f8cfe

Please sign in to comment.