Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that _external_ids keys are strings #369

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions pulsar/managers/base/external.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging
from string import Template
from typing import (
Dict,
Any,
)

from pulsar.managers import status
from .directory import DirectoryBaseManager
Expand All @@ -18,7 +22,7 @@ class ExternalBaseManager(DirectoryBaseManager):

def __init__(self, name, app, **kwds):
super().__init__(name, app, **kwds)
self._external_ids = {}
self._external_ids: Dict[str, Any] = {}
self.job_name_template = kwds.get('job_name_template', DEFAULT_JOB_NAME_TEMPLATE)

def clean(self, job_id):
Expand Down Expand Up @@ -46,11 +50,11 @@ def _register_external_id(self, job_id, external_id):
if isinstance(external_id, bytes):
external_id = external_id.decode("utf-8")
self._job_directory(job_id).store_metadata(JOB_FILE_EXTERNAL_ID, external_id)
self._external_ids[job_id] = external_id
self._external_ids[str(job_id)] = external_id
return external_id

def _external_id(self, job_id):
return self._external_ids.get(job_id, None)
return self._external_ids.get(str(job_id), None)

def _job_name(self, job_id):
env = self._job_template_env(job_id)
Expand All @@ -59,9 +63,9 @@ def _job_name(self, job_id):
def _recover_active_job(self, job_id):
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
self._external_ids[job_id] = external_id
self._external_ids[str(job_id)] = external_id
else:
raise Exception("Could not determine external ID for job_id [%s]" % job_id)

def _deactivate_job(self, job_id):
del self._external_ids[job_id]
del self._external_ids[str(job_id)]
Loading