diff --git a/doc/changes/changes_2.0.0.md b/doc/changes/changes_2.0.0.md index c2ddd204f..39f24af6f 100644 --- a/doc/changes/changes_2.0.0.md +++ b/doc/changes/changes_2.0.0.md @@ -29,3 +29,4 @@ If you need further versions, please open an issue. * #308: Unified ports for database, BucketFS, and SSH * #322: Added additional tests for environment variable LOG_ENV_VARIABLE_NAME * #359: Fixed custom logging path not working if dir does not exist. +* #304: Create SSH access replacements for calls to `docker.exec_run()` diff --git a/exasol_integration_test_docker_environment/lib/api/spawn_test_environment.py b/exasol_integration_test_docker_environment/lib/api/spawn_test_environment.py index 08e8ca620..d141473a4 100644 --- a/exasol_integration_test_docker_environment/lib/api/spawn_test_environment.py +++ b/exasol_integration_test_docker_environment/lib/api/spawn_test_environment.py @@ -71,6 +71,7 @@ def spawn_test_environment( """ def str_or_none(x: any) -> str: return str(x) if x is not None else None + parsed_db_mem_size = humanfriendly.parse_size(db_mem_size) if parsed_db_mem_size < humanfriendly.parse_size("1 GiB"): raise ArgumentConstraintError("db_mem_size", "needs to be at least 1 GiB") diff --git a/exasol_integration_test_docker_environment/lib/base/db_os_executor.py b/exasol_integration_test_docker_environment/lib/base/db_os_executor.py new file mode 100644 index 000000000..7d13a7d7c --- /dev/null +++ b/exasol_integration_test_docker_environment/lib/base/db_os_executor.py @@ -0,0 +1,142 @@ +from abc import abstractmethod +import fabric +import docker +from docker import DockerClient +from typing import Protocol, runtime_checkable +from docker.models.containers import Container, ExecResult +from exasol_integration_test_docker_environment \ + .lib.base.ssh_access import SshKey +from exasol_integration_test_docker_environment \ + .lib.data.database_info import DatabaseInfo +from exasol_integration_test_docker_environment.lib.docker \ + import ContextDockerClient + + +class DockerClientFactory: + """ + Create a Docker client. + """ + def __init__(self, timeout: int = 100000): + self._timeout = timeout + + def client(self) -> DockerClient: + with ContextDockerClient(timeout=self._timeout) as client: + return client + + +# Avoid TypeError: Instance and class checks can only be +# used with @runtime_checkable protocols +# raised by unit tests +@runtime_checkable +class DbOsExecutor(Protocol): + """ + This class provides an abstraction to execute operating system + commands on the database host, e.g. inside a Docker Container. See + concrete implementations in sub-classes ``DockerExecutor`` and + ``SshExecutor``. + """ + @abstractmethod + def exec(self, cmd: str) -> ExecResult: + ... + + +class DockerExecutor(DbOsExecutor): + def __init__(self, docker_client: DockerClient, container_name: str): + self._client = docker_client + self._container_name = container_name + self._container = None + + def __enter__(self): + self._container = self._client.containers.get(self._container_name) + return self + + def __exit__(self, type_, value, traceback): + self.close() + + def __del__(self): + self.close() + + def exec(self, cmd: str) -> ExecResult: + return self._container.exec_run(cmd) + + def close(self): + self._container = None + if self._client is not None: + self._client.close() + self._client = None + + +class SshExecutor(DbOsExecutor): + def __init__(self, connect_string: str, key_file: str): + self._connect_string = connect_string + self._key_file = key_file + self._connection = None + + def __enter__(self): + key = SshKey.read_from(self._key_file) + self._connection = fabric.Connection( + self._connect_string, + connect_kwargs={ "pkey": key.private }, + ) + return self + + def __exit__(self, type_, value, traceback): + self.close() + + def __del__(self): + self.close() + + def exec(self, cmd: str) -> ExecResult: + result = self._connection.run(cmd) + output = result.stdout.encode("utf-8") + return ExecResult(result.exited, output) + + def close(self): + if self._connection is not None: + self._connection.close() + self._connection = None + + +# Avoid TypeError: Instance and class checks can only be +# used with @runtime_checkable protocols +# raised by integration tests +@runtime_checkable +class DbOsExecFactory(Protocol): + """ + This class defines an abstract method ``executor()`` to be implemented by + inheriting factories. + """ + + @abstractmethod + def executor(self) -> DbOsExecutor: + """ + Create an executor for executing commands inside of the operating + system of the database. + """ + ... + + +class DockerExecFactory(DbOsExecFactory): + def __init__(self, container_name: str, client_factory: DockerClientFactory): + self._container_name = container_name + self._client_factory = client_factory + + def executor(self) -> DbOsExecutor: + client = self._client_factory.client() + return DockerExecutor(client, self._container_name) + + +class SshExecFactory(DbOsExecFactory): + @classmethod + def from_database_info(cls, info: DatabaseInfo): + return SshExecFactory( + f"{info.ssh_info.user}@{info.host}:{info.ports.ssh}", + info.ssh_info.key_file, + ) + + def __init__(self, connect_string: str, ssh_key_file: str): + self._connect_string = connect_string + self._key_file = ssh_key_file + + def executor(self) -> DbOsExecutor: + return SshExecutor(self._connect_string, self._key_file) diff --git a/exasol_integration_test_docker_environment/lib/test_environment/abstract_spawn_test_environment.py b/exasol_integration_test_docker_environment/lib/test_environment/abstract_spawn_test_environment.py index 1a3373d3b..e50db2b1b 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/abstract_spawn_test_environment.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/abstract_spawn_test_environment.py @@ -144,24 +144,33 @@ def _create_network(self, attempt): def create_network_task(self, attempt: int): raise AbstractMethodException() - def _spawn_database_and_test_container(self, - network_info: DockerNetworkInfo, - certificate_volume_info: Optional[DockerVolumeInfo], - attempt: int) -> Tuple[DatabaseInfo, Optional[ContainerInfo]]: - certificate_volume_name = certificate_volume_info.volume_name if certificate_volume_info is not None else None - dependencies_tasks = { - DATABASE: self.create_spawn_database_task(network_info, certificate_volume_info, attempt) - } - if self.test_container_content is not None: - dependencies_tasks[TEST_CONTAINER] = \ - self.create_spawn_test_container_task(network_info, certificate_volume_name, attempt) - database_and_test_container_info_future = yield from self.run_dependencies(dependencies_tasks) - database_and_test_container_info = \ - self.get_values_from_futures(database_and_test_container_info_future) - test_container_info = None + def _spawn_database_and_test_container( + self, + network_info: DockerNetworkInfo, + certificate_volume_info: Optional[DockerVolumeInfo], + attempt: int, + ) -> Tuple[DatabaseInfo, Optional[ContainerInfo]]: + def volume_name(info): + return None if info is None else info.volume_name + + child_tasks = { + DATABASE: self.create_spawn_database_task( + network_info, + certificate_volume_info, + attempt, + ) + } if self.test_container_content is not None: - test_container_info = database_and_test_container_info[TEST_CONTAINER] - database_info = database_and_test_container_info[DATABASE] + certificate_volume_name = volume_name(certificate_volume_info) + child_tasks[TEST_CONTAINER] = self.create_spawn_test_container_task( + network_info, + certificate_volume_name, + attempt, + ) + futures = yield from self.run_dependencies(child_tasks) + results = self.get_values_from_futures(futures) + database_info = results[DATABASE] + test_container_info = results[TEST_CONTAINER] if self.test_container_content is not None else None return database_info, test_container_info def create_spawn_database_task(self, diff --git a/exasol_integration_test_docker_environment/lib/test_environment/create_certificates/create_ssl_certificates_task.py b/exasol_integration_test_docker_environment/lib/test_environment/create_certificates/create_ssl_certificates_task.py index 2aaac81fe..032691a9c 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/create_certificates/create_ssl_certificates_task.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/create_certificates/create_ssl_certificates_task.py @@ -112,26 +112,27 @@ def create_certificate(self, image_infos: Dict[str, ImageInfo]) -> None: with self._get_docker_client() as docker_client: try: - test_container = \ - docker_client.containers.create( - image=certificate_container_image_info.get_target_complete_name(), - name="certificate_resources", - network_mode=None, - command="sleep infinity", - detach=True, - volumes=volumes, - labels={"test_environment_name": self.environment_name, - "container_type": "certificate_resources"}, - runtime=self.docker_runtime - ) - test_container.start() + container = docker_client.containers.create( + image=certificate_container_image_info.get_target_complete_name(), + name="certificate_resources", + network_mode=None, + command="sleep infinity", + detach=True, + volumes=volumes, + labels={ + "test_environment_name": self.environment_name, + "container_type": "certificate_resources", + }, + runtime=self.docker_runtime + ) + container.start() self.logger.info("Creating certificates...") cmd = f"bash /scripts/create_certificates.sh " \ f"{self._construct_complete_host_name} {CERTIFICATES_MOUNT_PATH}" - exit_code, output = test_container.exec_run(cmd) + exit_code, output = container.exec_run(cmd) self.logger.info(output.decode('utf-8')) if exit_code != 0: raise RuntimeError(f"Error creating certificates:'{output.decode('utf-8')}'") finally: - test_container.stop() - test_container.remove() + container.stop() + container.remove() diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/docker_db_log_based_bucket_sync_checker.py b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/docker_db_log_based_bucket_sync_checker.py index 17588900d..dcd8511ba 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/docker_db_log_based_bucket_sync_checker.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/docker_db_log_based_bucket_sync_checker.py @@ -3,8 +3,11 @@ from docker.models.containers import Container -from exasol_integration_test_docker_environment.lib.test_environment.database_setup.bucketfs_sync_checker import \ - BucketFSSyncChecker +from exasol_integration_test_docker_environment \ + .lib.test_environment.database_setup.bucketfs_sync_checker \ + import BucketFSSyncChecker +from exasol_integration_test_docker_environment \ + .lib.base.db_os_executor import DbOsExecutor class DockerDBLogBasedBucketFSSyncChecker(BucketFSSyncChecker): @@ -13,12 +16,14 @@ def __init__(self, logger, database_container: Container, pattern_to_wait_for: str, log_file_to_check: str, - bucketfs_write_password: str): + bucketfs_write_password: str, + executor: DbOsExecutor): self.logger = logger self.pattern_to_wait_for = pattern_to_wait_for self.log_file_to_check = log_file_to_check self.database_container = database_container self.bucketfs_write_password = bucketfs_write_password + self.executor = executor def prepare_upload(self): self.start_exit_code, self.start_output = self.find_pattern_in_logfile() diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/find_exaplus_in_db_container.py b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/find_exaplus_in_db_container.py index d3342c2fa..91e84c4fe 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/find_exaplus_in_db_container.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/find_exaplus_in_db_container.py @@ -1,9 +1,14 @@ from pathlib import PurePath -import docker.models.containers +import docker +from exasol_integration_test_docker_environment.lib.base.db_os_executor \ + import DbOsExecutor -def find_exaplus(db_container: docker.models.containers.Container) -> PurePath: +def find_exaplus( + db_container: docker.models.containers.Container, + os_executor: DbOsExecutor, +) -> PurePath: """ Tries to find path of exaplus in given container in directories where exaplus is normally installed. :db_container Container where to search for exaplus diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/upload_file_to_db.py b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/upload_file_to_db.py index d7c06e12e..2af0f5442 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_setup/upload_file_to_db.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_setup/upload_file_to_db.py @@ -15,11 +15,19 @@ from exasol_integration_test_docker_environment.lib.base.json_pickle_parameter import JsonPickleParameter from exasol_integration_test_docker_environment.lib.base.still_running_logger import StillRunningLogger, \ StillRunningLoggerThread -from exasol_integration_test_docker_environment.lib.data.environment_info import EnvironmentInfo -from exasol_integration_test_docker_environment.lib.test_environment.database_setup.docker_db_log_based_bucket_sync_checker import \ - DockerDBLogBasedBucketFSSyncChecker -from exasol_integration_test_docker_environment.lib.test_environment.database_setup.time_based_bucketfs_sync_waiter import \ - TimeBasedBucketFSSyncWaiter +from exasol_integration_test_docker_environment.lib.data.environment_info \ + import EnvironmentInfo +from exasol_integration_test_docker_environment \ + .lib.test_environment.database_setup.docker_db_log_based_bucket_sync_checker \ + import DockerDBLogBasedBucketFSSyncChecker +from exasol_integration_test_docker_environment \ + .lib.test_environment.database_setup.time_based_bucketfs_sync_waiter \ + import TimeBasedBucketFSSyncWaiter +from exasol_integration_test_docker_environment \ + .lib.base.db_os_executor import ( + DbOsExecutor, + DbOsExecFactory, + ) @dataclasses.dataclass @@ -35,6 +43,7 @@ class UploadFileToBucketFS(DockerBaseTask): reuse_uploaded = luigi.BoolParameter(False, significant=False) bucketfs_write_password = luigi.Parameter( significant=False, visibility=luigi.parameter.ParameterVisibility.HIDDEN) + executor_factory=JsonPickleParameter(DbOsExecFactory, significant=False) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -54,16 +63,20 @@ def run_task(self): else: database_container = None if not self.should_be_reused(upload_target): - self.upload_and_wait(database_container, - file_to_upload, - upload_target, - log_file, - pattern_to_wait_for, - sync_time_estimation) - self.return_object(UploadResult( - upload_target=upload_target, - reused=False - )) + with self.executor_factory.executor() as executor: + self.upload_and_wait( + database_container, + file_to_upload, + upload_target, + log_file, + pattern_to_wait_for, + sync_time_estimation, + db_os_executor=executor, + ) + self.return_object(UploadResult( + upload_target=upload_target, + reused=False + )) else: self.logger.warning("Reusing uploaded target %s instead of file %s", upload_target, file_to_upload) @@ -73,37 +86,57 @@ def run_task(self): reused=True )) - def upload_and_wait(self, database_container, - file_to_upload: str, upload_target: str, - log_file: str, pattern_to_wait_for: str, - sync_time_estimation: int): - still_running_logger = StillRunningLogger(self.logger, - "file upload of %s to %s" - % (file_to_upload, upload_target)) + def upload_and_wait( + self, + database_container, + file_to_upload: str, + upload_target: str, + log_file: str, + pattern_to_wait_for: str, + sync_time_estimation: int, + db_os_executor: DbOsExecutor, + ): + still_running_logger = StillRunningLogger( + self.logger, + f"file upload of {file_to_upload} to {upload_target}", + ) thread = StillRunningLoggerThread(still_running_logger) thread.start() - sync_checker = self.get_sync_checker(database_container, sync_time_estimation, - log_file, pattern_to_wait_for) + sync_checker = self.get_sync_checker( + database_container, + sync_time_estimation, + log_file, + pattern_to_wait_for, + db_os_executor=db_os_executor, + ) sync_checker.prepare_upload() try: - output = self.upload_file(file_to_upload=file_to_upload, upload_target=upload_target) + output = self.upload_file( + file_to_upload=file_to_upload, + upload_target=upload_target, + ) sync_checker.wait_for_bucketfs_sync() self.write_logs(output) finally: thread.stop() thread.join() - def get_sync_checker(self, database_container: Container, - sync_time_estimation: int, - log_file: str, - pattern_to_wait_for: str): + def get_sync_checker( + self, + database_container: Container, + sync_time_estimation: int, + log_file: str, + pattern_to_wait_for: str, + db_os_executor: DbOsExecutor, + ): if database_container is not None: return DockerDBLogBasedBucketFSSyncChecker( database_container=database_container, log_file_to_check=log_file, pattern_to_wait_for=pattern_to_wait_for, logger=self.logger, - bucketfs_write_password=str(self.bucketfs_write_password) + bucketfs_write_password=str(self.bucketfs_write_password), + executor=db_os_executor, ) else: return TimeBasedBucketFSSyncWaiter(sync_time_estimation) @@ -149,12 +182,12 @@ def upload_file(self, file_to_upload: str, upload_target: str): self.logger.info("upload file %s to %s", file_to_upload, upload_target) bucket_name, path_in_bucket, file_in_bucket = self.split_upload_target(upload_target) - bucket_config = self.generate_bucket_config(bucket_name) upload.upload_file_to_bucketfs( bucket_config=bucket_config, bucket_file_path=f"{path_in_bucket}/{file_in_bucket}", - local_file_path=Path(file_to_upload)) + local_file_path=Path(file_to_upload) + ) return f"File '{file_to_upload}' to '{upload_target}'" def write_logs(self, output): diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/db_container_log_thread.py b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/db_container_log_thread.py index 1ed5d1b8d..07cf7ce60 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/db_container_log_thread.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/db_container_log_thread.py @@ -27,23 +27,27 @@ def stop(self): self.finish = True def run(self): - with ContainerLogHandler(self.log_file, self.logger, self.description) as log_handler: - still_running_logger = StillRunningLogger( - self.logger, self.description) - while not self.finish: - self.current_timestamp = math.floor(time.time()) - log = self.container.logs(since=self.previous_timestamp, until=self.current_timestamp) - if len(log) != 0: - still_running_logger.log() - log_handler.handle_log_lines(log) - log_line = log.decode("utf-8").lower() - if ("error" in log_line and not "sshd was not started" in log_line) \ - or "exception" in log_line \ - or ("returned with state 1" in log_line - and not "(membership) returned with state 1" in log_line): # exclude webui not found in 7.0.0 - self.logger.info("ContainerLogHandler error message, %s", log_line) - self.error_message = log_line - self.finish = True - self.previous_timestamp = self.current_timestamp - self.complete_log = log_handler.get_complete_log().copy() - time.sleep(1) + try: + with ContainerLogHandler(self.log_file, self.logger, self.description) as log_handler: + still_running_logger = StillRunningLogger( + self.logger, self.description) + while not self.finish: + self.current_timestamp = math.floor(time.time()) + log = self.container.logs(since=self.previous_timestamp, until=self.current_timestamp) + if len(log) != 0: + still_running_logger.log() + log_handler.handle_log_lines(log) + log_line = log.decode("utf-8").lower() + if ("error" in log_line and not "sshd was not started" in log_line) \ + or "exception" in log_line \ + or ("returned with state 1" in log_line + and not "(membership) returned with state 1" in log_line): # exclude webui not found in 7.0.0 + self.logger.info("ContainerLogHandler error message, %s", log_line) + self.error_message = log_line + self.finish = True + self.previous_timestamp = self.current_timestamp + self.complete_log = log_handler.get_complete_log().copy() + time.sleep(1) + except Exception as e: + self.finish = True + self.logger.exception("Caught exception in DBContainerLogThread.run.") \ No newline at end of file diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/is_database_ready_thread.py b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/is_database_ready_thread.py index 1722e58d6..ac2893883 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/is_database_ready_thread.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/is_database_ready_thread.py @@ -1,4 +1,5 @@ import time +from logging import Logger from pathlib import PurePath from threading import Thread @@ -8,16 +9,18 @@ from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo from exasol_integration_test_docker_environment.lib.test_environment.database_setup.find_exaplus_in_db_container import \ find_exaplus +from exasol_integration_test_docker_environment.lib.base.db_os_executor import \ + DbOsExecFactory class IsDatabaseReadyThread(Thread): - def __init__(self, - logger, + logger: Logger, database_info: DatabaseInfo, database_container: Container, database_credentials: DatabaseCredentials, - docker_db_image_version: str): + docker_db_image_version: str, + executor_factory: DbOsExecFactory): super().__init__() self.logger = logger self.database_credentials = database_credentials @@ -28,37 +31,43 @@ def __init__(self, self.output_db_connection = None self.output_bucketfs_connection = None self.docker_db_image_version = docker_db_image_version + self.executor_factory = executor_factory def stop(self): self.logger.info("Stop IsDatabaseReadyThread") self.finish = True def run(self): - db_connection_command = "" - bucket_fs_connection_command = "" try: - exaplus_path = find_exaplus(self._db_container) - db_connection_command = self.create_db_connection_command(exaplus_path) - bucket_fs_connection_command = self.create_bucketfs_connection_command() - except RuntimeError as e: - self.logger.error(e) + with self.executor_factory.executor() as executor: + db_connection_command = "" + bucket_fs_connection_command = "" + try: + exaplus_path = find_exaplus(self._db_container, executor) + db_connection_command = self.create_db_connection_command(exaplus_path) + bucket_fs_connection_command = self.create_bucketfs_connection_command() + except RuntimeError as e: + self.logger.exception("Caught exception while searching for exaplus.") + self.finish = True + while not self.finish: + (exit_code_db_connection, self.output_db_connection) = \ + self._db_container.exec_run(cmd=db_connection_command) + (exit_code_bucketfs_connection, self.output_bucketfs_connection) = \ + self._db_container.exec_run(cmd=bucket_fs_connection_command) + if exit_code_db_connection == 0 and exit_code_bucketfs_connection == 0: + self.finish = True + self.is_ready = True + time.sleep(1) + except Exception as e: self.finish = True - while not self.finish: - (exit_code_db_connection, self.output_db_connection) = \ - self._db_container.exec_run(cmd=db_connection_command) - (exit_code_bucketfs_connection, self.output_bucketfs_connection) = \ - self._db_container.exec_run(cmd=bucket_fs_connection_command) - if exit_code_db_connection == 0 and exit_code_bucketfs_connection == 0: - self.finish = True - self.is_ready = True - time.sleep(1) + self.logger.exception("Caught exception in IsDatabaseReadyThread.run.") def create_db_connection_command(self, exaplus_path: PurePath): username = self.database_credentials.db_user password = self.database_credentials.db_password connection_options = f"""-c 'localhost:{self._database_info.ports.database}' -u '{username}' -p '{password}'""" - cmd = f"""{exaplus_path} {connection_options} -sql 'select 1;' -jdbcparam 'validateservercertificate=0'""" + cmd = f"""{exaplus_path} {connection_options} -sql 'select 1;' -jdbcparam 'validateservercertificate=0'""" bash_cmd = f"""bash -c "{cmd}" """ return bash_cmd diff --git a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/wait_for_test_docker_database.py b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/wait_for_test_docker_database.py index f2dc0b7d0..268726dd2 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/wait_for_test_docker_database.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/database_waiters/wait_for_test_docker_database.py @@ -14,6 +14,8 @@ DBContainerLogThread from exasol_integration_test_docker_environment.lib.test_environment.database_waiters.is_database_ready_thread import \ IsDatabaseReadyThread +from exasol_integration_test_docker_environment.lib.base.db_os_executor import \ + DbOsExecFactory class WaitForTestDockerDatabase(DockerBaseTask, DatabaseCredentialsParameter): @@ -22,6 +24,7 @@ class WaitForTestDockerDatabase(DockerBaseTask, DatabaseCredentialsParameter): db_startup_timeout_in_seconds = luigi.IntParameter(10 * 60, significant=False) attempt = luigi.IntParameter(1) docker_db_image_version = luigi.Parameter() + executor_factory = JsonPickleParameter(DbOsExecFactory, significant=False) def run_task(self): with self._get_docker_client() as docker_client: @@ -49,11 +52,14 @@ def start_wait_threads(self, db_container: Container): startup_log_file, "Database Startup %s" % db_container.name) container_log_thread.start() - is_database_ready_thread = IsDatabaseReadyThread(self.logger, - self.database_info, - db_container, - self.get_database_credentials(), - self.docker_db_image_version) + is_database_ready_thread = IsDatabaseReadyThread( + self.logger, + self.database_info, + db_container, + self.get_database_credentials(), + self.docker_db_image_version, + self.executor_factory, + ) is_database_ready_thread.start() return container_log_thread, is_database_ready_thread @@ -99,7 +105,7 @@ def log_database_not_ready(self, container_log_thread: DBContainerLogThread, {is_database_ready_thread.output_db_connection} ========== IsDatabaseReadyThread output bucketfs connection: ============ {is_database_ready_thread.output_bucketfs_connection} -========== Container-Log: ============ +========== Container-Log: ============ {container_log} """ self.logger.warning( diff --git a/exasol_integration_test_docker_environment/lib/test_environment/spawn_test_environment_with_docker_db.py b/exasol_integration_test_docker_environment/lib/test_environment/spawn_test_environment_with_docker_db.py index 957c92e97..b81287677 100644 --- a/exasol_integration_test_docker_environment/lib/test_environment/spawn_test_environment_with_docker_db.py +++ b/exasol_integration_test_docker_environment/lib/test_environment/spawn_test_environment_with_docker_db.py @@ -26,7 +26,16 @@ from exasol_integration_test_docker_environment \ .lib.test_environment.spawn_test_database \ import SpawnTestDockerDatabase - +from exasol_integration_test_docker_environment \ + .lib.base.db_os_executor import ( + DockerClientFactory, + DbOsExecFactory, + SshExecFactory, + DockerExecFactory, + ) +from exasol_integration_test_docker_environment \ + .lib.test_environment.parameter.docker_db_test_environment_parameter \ + import DbOsAccess class SpawnTestEnvironmentWithDockerDB( AbstractSpawnTestEnvironment, @@ -67,6 +76,12 @@ def create_network_task(self, attempt: int): attempt=attempt, ) + def _executor_factory(self, database_info: DatabaseInfo) -> DbOsExecFactory: + if self.db_os_access == DbOsAccess.SSH: + return SshExecFactory.from_database_info(database_info) + client_factory = DockerClientFactory(timeout=100000) + return DockerExecFactory(self.db_container_name, client_factory) + def create_spawn_database_task( self, network_info: DockerNetworkInfo, @@ -91,4 +106,5 @@ def create_wait_for_database_task(self, attempt: int, database_info: DatabaseInf database_info=database_info, attempt=attempt, docker_db_image_version=self.docker_db_image_version, + executor_factory=self._executor_factory(database_info) ) diff --git a/exasol_integration_test_docker_environment/test/test_cli_test_environment.py b/exasol_integration_test_docker_environment/test/test_cli_test_environment.py deleted file mode 100644 index 328c61001..000000000 --- a/exasol_integration_test_docker_environment/test/test_cli_test_environment.py +++ /dev/null @@ -1,80 +0,0 @@ -import unittest -from typing import List - -import docker.models.containers - -from exasol_integration_test_docker_environment.lib.docker import ContextDockerClient -from exasol_integration_test_docker_environment.lib.test_environment.database_setup.find_exaplus_in_db_container import \ - find_exaplus -from exasol_integration_test_docker_environment.testing import utils -from exasol_integration_test_docker_environment.testing.exaslct_test_environment import ExaslctTestEnvironment - - -class DockerTestEnvironmentTest(unittest.TestCase): - - @classmethod - def setUpClass(cls): - print(f"SetUp {cls.__name__}") - cls.test_environment = \ - ExaslctTestEnvironment( - cls, - utils.INTEGRATION_TEST_DOCKER_ENVIRONMENT_DEFAULT_BIN, - clean_images_at_close=False) - # TODO cls.test_environment.clean_images() - cls.docker_environment_name = cls.__name__ - cls.spawned_docker_test_environments = \ - cls.test_environment.spawn_docker_test_environments(name=cls.docker_environment_name) - - @classmethod - def tearDownClass(cls): - utils.close_environments(cls.spawned_docker_test_environments, cls.test_environment) - - def test_db_container_started(self): - def assert_exactly_one(prefix: str, all: List[str], selected: List[str] = None): - selected = selected if selected is not None else all - log = self.spawned_docker_test_environments \ - .on_host_docker_environment \ - .completed_process.stdout.decode('utf8') - self.assertEqual(len(selected), 1, f"{prefix} in {all}.\nStartup log was: {log}") - with ContextDockerClient() as docker_client: - containers = [c.name for c in docker_client.containers.list() if self.docker_environment_name in c.name] - assert_exactly_one("Not exactly 1 container", containers) - db_containers = [c for c in containers if "db_container" in c] - assert_exactly_one("Found no db container", containers, db_containers) - - def test_db_available(self): - db_container_name = self.spawned_docker_test_environments \ - .on_host_docker_environment \ - .environment_info \ - .database_info \ - .container_info \ - .container_name - with ContextDockerClient() as docker_client: - db_container = docker_client.containers.get(db_container_name) - command = self.db_connection_command(db_container) - exit_code, output = db_container.exec_run(command) - self.assertEqual( - exit_code, - 0, - f"Error while executing 'exaplus' in test container. Got output:\n {output}", - ) - - def db_connection_command(self, db_container: docker.models.containers.Container): - on_host = self.spawned_docker_test_environments.on_host_docker_environment - db_info = on_host.environment_info.database_info - connection_options = ( - f"-c '{db_info.host}:{db_info.ports.database}' " - f"-u '{on_host.db_username}' " - f"-p '{on_host.db_password}'" - ) - exaplus = find_exaplus(db_container) - command = ( - f"{exaplus} {connection_options} " - "-sql 'select 1;' " - "-jdbcparam 'validateservercertificate=0'" - ) - return f'bash -c "{command}" ' - - -if __name__ == '__main__': - unittest.main() diff --git a/exasol_integration_test_docker_environment/test/test_upload.py b/exasol_integration_test_docker_environment/test/test_upload.py deleted file mode 100644 index e0de43f0f..000000000 --- a/exasol_integration_test_docker_environment/test/test_upload.py +++ /dev/null @@ -1,197 +0,0 @@ -import tempfile -import unittest -from pathlib import Path -from sys import stderr -from typing import List - -import luigi -from exasol.bucketfs import Service, Bucket, as_string -from exasol_bucketfs_utils_python import list_files - -from exasol_integration_test_docker_environment.lib.api.common import generate_root_task, run_task -from exasol_integration_test_docker_environment.lib.test_environment.database_setup.upload_file_to_db import \ - UploadFileToBucketFS, UploadResult -from exasol_integration_test_docker_environment.testing import utils -from exasol_integration_test_docker_environment.testing.api_test_environment import ApiTestEnvironment - -BUCKET_NAME = "default" -PATH_IN_BUCKET = "upload_test" - - -def construct_upload_target(file_to_upload: str) -> str: - return f"{BUCKET_NAME}/{PATH_IN_BUCKET}/{file_to_upload}" - - -class TestfileUpload(UploadFileToBucketFS): - path = luigi.Parameter() - file_to_upload = luigi.Parameter() # type: str - - def get_log_file(self) -> str: - return "/exa/logs/cored/*bucketfsd*" - - def get_pattern_to_wait_for(self) -> str: - return f"{self.file_to_upload}.*linked" - - def get_file_to_upload(self) -> str: - return f"{Path(str(self.path)) / str(self.file_to_upload)}" - - def get_upload_target(self) -> str: - return construct_upload_target(self.file_to_upload) - - def get_sync_time_estimation(self) -> int: - """Estimated time in seconds which the bucketfs needs to extract and sync a uploaded file""" - return 10 - - -class TestUpload(unittest.TestCase): - - @classmethod - def setUpClass(cls): - print(f"SetUp {cls.__name__}", file=stderr) - cls.test_environment = ApiTestEnvironment(cls) - cls.docker_environment_name = cls.__name__ - cls.environment = \ - cls.test_environment.spawn_docker_test_environment(name=cls.docker_environment_name) - - @classmethod - def tearDownClass(cls): - utils.close_environments(cls.environment, cls.test_environment) - - def _upload(self, temp_dir: str, file_to_upload: str, reuse: bool) -> UploadResult: - task_creator = lambda: generate_root_task(task_class=TestfileUpload, - path=temp_dir, - file_to_upload=file_to_upload, - environment_name=self.environment.name, - test_environment_info=self.environment.environment_info, - bucketfs_write_password=self.environment.bucketfs_password, - reuse_uploaded=reuse) - result = run_task(task_creator=task_creator, log_level="INFO") - return result - - def _get_bucket(self) -> Bucket: - db_info = self.environment.environment_info.database_info - URL = f"http://{db_info.host}:{db_info.ports.bucketfs}" - CREDENTAILS = {BUCKET_NAME: {"username": self.environment.bucketfs_username, - "password": self.environment.bucketfs_password}} - bucketfs = Service(URL, CREDENTAILS) - return bucketfs.buckets[BUCKET_NAME] - - def _download_file(self, filename: str) -> str: - file_content = as_string(self._get_bucket().download(f"{PATH_IN_BUCKET}/{filename}")) - return file_content - - def _assert_file_upload(self, file_name: str, - upload_result: UploadResult, - expected_reuse: bool, - expected_content: str): - files = self._get_bucket().files - self.assertEqual(upload_result, UploadResult( - upload_target=construct_upload_target(file_name), - reused=expected_reuse - )) - self.assertIn(f"{PATH_IN_BUCKET}/{file_name}", list(files)) - content = self._download_file(file_name) - self.assertEqual(content, expected_content) - - def _create_file_and_upload(self, - local_directory: str, - content: str, - upload_target: str, - reuse: bool) -> UploadResult: - with open(f"{local_directory}/{upload_target}", "w") as f: - f.write(content) - return self._upload(local_directory, upload_target, reuse) - - def test_upload_without_reuse(self): - with tempfile.TemporaryDirectory() as temp_directory: - file_one = "test1.txt" - result_file_one = self._create_file_and_upload( - local_directory=temp_directory, - content=file_one, - upload_target=file_one, - reuse=False - ) - file_two = "test2.txt" - result_file_two = self._create_file_and_upload( - local_directory=temp_directory, - content=file_two, - upload_target=file_two, - reuse=False - ) - self._assert_file_upload(file_name=file_one, - upload_result=result_file_one, - expected_reuse=False, - expected_content=file_one) - self._assert_file_upload(file_name=file_two, - upload_result=result_file_two, - expected_reuse=False, - expected_content=file_two) - - def test_upload_with_reuse(self): - with tempfile.TemporaryDirectory() as temp_directory: - file_reuse = "test_reuse.txt" - result = self._create_file_and_upload( - local_directory=temp_directory, - content=file_reuse, - upload_target=file_reuse, - reuse=True - ) - self._assert_file_upload(file_name=file_reuse, - upload_result=result, - expected_reuse=False, - expected_content=file_reuse) - - def test_reupload_with_reuse(self): - with tempfile.TemporaryDirectory() as temp_directory: - file_reupload_reuse = "test_reupload_reuse.txt" - result = self._create_file_and_upload( - local_directory=temp_directory, - content=file_reupload_reuse, - upload_target=file_reupload_reuse, - reuse=True - ) - self._assert_file_upload(file_name=file_reupload_reuse, - upload_result=result, - expected_reuse=False, - expected_content=file_reupload_reuse) - - result = self._create_file_and_upload( - local_directory=temp_directory, - content=file_reupload_reuse, - upload_target=file_reupload_reuse, - reuse=True - ) - self._assert_file_upload(file_name=file_reupload_reuse, - upload_result=result, - expected_reuse=True, - expected_content=file_reupload_reuse) - - def test_reupload_without_reuse(self): - with tempfile.TemporaryDirectory() as temp_directory: - file_reupload_no_reuse = "test_reupload_no_reuse.txt" - result = self._create_file_and_upload( - local_directory=temp_directory, - content=file_reupload_no_reuse, - upload_target=file_reupload_no_reuse, - reuse=False - ) - self._assert_file_upload(file_name=file_reupload_no_reuse, - upload_result=result, - expected_reuse=False, - expected_content=file_reupload_no_reuse) - - expected_content_after_reupload = file_reupload_no_reuse + "2" - result = self._create_file_and_upload( - local_directory=temp_directory, - content=expected_content_after_reupload, - upload_target=file_reupload_no_reuse, - reuse=False - ) - self._assert_file_upload(file_name=file_reupload_no_reuse, - upload_result=result, - expected_reuse=False, - expected_content=expected_content_after_reupload) - - -if __name__ == '__main__': - unittest.main() diff --git a/noxfile.py b/noxfile.py index 9278dab60..a8f0f77b5 100644 --- a/noxfile.py +++ b/noxfile.py @@ -192,22 +192,34 @@ def run_tests(session: nox.Session, db_version: str): @nox.parametrize("db_version", get_db_versions()) def run_minimal_tests(session: nox.Session, db_version: str): """Run the minimal tests in the poetry environment""" - with session.chdir(ROOT): - env = {"EXASOL_VERSION": db_version} - minimal_tests = ( + env = {"EXASOL_VERSION": db_version} + minimal_tests = { + "old-itest": [ "test_api_test_environment.py", - "test_cli_test_environment.py", + # "test_cli_test_environment.py", "test_doctor.py", "test_termination_handler.py", + ], + "new-itest": [ + "test_cli_environment.py" + ], + "unit": "./test/unit", + } + session.run("pytest", minimal_tests["unit"]) + for test in minimal_tests["new-itest"]: + session.run( + "pytest", + f"./test/integration/{test}", + env=env, ) - for test in minimal_tests: + with session.chdir(ROOT): + for test in minimal_tests["old-itest"]: session.run( "python", "-u", f"./exasol_integration_test_docker_environment/test/{test}", env=env, ) - session.run("pytest", './test/unit') @nox.session(name="get-all-db-versions", python=False) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index e1a22c872..4f523e584 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -4,6 +4,7 @@ import pytest from exasol_integration_test_docker_environment.lib.docker import ContextDockerClient +from test.integration.helpers import normalize_request_name from exasol_integration_test_docker_environment.testing import utils from exasol_integration_test_docker_environment \ .testing.api_test_environment import ApiTestEnvironment @@ -19,7 +20,7 @@ @pytest.fixture def cli_isolation(request) -> Iterator[ExaslctTestEnvironment]: - testname = request.node.name + testname = normalize_request_name(request.node.name) environment = ExaslctTestEnvironment( test_object=None, executable="itde", @@ -32,7 +33,7 @@ def cli_isolation(request) -> Iterator[ExaslctTestEnvironment]: @pytest.fixture def api_isolation(request) -> Iterator[ApiTestEnvironment]: - testname = request.node.name + testname = normalize_request_name(request.node.name) environment = ApiTestEnvironment(test_object=None, name=testname) yield environment utils.close_environments(environment) diff --git a/test/integration/helpers.py b/test/integration/helpers.py index b4ecfb8df..53bf05874 100644 --- a/test/integration/helpers.py +++ b/test/integration/helpers.py @@ -1,6 +1,25 @@ import contextlib +import re +from typing import Any, cast +from unittest.mock import Mock from exasol_integration_test_docker_environment.lib.docker import ContextDockerClient +from exasol_integration_test_docker_environment \ + .lib.test_environment.parameter.docker_db_test_environment_parameter \ + import DbOsAccess +from exasol_integration_test_docker_environment.lib.base.db_os_executor import ( + SshExecFactory, + DockerExecFactory, + DbOsExecFactory, + DockerClientFactory, +) +from exasol_integration_test_docker_environment.lib.data.database_info \ + import DatabaseInfo + + +def normalize_request_name(name: str): + name = re.sub(r"[\[\]._]+", "_", name) + return re.sub(r"^_+|_+$", "", name) def exact_matcher(names): @@ -10,10 +29,23 @@ def exact_matcher(names): def superset_matcher(names): return lambda value: all(x in value for x in names) - @contextlib.contextmanager def container_named(*names, matcher=None): matcher = matcher if matcher else exact_matcher(names) with ContextDockerClient() as client: matches = [c for c in client.containers.list() if matcher(c.name)] yield matches[0] if matches else None + + +def get_executor_factory( + dbinfo: DatabaseInfo, + db_os_access: DbOsAccess=DbOsAccess.DOCKER_EXEC, +) -> DbOsExecFactory: + if db_os_access == DbOsAccess.SSH: + return SshExecFactory.from_database_info(dbinfo) + client_factory = DockerClientFactory(timeout=100000) + return DockerExecFactory(dbinfo.container_info.container_name, client_factory) + + +def mock_cast(obj: Any) -> Mock: + return cast(Mock, obj) diff --git a/test/integration/test_bucketfs_upload.py b/test/integration/test_bucketfs_upload.py new file mode 100644 index 000000000..d4b196338 --- /dev/null +++ b/test/integration/test_bucketfs_upload.py @@ -0,0 +1,172 @@ +import luigi +import os +import pytest + +from typing import List, Optional + +from exasol_integration_test_docker_environment \ + .testing.api_test_environment import ApiTestEnvironment +from dataclasses import dataclass +from exasol.bucketfs import Service, Bucket, as_string +from exasol_integration_test_docker_environment \ + .lib.test_environment.database_setup.upload_file_to_db \ + import ( + UploadFileToBucketFS, + UploadResult, + ) +from exasol_integration_test_docker_environment.lib.api.common import ( + generate_root_task, + run_task, +) +from exasol_integration_test_docker_environment \ + .lib.data.environment_info import EnvironmentInfo +from exasol_integration_test_docker_environment \ + .lib.data.database_info import DatabaseInfo +from exasol_integration_test_docker_environment \ + .lib.test_environment.parameter \ + .docker_db_test_environment_parameter import DbOsAccess +from exasol_integration_test_docker_environment \ + .lib.base.db_os_executor import ( + DbOsExecFactory, + DockerExecFactory, + SshExecFactory, + ) +from test.integration.helpers import get_executor_factory + +BUCKET_NAME = "default" + + +def bucketfs_path(path: str, relative: bool = False) -> str: + parent = "upload_test" + suffix = f"{parent}/{path}" + if relative: + return suffix + return f"{BUCKET_NAME}/{suffix}" + + +class ArgumentError(Exception): + """Invalid arguments to BucketFsAccess.upload()""" + + +class BucketFsAccess: + class FileUploadTask(UploadFileToBucketFS): + local_path = luigi.Parameter() + target = luigi.Parameter() + + def get_log_file(self) -> str: + return "/exa/logs/cored/*bucketfsd*" + + def get_pattern_to_wait_for(self) -> str: + filename = os.path.basename(self.local_path) + return f"{filename}.*linked" + + def get_file_to_upload(self) -> str: + return str(self.local_path) + + def get_upload_target(self) -> str: + return self.target + + def get_sync_time_estimation(self) -> int: + """Estimated time in seconds which the bucketfs needs to extract and sync a uploaded file""" + return 10 + + def __init__(self, environment: ApiTestEnvironment, executor_factory: DbOsExecFactory): + self.environment = environment + self.executor_factory = executor_factory + + def _get_bucket(self) -> Bucket: + db_info = self.environment.environment_info.database_info + url = f"http://{db_info.host}:{db_info.ports.bucketfs}" + credentials = { BUCKET_NAME: { + "username": self.environment.bucketfs_username, + "password": self.environment.bucketfs_password + } } + bucketfs = Service(url, credentials) + return bucketfs.buckets[BUCKET_NAME] + + def list(self) -> List[str]: + return self._get_bucket().files + + def upload(self, + local_path: str, + relative: Optional[str] = None, + target: Optional[str] = None, + reuse: bool = False) -> UploadResult: + if not (relative or target): + raise ArgumentError("Either relative or target must be specified.") + if relative: + local_path = f"{local_path}/{relative}" + target = bucketfs_path(target or relative) + task_creator = lambda: generate_root_task( + task_class=self.FileUploadTask, + local_path=local_path, + target=target, + environment_name=self.environment.name, + test_environment_info=self.environment.environment_info, + bucketfs_write_password=self.environment.bucketfs_password, + reuse_uploaded=reuse, + executor_factory=self.executor_factory, + ) + result = run_task(task_creator=task_creator, log_level="INFO") + return result + + def download(self, relative: str) -> str: + path = bucketfs_path(path=relative, relative=True) + return as_string(self._get_bucket().download(path)) + + +class UploadValidator: + def __init__(self, tmp_path: str, bucketfs: BucketFsAccess, reuse: bool): + self.tmp_path = tmp_path + self.bucketfs = bucketfs + self.reuse = reuse + self.filename = None + self.actual_result = None + + def upload(self, filename: str, content: str): + with open(f"{self.tmp_path}/{filename}", "w") as f: + f.write(content) + self.filename = filename + self.actual_result = self.bucketfs.upload( + self.tmp_path, + relative=filename, + reuse=self.reuse, + ) + return self + + def validate(self, expected_content: str, expected_reuse: bool): + expected_result = UploadResult( + upload_target=bucketfs_path(self.filename), + reused=expected_reuse, + ) + assert self.actual_result == expected_result + assert bucketfs_path(self.filename, relative=True) in self.bucketfs.list() + assert expected_content == self.bucketfs.download(self.filename) + + +@pytest.mark.parametrize("db_os_access", [DbOsAccess.DOCKER_EXEC, DbOsAccess.SSH]) +def test_upload_without_reuse(api_database, tmp_path, db_os_access): + with api_database() as db: + dbinfo = db.environment_info.database_info + executor_factory = get_executor_factory(dbinfo, db_os_access) + bucketfs = BucketFsAccess(db, executor_factory) + filename = "sample-file.txt" + validator = UploadValidator(tmp_path, bucketfs, reuse=False) + validator.upload(filename, "old content") \ + .validate("old content", expected_reuse=False) + validator.upload(filename, "new content") \ + .validate("new content", expected_reuse=False) + + +@pytest.mark.parametrize("db_os_access", [DbOsAccess.DOCKER_EXEC, DbOsAccess.SSH]) +def test_upload_with_reuse(api_database, tmp_path, db_os_access): + with api_database() as db: + dbinfo = db.environment_info.database_info + executor_factory = get_executor_factory(dbinfo, db_os_access) + bucketfs = BucketFsAccess(db, executor_factory) + filename = "sample-file.txt" + validator = UploadValidator(tmp_path, bucketfs, reuse=True) + validator.upload(filename, "old content") \ + .validate("old content", expected_reuse=False) + validator.upload(filename, "new content") \ + .validate("old content", expected_reuse=True) diff --git a/test/integration/test_cli_environment.py b/test/integration/test_cli_environment.py new file mode 100644 index 000000000..615daf6bd --- /dev/null +++ b/test/integration/test_cli_environment.py @@ -0,0 +1,96 @@ +import docker +import pytest + +from inspect import cleandoc +from typing import List, Optional +from exasol_integration_test_docker_environment.lib.docker import ContextDockerClient +from exasol_integration_test_docker_environment.testing.spawned_test_environments \ + import SpawnedTestEnvironments +from exasol_integration_test_docker_environment.lib.base.db_os_executor \ + import DbOsExecFactory +from exasol_integration_test_docker_environment.testing.exaslct_docker_test_environment \ + import ExaslctDockerTestEnvironment +from exasol_integration_test_docker_environment \ + .lib.test_environment.database_setup.find_exaplus_in_db_container \ + import find_exaplus +from exasol_integration_test_docker_environment.lib.test_environment.parameter.docker_db_test_environment_parameter \ + import DbOsAccess +from test.integration.helpers import get_executor_factory + +class NumberCheck: + def __init__(self, db: SpawnedTestEnvironments, all: List[str]): + self.db = db + self.all = all + + def count(self, selected: Optional[List[str]] = None): + return len(selected if selected is not None else self.all) + + @property + def log(self) -> str: + return ( + self + .db + .on_host_docker_environment + .completed_process + .stdout + .decode('utf8') + ) + + def fail(self, prefix) -> str: + return cleandoc( + f""" + {prefix} in {self.all}. + Startup log was: + {self.log} + """ + ) + + +def smoke_test_sql(exaplus_path: str, env: ExaslctDockerTestEnvironment) -> str: + def quote(s): + return f"'{s}'" + + db_info = env.environment_info.database_info + command = [ + str(exaplus_path), + "-c", quote(f"{db_info.host}:{db_info.ports.database}"), + "-u", quote(env.db_username), + "-p", quote(env.db_password), + ] + command += [ + "-sql", + quote("select 1;"), + "-jdbcparam", + "validateservercertificate=0", + ] + command = " ".join(command) + return f'bash -c "{command}" ' + + +def test_db_container_started(cli_database): + with cli_database() as db: + with ContextDockerClient() as docker_client: + name = db.on_host_docker_environment.name + containers = [c.name for c in docker_client.containers.list() if name in c.name] + check = NumberCheck(db, containers) + assert check.count() ==1, check.fail("Not exactly 1 container") + + db_containers = [c for c in containers if "db_container" in c] + check = NumberCheck(db, containers) + assert check.count(db_containers) == 1, check.fail("Found no db container") + + +@pytest.mark.parametrize("db_os_access", [DbOsAccess.DOCKER_EXEC, DbOsAccess.SSH]) +def test_db_available(cli_database, db_os_access): + with cli_database() as db: + with ContextDockerClient() as docker_client: + dbinfo = db.on_host_docker_environment.environment_info.database_info + db_container_name = dbinfo.container_info.container_name + db_container = docker_client.containers.get(db_container_name) + executor_factory = get_executor_factory(dbinfo, db_os_access) + with executor_factory.executor() as executor: + exaplus = find_exaplus(db_container, executor) + command = smoke_test_sql(exaplus, db.on_host_docker_environment) + exit_code, output = db_container.exec_run(command) + assert exit_code == 0, \ + f"Error while executing 'exaplus' in test container. Got output:\n {output}" diff --git a/test/integration/test_ssh_access.py b/test/integration/test_ssh_access.py index 264ddc497..b99813558 100644 --- a/test/integration/test_ssh_access.py +++ b/test/integration/test_ssh_access.py @@ -1,10 +1,43 @@ +import contextlib +import docker +from docker.models.containers import Container as DockerContainer import fabric import io import os import pytest +import time + +from exasol_integration_test_docker_environment.lib.test_environment.ports import ( + find_free_ports, + Ports, +) +from exasol_integration_test_docker_environment.lib.data.ssh_info \ + import SshInfo +from exasol_integration_test_docker_environment.lib.data.database_info \ + import DatabaseInfo +from exasol_integration_test_docker_environment \ + .lib.test_environment.parameter.docker_db_test_environment_parameter \ + import DbOsAccess +from exasol_integration_test_docker_environment.lib.data.container_info \ + import ContainerInfo from exasol_integration_test_docker_environment.lib.base.ssh_access import SshKey, SshKeyCache -from test.integration.helpers import container_named +from test.integration.helpers import ( + container_named, + get_executor_factory, + normalize_request_name, +) + + +@pytest.fixture +def fabric_stdin(monkeypatch): + """ + Mock stdin to avoid ThreadException when reading from stdin while + stdout is captured by pytest: OSError: pytest: reading from stdin while + output is captured! Consider using ``-s``. + See https://github.com/fabric/fabric/issues/2005 + """ + monkeypatch.setattr('sys.stdin', io.StringIO('')) def test_generate_ssh_key_file(api_database): @@ -18,19 +51,71 @@ def test_generate_ssh_key_file(api_database): assert " itde-ssh-access" in command[1].decode("utf-8") -def test_ssh_access(api_database, monkeypatch): +def test_ssh_access(api_database, fabric_stdin): params = { "db_os_access": "SSH" } with api_database(additional_parameters=params) as db: container_name = db.environment_info.database_info.container_info.container_name with container_named(container_name) as container: command = container.exec_run("cat /root/.ssh/authorized_keys") key = SshKey.from_cache() - # Mock stdin to avoid ThreadException when reading from - # stdin while stdout is capture by pytest. - # See https://github.com/fabric/fabric/issues/2005 - monkeypatch.setattr('sys.stdin', io.StringIO('')) result = fabric.Connection( f"root@localhost:{db.ports.ssh}", connect_kwargs={ "pkey": key.private }, ).run('ls /exa/etc/EXAConf') assert result.stdout == "/exa/etc/EXAConf\n" + + +@pytest.fixture +def sshd_container(request): + testname = normalize_request_name(request.node.name) + @contextlib.contextmanager + def create_context( + ssh_port_forward: int, + public_key: str + ) -> DockerContainer: + client = docker.from_env() + container = client.containers.run( + name=testname, + image="linuxserver/openssh-server:9.3_p2-r0-ls123", + detach=True, + ports={ '2222/tcp': ssh_port_forward }, + environment={ "PUBLIC_KEY": public_key }, + ) + try: + yield container + finally: + container.stop() + container.remove() + + return create_context + + +@pytest.mark.parametrize("db_os_access", [DbOsAccess.SSH, DbOsAccess.DOCKER_EXEC]) +def test_db_os_executor_factory(sshd_container, db_os_access, fabric_stdin): + def database_info(container_name, ssh_port_forward): + ssh_info = SshInfo( + user="linuxserver.io", + key_file=SshKeyCache().private_key, + ) + return DatabaseInfo( + host="localhost", + ports=Ports(-1, -1, ssh_port_forward), + reused=False, + container_info=ContainerInfo( + container_name=container_name, + ip_address=None, + network_aliases=[], + network_info=None, + ), + ssh_info=ssh_info, + ) + + ssh_port_forward = find_free_ports(1)[0] + public_key = SshKey.from_cache().public_key_as_string() + with sshd_container(ssh_port_forward, public_key) as container: + dbinfo = database_info(container.name, ssh_port_forward) + factory = get_executor_factory(dbinfo, ssh_port_forward) + with factory.executor() as executor: + exit_code, output = executor.exec("ls /keygen.sh") + output = output.decode('utf-8').strip() + assert (exit_code, output) == (0, "/keygen.sh") diff --git a/test/unit/test_db_os_executor.py b/test/unit/test_db_os_executor.py new file mode 100644 index 000000000..3327c9e3a --- /dev/null +++ b/test/unit/test_db_os_executor.py @@ -0,0 +1,76 @@ +from unittest.mock import ( + MagicMock, + create_autospec, + call, +) +from docker import DockerClient +from docker.models.containers import Container as DockerContainer + +from exasol_integration_test_docker_environment \ + .lib.base.db_os_executor import ( + DockerClientFactory, + DbOsExecutor, + DockerExecutor, + SshExecutor, + DockerExecFactory, + SshExecFactory, +) +from exasol_integration_test_docker_environment.lib.test_environment.ports import Ports +from exasol_integration_test_docker_environment.lib.data.ssh_info import SshInfo +from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo +from exasol_integration_test_docker_environment.lib.docker \ + import ContextDockerClient +from test.integration.helpers import mock_cast + + +def test_executor_closes_client(): + container = create_autospec(DockerContainer) + client:Union[MagicMock, DockerClient] = create_autospec(DockerClient) + client.containers.get = MagicMock(return_value=container) + with DockerExecutor(client, "container_name") as executor: + executor.exec("sample command") + container.exec_run.assert_called_with("sample command") + client.close.assert_not_called() + client.close.assert_called() + + +def test_ssh_exec_factory(): + factory = SshExecFactory("connect_string", "ssh_key_file") + executor = factory.executor() + assert isinstance(executor, DbOsExecutor) \ + and type(executor) is SshExecutor + + +def test_docker_exec_factory(): + client_factory = create_autospec(DockerClientFactory) + factory = DockerExecFactory("container_name", client_factory) + executor = factory.executor() + assert isinstance(executor, DbOsExecutor) \ + and type(executor) is DockerExecutor + + +def test_docker_client_factory_usage(): + client = create_autospec(DockerClient) + factory = create_autospec(DockerClientFactory) + factory.client = MagicMock(return_value=client) + testee = DockerExecFactory("container_name", factory) + executor = testee.executor() + assert executor._client == client \ + and mock_cast(factory.client).mock_calls == [call()] + + +def test_ssh_exec_factory_from_database_info(): + ports = Ports(1,2,3) + ssh_info = SshInfo("my_user", "my_key_file") + dbinfo = DatabaseInfo( + "my_host", + ports, + reused=False, + container_info=None, + ssh_info=ssh_info, + forwarded_ports=None, + ) + factory = SshExecFactory.from_database_info(dbinfo) + executor = factory.executor() + assert executor._connect_string == "my_user@my_host:3" + assert executor._key_file == "my_key_file"