Skip to content

Commit

Permalink
Have streaming objects get values from RunnerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrews committed Oct 17, 2024
1 parent 1121854 commit 9be5cd1
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 83 deletions.
35 changes: 21 additions & 14 deletions src/ansible_runner/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseExecutionMode(Enum):

# Metadata string values
class MetaValues(Enum):
STREAMABLE = 'streamable'
TRANSMIT = 'transmit'


@dataclass
Expand All @@ -82,38 +82,38 @@ class BaseConfig:
# No other config objects make use of positional parameters, so this should be fine.
#
# Example use case: RunnerConfig("/tmp/demo", playbook="main.yml", ...)
private_data_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
private_data_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

artifact_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifact_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
check_job_event_data: bool = False
container_auth_data: dict[str, str] | None = None
container_image: str = ""
container_image: str | None = None
container_options: list[str] | None = None
container_volume_mounts: list[str] | None = None
container_workdir: str | None = None
envvars: dict[str, Any] | None = None
fact_cache: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
fact_cache: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
fact_cache_type: str = 'jsonfile'
host_cwd: str | None = None
ident: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
ident: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
json_mode: bool = False
keepalive_seconds: int | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
keepalive_seconds: int | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
passwords: dict[str, str] | None = None
process_isolation: bool = False
process_isolation_executable: str = defaults.default_process_isolation_executable
project_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
project_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
quiet: bool = False
rotate_artifacts: int = 0
settings: dict | None = None
ssh_key: str | None = None
suppress_env_files: bool = False
timeout: int | None = None

event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

_CONTAINER_ENGINES = ('docker', 'podman')

Expand All @@ -123,6 +123,8 @@ def __post_init__(self) -> None:
self.command: list[str] = []
self.registry_auth_path: str
self.container_name: str = "" # like other properties, not accurate until prepare is called
if self.container_image is None:
self.container_image = ''

# ignore this for now since it's worker-specific and would just trip up old runners
# self.keepalive_seconds = keepalive_seconds
Expand All @@ -139,6 +141,7 @@ def __post_init__(self) -> None:
raise ConfigurationError(f"Unable to create private_data_dir {self.private_data_dir}") from error
else:
self.private_data_dir = tempfile.mkdtemp(prefix=defaults.AUTO_CREATE_NAMING, dir=defaults.AUTO_CREATE_DIR)
register_for_cleanup(self.private_data_dir)

if self.artifact_dir is None:
self.artifact_dir = os.path.join(self.private_data_dir, 'artifacts')
Expand All @@ -147,7 +150,9 @@ def __post_init__(self) -> None:

if self.ident is None:
self.ident = str(uuid4())
self.ident_set_by_user = False
else:
self.ident_set_by_user = True
self.ident = str(self.ident)

