Skip to content

Commit

Permalink
Update init_job for branch next-2.0 dispatch.user feature for user …
Browse files Browse the repository at this point in the history
…permission mapping

Add get_jobs class method which returns table of jobs, which can be filtered
Update queries to use restrist filter for user
  • Loading branch information
meganerd committed Sep 25, 2023
1 parent 1878676 commit 4bcb58c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions changes/224.added
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Add init_job Nautobot subcommand, which starts a Nautobot job by job name.
Add get_jobs Nautobot subcommand, which gets all Nautobot jobs or a filtered list of jobs.
45 changes: 40 additions & 5 deletions nautobot_chatops/workers/nautobot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,18 +1023,53 @@ def get_circuit_providers(dispatcher, *args):
return CommandStatusChoices.STATUS_SUCCEEDED


@subcommand_of("nautobot")
def get_jobs(dispatcher, job_filters: str = ""): # We can use a Literal["enabled", "installed", "runnable"] here instead
"""Get jobs from Nautobot.
Args:
job_filters (str):
"""
# Check for filters in user supplied input
filters = ["enabled", "installed", "runnable"]
if any([key in job_filters for key in filters]):
filter_args = {key: job_filters[key] for key in filters if key in job_filters}
jobs = Job.objects.restrict(dispatch.user, "view").filter(**filter_args) # enabled=True, installed=True, runnable=True
else:
jobs = Job.objects.restrict(dispatch.user, "view").all()

header = ["Name", "ID"]
rows = [
(
str(job.name),
str(job.id),
)
for job in jobs
]

dispatcher.send_large_table(header, rows)

return CommandStatusChoices.STATUS_SUCCEEDED


@subcommand_of("nautobot")
def init_job(dispatcher, job_name):
"""Initiate a job in Nautobot by job name."""
# Replace this with user mapping
job_username = "meganerd"

# Get instance of the user who will run the job
user = get_user_model()
user_instance = user.objects.get(username=job_username)
try:
user_instance = user.objects.get(username=dispatch.user)
except user.DoesNotExist: # Unsure if we need to check this case?
dispatcher.send_error(f"User {dispatch.user} not found")
return (CommandStatusChoices.STATUS_FAILED, f'User "{dispatch.user}" not found')

# Get the job model instance using job name
job_model = Job.objects.get(name=job_name)
try:
job_model = Job.objects.restrict(dispatch.user, "view").get(name=job_name)
except Job.DoesNotExist:
dispatcher.send_error(f"Job {job_name} not found")
return (CommandStatusChoices.STATUS_FAILED, f'Job "{job_name}" not found')

job_class_path = job_model.class_path

# Create an instance of job result
Expand Down

0 comments on commit 4bcb58c

Please sign in to comment.