diff --git a/pulsar/client/client.py b/pulsar/client/client.py index a214b1dd..6c4eb05e 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -536,8 +536,7 @@ def launch( } } spec = {"template": template} - if "k8s_walltime_limit" in params: - spec["activeDeadlineSeconds"] = int(params["k8s_walltime_limit"]) + spec.update(self._job_spec_params(params)) k8s_job_obj = job_object_dict(params, k8s_job_prefix, spec) pykube_client = self._pykube_client job = Job(pykube_client, k8s_job_obj) @@ -578,6 +577,14 @@ def _k8s_job_prefix(self): job_prefix = produce_k8s_job_prefix(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id) return job_prefix + def _job_spec_params(self, params): + spec = {} + if "k8s_walltime_limit" in params: + spec["activeDeadlineSeconds"] = int(params["k8s_walltime_limit"]) + if "k8s_job_ttl_secs_after_finished" in params and params.get("k8s_cleanup_job") != "never": + spec["ttlSecondsAfterFinished"] = int(params["k8s_job_ttl_secs_after_finished"]) + return spec + def _pulsar_container_resources(self, params): return self._container_resources(params, container='pulsar')