self.artifact_dir = os.path.join(self.artifact_dir, self.ident)
Expand Down Expand Up @@ -185,7 +190,6 @@ def prepare_env(self, runner_mode: str = 'pexpect') -> None:
Manages reading environment metadata files under ``private_data_dir`` and merging/updating
with existing values so the :py:class:`ansible_runner.runner.Runner` object can read and use them easily
"""

if self.ident is None:
raise ConfigurationError("ident value cannot be None")
if self.artifact_dir is None:
Expand Down Expand Up @@ -520,6 +524,9 @@ def wrap_args_for_containerization(self,
if self.private_data_dir is None:
raise ConfigurationError("private_data_dir value cannot be None")

if self.container_image is None:
raise ConfigurationError("container_image value cannot be None")

new_args = [self.process_isolation_executable]
new_args.extend(['run', '--rm'])

Expand Down
17 changes: 13 additions & 4 deletions src/ansible_runner/config/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,27 @@ def extra_vars(self, value):
def streamable_attributes(self) -> dict[str, Any]:
"""Get the set of streamable attributes that have a value that is different from the default.
The field metadata indicates if the attribute is streamable. By default, an attribute
The field metadata indicates if the attribute is streamable from Transmit. By default, an attribute
is considered streamable (must be explicitly disabled).
:return: A dict of attribute names and their values.
"""
retval = {}
for field_obj in fields(self):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.STREAMABLE, True):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.TRANSMIT, True):
continue
current_value = getattr(self, field_obj.name)
if not field_obj.default == current_value:
retval[field_obj.name] = current_value

if field_obj.default == current_value:
continue

# Treat an empty current value (e.g., {} or "") as the same as a default of None to prevent
# streaming unnecessary empty values.
if field_obj.default is None and current_value in ({}, "", []):
continue

retval[field_obj.name] = current_value

return retval

def prepare(self):
Expand Down
10 changes: 3 additions & 7 deletions src/ansible_runner/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import sys
import threading
import logging
from dataclasses import asdict

from ansible_runner import output
from ansible_runner._internal._dump_artifacts import dump_artifacts
Expand Down Expand Up @@ -90,18 +89,15 @@ def init_runner(
config.cancel_callback = signal_handler()

if streamer == 'transmit':
kwargs = asdict(config)
stream_transmitter = Transmitter(only_transmit_kwargs, _output=_output, **kwargs)
stream_transmitter = Transmitter(config, only_transmit_kwargs, _output=_output)
return stream_transmitter

if streamer == 'worker':
kwargs = asdict(config)
stream_worker = Worker(_input=_input, _output=_output, **kwargs)
stream_worker = Worker(config, _input=_input, _output=_output)
return stream_worker

if streamer == 'process':
kwargs = asdict(config)
stream_processor = Processor(_input=_input, **kwargs)
stream_processor = Processor(config, _input=_input)
return stream_processor

if config.process_isolation:
Expand Down
59 changes: 23 additions & 36 deletions src/ansible_runner/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import stat
import sys
import tempfile
import uuid
import traceback

Expand All @@ -15,10 +14,10 @@
from typing import BinaryIO

import ansible_runner
from ansible_runner.config.runner import RunnerConfig
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.loader import ArtifactLoader
import ansible_runner.plugins
from ansible_runner.utils import register_for_cleanup
from ansible_runner.utils.streaming import stream_dir, unstream_dir


Expand All @@ -38,16 +37,14 @@ def __init__(self, settings):


class Transmitter:
def __init__(self, only_transmit_kwargs: bool, _output: BinaryIO | None, **kwargs):
def __init__(self, config: RunnerConfig, only_transmit_kwargs: bool = False, _output: BinaryIO | None = None):
if _output is None:
_output = sys.stdout.buffer
self._output = _output
self.private_data_dir = os.path.abspath(kwargs['private_data_dir'])
self.private_data_dir = os.path.abspath(config.private_data_dir) if config.private_data_dir else ""
self.only_transmit_kwargs = only_transmit_kwargs
if 'keepalive_seconds' in kwargs:
kwargs.pop('keepalive_seconds') # don't confuse older runners with this Worker-only arg

self.kwargs = kwargs
self.kwargs = config.streamable_attributes()

self.status = "unstarted"
self.rc = None
Expand All @@ -70,12 +67,13 @@ def run(self):


class Worker:
def __init__(self, _input=None, _output=None, keepalive_seconds: float | None = None, **kwargs):
def __init__(self, config: RunnerConfig, _input=None, _output=None):
if _input is None:
_input = sys.stdin.buffer
if _output is None:
_output = sys.stdout.buffer

keepalive_seconds: float | int | None = config.keepalive_seconds
if keepalive_seconds is None: # if we didn't get an explicit int value, fall back to envvar
# FIXME: emit/log a warning and silently continue if this value won't parse
keepalive_seconds = float(os.environ.get('ANSIBLE_RUNNER_KEEPALIVE_SECONDS', 0))
Expand All @@ -88,14 +86,10 @@ def __init__(self, _input=None, _output=None, keepalive_seconds: float | None =
self._input = _input
self._output = _output

self.kwargs = kwargs
self.kwargs = config.streamable_attributes()
self.job_kwargs = None

private_data_dir = kwargs.get('private_data_dir')
if private_data_dir is None:
private_data_dir = tempfile.mkdtemp()
register_for_cleanup(private_data_dir)
self.private_data_dir = private_data_dir
self.private_data_dir = config.private_data_dir

self.status = "unstarted"
self.rc = None
Expand Down Expand Up @@ -251,43 +245,36 @@ def finished_callback(self, runner_obj):


class Processor:
def __init__(self, _input=None, status_handler=None, event_handler=None,
artifacts_handler=None, cancel_callback=None, finished_callback=None, **kwargs):
def __init__(self, config: RunnerConfig, _input: BinaryIO | None = None):
if _input is None:
_input = sys.stdin.buffer
self._input = _input

self.quiet = kwargs.get('quiet')
self.quiet = config.quiet

private_data_dir = kwargs.get('private_data_dir')
if private_data_dir is None:
private_data_dir = tempfile.mkdtemp()
self.private_data_dir = private_data_dir
self.private_data_dir: str = config.private_data_dir or ''
self._loader = ArtifactLoader(self.private_data_dir)

settings = kwargs.get('settings')
settings = config.settings
if settings is None:
try:
settings = self._loader.load_file('env/settings', Mapping)
settings = self._loader.load_file('env/settings', Mapping) # type: ignore
except ConfigurationError:
settings = {}
self.config = MockConfig(settings)

if kwargs.get('artifact_dir'):
self.artifact_dir = os.path.abspath(kwargs.get('artifact_dir'))
else:
project_artifacts = os.path.abspath(os.path.join(self.private_data_dir, 'artifacts'))
if ident := kwargs.get('ident'):
self.artifact_dir = os.path.join(project_artifacts, str(ident))
else:
self.artifact_dir = project_artifacts
self.artifact_dir = config.artifact_dir
if self.artifact_dir and not config.ident_set_by_user:
# If an ident value was not explicitly supplied, for some reason, we don't bother with
# using a subdir named with the ident value.
self.artifact_dir, _ = os.path.split(self.artifact_dir)

self.status_handler = status_handler
self.event_handler = event_handler
self.artifacts_handler = artifacts_handler
self.status_handler = config.status_handler
self.event_handler = config.event_handler
self.artifacts_handler = config.artifacts_handler

self.cancel_callback = cancel_callback # FIXME: unused
self.finished_callback = finished_callback
self.cancel_callback = config.cancel_callback # FIXME: unused
self.finished_callback = config.finished_callback

self.status = "unstarted"
self.rc = None
Expand Down
Loading

0 comments on commit 9be5cd1

Please sign in to comment.