diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 36dd25f..9ae31b1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,3 +31,5 @@ repos: - click - pytest - tomlkit + - idf-build-apps~=2.6 + - pytest-embedded~=1.12 diff --git a/idf_ci/__init__.py b/idf_ci/__init__.py index ad4cb6a..4b1808e 100644 --- a/idf_ci/__init__.py +++ b/idf_ci/__init__.py @@ -1,2 +1,18 @@ # SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 + +from .idf_pytest.models import PytestApp, PytestCase +from .idf_pytest.plugin import IdfPytestPlugin +from .idf_pytest.script import get_pytest_cases +from .profiles import IniProfileManager, TomlProfileManager +from .settings import CiSettings + +__all__ = [ + 'CiSettings', + 'IdfPytestPlugin', + 'IniProfileManager', + 'PytestApp', + 'PytestCase', + 'TomlProfileManager', + 'get_pytest_cases', +] diff --git a/idf_ci/_compat.py b/idf_ci/_compat.py new file mode 100644 index 0000000..0e12a67 --- /dev/null +++ b/idf_ci/_compat.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import typing as t + +PathLike = t.Union[str, os.PathLike] + + +class Undefined(str): + def __new__(cls, *args, **kwargs): + return super().__new__(cls, 'undefined') + + +UNDEF = Undefined() diff --git a/idf_ci/compat.py b/idf_ci/idf_pytest/__init__.py similarity index 60% rename from idf_ci/compat.py rename to idf_ci/idf_pytest/__init__.py index 681a2c6..ad4cb6a 100644 --- a/idf_ci/compat.py +++ b/idf_ci/idf_pytest/__init__.py @@ -1,7 +1,2 @@ # SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - -import os -import typing as t - -PathLike = t.Union[str, os.PathLike] diff --git a/idf_ci/idf_pytest/models.py b/idf_ci/idf_pytest/models.py new file mode 100644 index 0000000..85afe56 --- /dev/null +++ b/idf_ci/idf_pytest/models.py @@ -0,0 +1,177 @@ +# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import typing as t +from functools import cached_property + +from _pytest.python import Function +from idf_build_apps.utils import to_list +from pytest_embedded.plugin import parse_multi_dut_args + +LOGGER = logging.getLogger(__name__) + + +class PytestApp: + """ + Represents a pytest app. + """ + + def __init__(self, path: str, target: str, config: str) -> None: + self.path = os.path.abspath(path) + self.target = target + self.config = config + + def __hash__(self) -> int: + return hash((self.path, self.target, self.config)) + + @property + def build_dir(self) -> str: + """ + Returns the build directory for the app. + + .. note:: + + Matches the build_dir (by default build_@t_@w) in the build profile. + + :return: The build directory for the app. + """ + return os.path.join(self.path, f'build_{self.target}_{self.config}') + + +class PytestCase: + """ + Represents a pytest test case. + """ + + def __init__(self, apps: t.List[PytestApp], item: Function) -> None: + self.apps = apps + self.item = item + + @classmethod + def get_param(cls, item: Function, key: str, default: t.Any = None) -> t.Any: + # funcargs is not calculated while collection + # callspec is something defined in parametrize + if not hasattr(item, 'callspec'): + return default + + return item.callspec.params.get(key, default) or default + + @classmethod + def from_item(cls, item: Function, *, cli_target: str) -> t.Optional['PytestCase']: + """ + Turn pytest item to PytestCase + """ + count = cls.get_param(item, 'count', 1) + + # default app_path is where the test script locates + app_paths = to_list(parse_multi_dut_args(count, cls.get_param(item, 'app_path', os.path.dirname(item.path)))) + configs = to_list(parse_multi_dut_args(count, cls.get_param(item, 'config', 'default'))) + targets = to_list(parse_multi_dut_args(count, cls.get_param(item, 'target'))) # defined fixture + + cli_targets = cli_target.split(',') + if count > 1 and targets == [None] * count: + targets = cli_targets + elif targets is None: + if count == len(cli_targets): + LOGGER.debug('No param "target" for test case "%s", use CLI target "%s"', item.name, cli_target) + targets = cli_targets + else: + LOGGER.warning( + 'No param "target" for test case "%s". ' + 'current DUT count is %d, while CLI target count is %d. Skipping the test.', + item.name, + count, + len(cli_targets), + ) + return None + + return PytestCase( + apps=[PytestApp(app_paths[i], targets[i], configs[i]) for i in range(count)], + item=item, + ) + + def __hash__(self) -> int: + return hash((self.path, self.name, self.apps, self.all_markers)) + + @cached_property + def path(self) -> str: + return str(self.item.path) + + @cached_property + def name(self) -> str: + return self.item.originalname + + @cached_property + def targets(self) -> t.List[str]: + return [app.target for app in self.apps] + + @cached_property + def configs(self) -> t.List[str]: + return [app.config for app in self.apps] + + @cached_property + def caseid(self) -> str: + if self.is_single_dut: + return f'{self.targets[0]}.{self.configs[0]}.{self.name}' + else: + return f'{tuple(self.targets)}.{tuple(self.configs)}.{self.name}' + + @cached_property + def is_single_dut(self) -> bool: + return True if len(self.apps) == 1 else False + + @cached_property + def is_host_test(self) -> bool: + return 'host_test' in self.all_markers or 'linux' in self.targets + + @cached_property + def is_in_ci(self) -> bool: + return 'CI_JOB_ID' in os.environ or 'GITHUB_ACTIONS' in os.environ + + @cached_property + def target_selector(self) -> str: + return ','.join(app.target for app in self.apps) + + # the following markers could be changed dynamically, don't use cached_property + @property + def all_markers(self) -> t.Set[str]: + return {marker.name for marker in self.item.iter_markers()} + + def all_built_in_app_lists(self, app_lists: t.Optional[t.List[str]] = None) -> t.Optional[str]: + """ + Check if all binaries of the test case are built in the app lists. + + :param app_lists: app lists to check + :return: debug string if not all binaries are built in the app lists, None otherwise + """ + if app_lists is None: + # ignore this feature + return None + + bin_found = [0] * len(self.apps) + for i, app in enumerate(self.apps): + if app.build_dir in app_lists: + bin_found[i] = 1 + + if sum(bin_found) == 0: + msg = f'Skip test case {self.name} because all following binaries are not listed in the app lists: ' + for app in self.apps: + msg += f'\n - {app.build_dir}' + + print(msg) + return msg + + if sum(bin_found) == len(self.apps): + return None + + # some found, some not, looks suspicious + msg = f'Found some binaries of test case {self.name} are not listed in the app lists.' + for i, app in enumerate(self.apps): + if bin_found[i] == 0: + msg += f'\n - {app.build_dir}' + + msg += '\nMight be a issue of .build-test-rules.yml files' + print(msg) + return msg diff --git a/idf_ci/idf_pytest/plugin.py b/idf_ci/idf_pytest/plugin.py new file mode 100644 index 0000000..78e4d1e --- /dev/null +++ b/idf_ci/idf_pytest/plugin.py @@ -0,0 +1,139 @@ +# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import importlib.util +import logging +import os +import re +import sys +import typing as t +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from _pytest.fixtures import FixtureRequest +from pytest_embedded.plugin import multi_dut_argument, multi_dut_fixture + +from .models import PytestCase + +_MODULE_NOT_FOUND_REGEX = re.compile(r"No module named '(.+?)'") + + +def _try_import(path: Path): + spec = importlib.util.spec_from_file_location('', path) + # write these if to make mypy happy + if spec: + module = importlib.util.module_from_spec(spec) + if spec.loader and module: + spec.loader.exec_module(module) + + +class IdfPytestPlugin: + def __init__(self, *, cli_target: str) -> None: + self.cli_target = cli_target + + self._all_items_to_cases_d: t.Dict[pytest.Item, PytestCase] = {} + self._testing_items: t.Set[pytest.Item] = set() + + @property + def cases(self) -> t.List[PytestCase]: + return [c for i, c in self._all_items_to_cases_d.items() if i in self._testing_items] + + @pytest.fixture + @multi_dut_argument + def target(self, request: FixtureRequest) -> str: + _t = getattr(request, 'param', None) or request.config.getoption('target', None) + if not _t: + raise ValueError( + '"target" shall either be defined in pytest.mark.parametrize ' + 'or be passed in command line by --target' + ) + return _t + + @pytest.fixture + @multi_dut_argument + def config(self, request: FixtureRequest) -> str: + return getattr(request, 'param', None) or 'default' + + @pytest.fixture + @multi_dut_fixture + def build_dir( + self, + request: FixtureRequest, + app_path: str, + target: t.Optional[str], + config: t.Optional[str], + ) -> str: + """ + Check local build dir with the following priority: + + 1. build__ + 2. build_ + 3. build_ + 4. build + + Returns: + valid build directory + """ + check_dirs = [] + build_dir_arg = request.config.getoption('build_dir', None) + if build_dir_arg: + check_dirs.append(build_dir_arg) + if target is not None and config is not None: + check_dirs.append(f'build_{target}_{config}') + if target is not None: + check_dirs.append(f'build_{target}') + if config is not None: + check_dirs.append(f'build_{config}') + check_dirs.append('build') + + for check_dir in check_dirs: + binary_path = os.path.join(app_path, check_dir) + if os.path.isdir(binary_path): + logging.info(f'found valid binary path: {binary_path}') + return check_dir + + logging.warning('checking binary path: %s... missing... try another place', binary_path) + + raise ValueError( + f'no build dir valid. Please build the binary via "idf.py -B {check_dirs[0]} build" and run pytest again' + ) + + @pytest.hookimpl(tryfirst=True) + def pytest_pycollect_makemodule( + self, + module_path: Path, + ): + # no need to install third-party packages for collecting + # try to eliminate ModuleNotFoundError in test scripts + while True: + try: + _try_import(module_path) + except ModuleNotFoundError as e: + if res := _MODULE_NOT_FOUND_REGEX.search(e.msg): + # redirect_stderr somehow breaks the sys.stderr.write() method + # fix it when implement proper logging + pkg = res.group(1) + if sys.__stderr__: + sys.__stderr__.write(f'WARNING:Mocking missed package while collecting: {pkg}\n') + sys.modules[pkg] = MagicMock() + continue + else: + break + + def pytest_collection_modifyitems(self, items): + for item in items: + if case := PytestCase.from_item(item, cli_target=self.cli_target): + self._all_items_to_cases_d[item] = case + + # filter by target + if self.cli_target != 'all': + res = [] + for item, case in self._all_items_to_cases_d.items(): + if case.target_selector == self.cli_target: + res.append(item) + items[:] = res + + # add them to self._testing_items + for item in items: + self._testing_items.add(item) diff --git a/idf_ci/idf_pytest/script.py b/idf_ci/idf_pytest/script.py new file mode 100644 index 0000000..c239dff --- /dev/null +++ b/idf_ci/idf_pytest/script.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import io +import logging +import os.path +import typing as t +from contextlib import redirect_stderr, redirect_stdout + +import pytest +from _pytest.config import ExitCode + +from idf_ci._compat import UNDEF, PathLike, Undefined +from idf_ci.profiles import IniProfileManager + +from .models import PytestCase +from .plugin import IdfPytestPlugin + +LOGGER = logging.getLogger(__name__) + + +def get_pytest_cases( + paths: t.List[str], + target: str = 'all', + *, + profiles: t.List[PathLike] = UNDEF, # type: ignore +) -> t.List[PytestCase]: + if isinstance(profiles, Undefined): + profiles = ['default'] + + profile_o = IniProfileManager( + profiles, + os.path.join(os.path.dirname(__file__), '..', 'templates', 'default_test_profile.ini'), + ) + LOGGER.debug('config file: %s', profile_o.merged_profile_path) + LOGGER.debug('config file content: %s', profile_o.read(profile_o.merged_profile_path)) + + with io.StringIO() as out_b, io.StringIO() as err_b: + with redirect_stdout(out_b), redirect_stderr(err_b): + plugin = IdfPytestPlugin(cli_target=target) + res = pytest.main( + [ + *paths, + '--collect-only', + '-c', + profile_o.merged_profile_path, + '--rootdir', + os.getcwd(), + '--target', + 'all', + ], + plugins=[plugin], + ) + stdout_msg = out_b.getvalue() + stderr_msg = err_b.getvalue() + + if res == ExitCode.OK: + return plugin.cases + + raise RuntimeError( + f'pytest collection failed at {", ".join(paths)}.\n' f'stdout: {stdout_msg}\n' f'stderr: {stderr_msg}' + ) diff --git a/idf_ci/profiles.py b/idf_ci/profiles.py index 4acbba5..5889504 100644 --- a/idf_ci/profiles.py +++ b/idf_ci/profiles.py @@ -11,7 +11,7 @@ from tomlkit import dump, load -from .compat import PathLike +from ._compat import PathLike def _merge_dicts(source: t.Dict, target: t.Dict) -> t.Dict: @@ -51,7 +51,9 @@ def merge(self) -> None: @contextmanager def _merged_profile_writer(self) -> t.Generator[t.IO[str], None, None]: - with tempfile.NamedTemporaryFile(mode='w', delete=False) as fw: + # seems like .ini suffix is required to let pytest recognize that this is a config file + # otherwise -c won't work + with tempfile.NamedTemporaryFile(suffix='.ini', mode='w', delete=False) as fw: yield fw self._merged_profile_path = fw.name diff --git a/pyproject.toml b/pyproject.toml index 9517f0f..6be0393 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,8 @@ dependencies = [ "tomlkit", # we need both read and write # build related "idf-build-apps~=2.6", + # test related + "pytest-embedded-idf[serial]~=1.12", ] [project.urls] @@ -66,12 +68,18 @@ select = [ 'RUF', # ruff ] +[tool.ruff.lint.flake8-unused-arguments] +ignore-variadic-names = true + [tool.ruff.format] quote-style = "single" docstring-code-format = true [tool.mypy] python_version = "3.8" +[[tool.mypy.overrides]] +module = ["pytest_embedded.plugin.*"] +follow_untyped_imports = true [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/tests/test_plugin.py b/tests/test_plugin.py new file mode 100644 index 0000000..795438b --- /dev/null +++ b/tests/test_plugin.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +import textwrap + +import pytest + +from idf_ci import get_pytest_cases + + +class TestGetPytestCases: + @pytest.fixture(autouse=True) + def setup_test_scripts(self, tmp_path): + original_dir = os.getcwd() + os.chdir(tmp_path) + + test_script = tmp_path / f'pytest_{os.urandom(10).hex()}.py' + + test_script.write_text( + textwrap.dedent(""" + import not_exists + + import os + import tempfile + import pytest + + @pytest.mark.parametrize('target, config', [ + ('esp32s2', 'def_2'), + ('esp32s3', 'def_3'), + ], indirect=True) + def test_single_dut(dut, config): # FIXME: config shall not be mandatory + pass + + @pytest.mark.parametrize('count, target', [ + (2, 'esp32'), + (2, 'esp32s2|esp32s3'), + ], indirect=True) + def test_multi_dut(dut): + pass + + @pytest.mark.parametrize('count, app_path, target, config', [ + (3, None, 'esp32s2', None), + (2, 'subdir', 'esp32s3', 'foo'), + ], indirect=True) + def test_multi_dut_with_custom_app_path(dut, config): # FIXME: config shall not be mandatory + pass + + def test_no_param(): + pass + """) + ) + + yield + + os.chdir(original_dir) + + def test_collect_single_dut(self, temp_dir): + cases = get_pytest_cases([temp_dir], 'esp32s2') + assert len(cases) == 2 + assert cases[0].caseid == 'esp32s2.def_2.test_single_dut' + assert cases[0].apps[0].build_dir == os.path.join(temp_dir, 'build_esp32s2_def_2') + assert cases[1].caseid == 'esp32s2.default.test_no_param' + + cases = get_pytest_cases([temp_dir], 'esp32s3') + assert len(cases) == 2 + assert cases[0].caseid == 'esp32s3.def_3.test_single_dut' + assert cases[1].caseid == 'esp32s3.default.test_no_param' + + def test_collect_multi_dut(self, temp_dir): + cases = get_pytest_cases([temp_dir], 'esp32,esp32') + assert len(cases) == 1 + assert cases[0].caseid == "('esp32', 'esp32').('default', 'default').test_multi_dut" + assert len(cases[0].apps) == 2 + assert cases[0].apps[0].build_dir == os.path.join(temp_dir, 'build_esp32_default') + assert cases[0].apps[1].build_dir == os.path.join(temp_dir, 'build_esp32_default') + + def test_collect_multi_dut_with_custom_app_path(self, temp_dir): + cases = get_pytest_cases([temp_dir], 'esp32s2,esp32s2,esp32s2') + assert len(cases) == 1 + assert ( + cases[0].caseid + == "('esp32s2', 'esp32s2', 'esp32s2').('default', 'default', 'default').test_multi_dut_with_custom_app_path" + ) + assert cases[0].apps[0].build_dir == os.path.join(temp_dir, 'build_esp32s2_default') + assert cases[0].apps[1].build_dir == os.path.join(temp_dir, 'build_esp32s2_default') + assert cases[0].apps[2].build_dir == os.path.join(temp_dir, 'build_esp32s2_default') + + cases = get_pytest_cases([temp_dir], 'esp32s3,esp32s3') + assert len(cases) == 1 + assert cases[0].caseid == "('esp32s3', 'esp32s3').('foo', 'foo').test_multi_dut_with_custom_app_path" + assert cases[0].apps[0].build_dir == os.path.join(temp_dir, 'subdir', 'build_esp32s3_foo') + assert cases[0].apps[1].build_dir == os.path.join(temp_dir, 'subdir', 'build_esp32s3_foo')