diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index 4543c9ef0..814ed1eca 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -75,9 +75,6 @@ fleet_model_to_fleet, get_create_instance_offers, ) -from dstack._internal.server.services.jobs import ( - terminate_job_provisioning_data_instance, -) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.placement import ( get_fleet_placement_groups, @@ -160,9 +157,7 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): and instance.termination_policy == TerminationPolicy.DESTROY_AFTER_IDLE and instance.job_id is None ): - # terminates the instance and sets instance.status to TERMINATED (along other fields) - # if termination_idle_time is reached, noop otherwise - await _maybe_terminate_idle_instance(instance) + await _mark_terminating_if_idle_duration_expired(instance) if instance.status == InstanceStatus.PENDING: if instance.remote_connection_info is not None: await _add_remote(instance) @@ -184,34 +179,20 @@ async def _process_instance(session: AsyncSession, instance: InstanceModel): await session.commit() -async def _maybe_terminate_idle_instance(instance: InstanceModel): - current_time = get_current_datetime() +async def _mark_terminating_if_idle_duration_expired(instance: InstanceModel): idle_duration = _get_instance_idle_duration(instance) idle_seconds = instance.termination_idle_time delta = datetime.timedelta(seconds=idle_seconds) if idle_duration > delta: - jpd = get_instance_provisioning_data(instance) - if jpd is None: - logger.error( - "Failed to terminate idle instance %s. provisioning_data is None.", - instance.name, - ) - else: - await terminate_job_provisioning_data_instance( - project=instance.project, job_provisioning_data=jpd - ) - instance.deleted = True - instance.deleted_at = current_time - instance.finished_at = current_time - instance.status = InstanceStatus.TERMINATED + instance.status = InstanceStatus.TERMINATING instance.termination_reason = "Idle timeout" logger.info( - "Instance %s terminated by termination policy: idle time %ss", + "Instance %s idle duration expired: idle time %ss. Terminating", instance.name, str(idle_duration.seconds), extra={ "instance_name": instance.name, - "instance_status": InstanceStatus.TERMINATED.value, + "instance_status": instance.status.value, }, ) @@ -792,9 +773,12 @@ async def _terminate(instance: InstanceModel) -> None: ) if backend is None: logger.error( - "Failed to terminate instance %s. Backend not available.", instance.name + "Failed to terminate instance %s. Backend %s not available.", + instance.name, + jpd.backend, ) else: + logger.debug("Terminating runner instance %s", jpd.hostname) try: await run_async( backend.compute().terminate_instance, diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index c2cf5ee80..8452a736d 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -126,28 +126,6 @@ def get_job_provisioning_data(job_model: JobModel) -> Optional[JobProvisioningDa return JobProvisioningData.__response__.parse_raw(job_model.job_provisioning_data) -async def terminate_job_provisioning_data_instance( - project: ProjectModel, job_provisioning_data: JobProvisioningData -): - backend = await get_project_backend_by_type( - project=project, - backend_type=job_provisioning_data.backend, - ) - if backend is None: - logger.error( - "Failed to terminate the instance. " - f"Backend {job_provisioning_data.backend} is not configured in project {project.name}." - ) - return - logger.debug("Terminating runner instance %s", job_provisioning_data.hostname) - await run_async( - backend.compute().terminate_instance, - job_provisioning_data.instance_id, - job_provisioning_data.region, - job_provisioning_data.backend_data, - ) - - def delay_job_instance_termination(job_model: JobModel): job_model.remove_at = get_current_datetime() + datetime.timedelta(seconds=15) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index e71bcec0f..16fd2319c 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -299,10 +299,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession): instance.termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE instance.last_job_processed_at = get_current_datetime() + dt.timedelta(minutes=-19) await session.commit() - with patch( - "dstack._internal.server.background.tasks.process_instances.terminate_job_provisioning_data_instance" - ): - await process_instances() + await process_instances() await session.refresh(instance) assert instance is not None assert instance.status == InstanceStatus.TERMINATED