Skip to content

Commit

Permalink
feat: support get_pytest_cases
Browse files Browse the repository at this point in the history
  • Loading branch information
hfudev committed Jan 8, 2025
1 parent e4eb9ea commit 2a5c1c4
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ repos:
- click
- pytest
- tomlkit
- idf-build-apps~=2.6
- pytest-embedded~=1.12
16 changes: 16 additions & 0 deletions idf_ci/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,18 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

from .idf_pytest.models import PytestApp, PytestCase
from .idf_pytest.plugin import IdfPytestPlugin
from .idf_pytest.script import get_pytest_cases
from .profiles import IniProfileManager, TomlProfileManager
from .settings import CiSettings

__all__ = [
'CiSettings',
'IdfPytestPlugin',
'IniProfileManager',
'PytestApp',
'PytestCase',
'TomlProfileManager',
'get_pytest_cases',
]
15 changes: 15 additions & 0 deletions idf_ci/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import os
import typing as t

PathLike = t.Union[str, os.PathLike]


class Undefined(str):
def __new__(cls, *args, **kwargs):
return super().__new__(cls, 'undefined')


UNDEF = Undefined()
5 changes: 0 additions & 5 deletions idf_ci/compat.py → idf_ci/idf_pytest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import os
import typing as t

PathLike = t.Union[str, os.PathLike]
177 changes: 177 additions & 0 deletions idf_ci/idf_pytest/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import logging
import os
import typing as t
from functools import cached_property

from _pytest.python import Function
from idf_build_apps.utils import to_list
from pytest_embedded.plugin import parse_multi_dut_args

LOGGER = logging.getLogger(__name__)


class PytestApp:
"""
Represents a pytest app.
"""

def __init__(self, path: str, target: str, config: str) -> None:
self.path = os.path.abspath(path)
self.target = target
self.config = config

def __hash__(self) -> int:
return hash((self.path, self.target, self.config))

@property
def build_dir(self) -> str:
"""
Returns the build directory for the app.
.. note::
Matches the build_dir (by default build_@t_@w) in the build profile.
:return: The build directory for the app.
"""
return os.path.join(self.path, f'build_{self.target}_{self.config}')


class PytestCase:
"""
Represents a pytest test case.
"""

def __init__(self, apps: t.List[PytestApp], item: Function) -> None:
self.apps = apps
self.item = item

@classmethod
def get_param(cls, item: Function, key: str, default: t.Any = None) -> t.Any:
# funcargs is not calculated while collection
# callspec is something defined in parametrize
if not hasattr(item, 'callspec'):
return default

return item.callspec.params.get(key, default) or default

@classmethod
def from_item(cls, item: Function, *, cli_target: str) -> t.Optional['PytestCase']:
"""
Turn pytest item to PytestCase
"""
count = cls.get_param(item, 'count', 1)

# default app_path is where the test script locates
app_paths = to_list(parse_multi_dut_args(count, cls.get_param(item, 'app_path', os.path.dirname(item.path))))
configs = to_list(parse_multi_dut_args(count, cls.get_param(item, 'config', 'default')))
targets = to_list(parse_multi_dut_args(count, cls.get_param(item, 'target'))) # defined fixture

cli_targets = cli_target.split(',')
if count > 1 and targets == [None] * count:
targets = cli_targets
elif targets is None:
if count == len(cli_targets):
LOGGER.debug('No param "target" for test case "%s", use CLI target "%s"', item.name, cli_target)
targets = cli_targets
else:
LOGGER.warning(
'No param "target" for test case "%s". '
'current DUT count is %d, while CLI target count is %d. Skipping the test.',
item.name,
count,
len(cli_targets),
)
return None

return PytestCase(
apps=[PytestApp(app_paths[i], targets[i], configs[i]) for i in range(count)],
item=item,
)

def __hash__(self) -> int:
return hash((self.path, self.name, self.apps, self.all_markers))

@cached_property
def path(self) -> str:
return str(self.item.path)

@cached_property
def name(self) -> str:
return self.item.originalname

@cached_property
def targets(self) -> t.List[str]:
return [app.target for app in self.apps]

@cached_property
def configs(self) -> t.List[str]:
return [app.config for app in self.apps]

@cached_property
def caseid(self) -> str:
if self.is_single_dut:
return f'{self.targets[0]}.{self.configs[0]}.{self.name}'
else:
return f'{tuple(self.targets)}.{tuple(self.configs)}.{self.name}'

@cached_property
def is_single_dut(self) -> bool:
return True if len(self.apps) == 1 else False

@cached_property
def is_host_test(self) -> bool:
return 'host_test' in self.all_markers or 'linux' in self.targets

@cached_property
def is_in_ci(self) -> bool:
return 'CI_JOB_ID' in os.environ or 'GITHUB_ACTIONS' in os.environ

@cached_property
def target_selector(self) -> str:
return ','.join(app.target for app in self.apps)

# the following markers could be changed dynamically, don't use cached_property
@property
def all_markers(self) -> t.Set[str]:
return {marker.name for marker in self.item.iter_markers()}

def all_built_in_app_lists(self, app_lists: t.Optional[t.List[str]] = None) -> t.Optional[str]:
"""
Check if all binaries of the test case are built in the app lists.
:param app_lists: app lists to check
:return: debug string if not all binaries are built in the app lists, None otherwise
"""
if app_lists is None:
# ignore this feature
return None

bin_found = [0] * len(self.apps)
for i, app in enumerate(self.apps):
if app.build_dir in app_lists:
bin_found[i] = 1

if sum(bin_found) == 0:
msg = f'Skip test case {self.name} because all following binaries are not listed in the app lists: '
for app in self.apps:
msg += f'\n - {app.build_dir}'

print(msg)
return msg

if sum(bin_found) == len(self.apps):
return None

# some found, some not, looks suspicious
msg = f'Found some binaries of test case {self.name} are not listed in the app lists.'
for i, app in enumerate(self.apps):
if bin_found[i] == 0:
msg += f'\n - {app.build_dir}'

msg += '\nMight be a issue of .build-test-rules.yml files'
print(msg)
return msg
139 changes: 139 additions & 0 deletions idf_ci/idf_pytest/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import importlib.util
import logging
import os
import re
import sys
import typing as t
from pathlib import Path
from unittest.mock import MagicMock

import pytest
from _pytest.fixtures import FixtureRequest
from pytest_embedded.plugin import multi_dut_argument, multi_dut_fixture

from .models import PytestCase

_MODULE_NOT_FOUND_REGEX = re.compile(r"No module named '(.+?)'")


def _try_import(path: Path):
spec = importlib.util.spec_from_file_location('', path)
# write these if to make mypy happy
if spec:
module = importlib.util.module_from_spec(spec)
if spec.loader and module:
spec.loader.exec_module(module)


class IdfPytestPlugin:
def __init__(self, *, cli_target: str) -> None:
self.cli_target = cli_target

self._all_items_to_cases_d: t.Dict[pytest.Item, PytestCase] = {}
self._testing_items: t.Set[pytest.Item] = set()

@property
def cases(self) -> t.List[PytestCase]:
return [c for i, c in self._all_items_to_cases_d.items() if i in self._testing_items]

@pytest.fixture
@multi_dut_argument
def target(self, request: FixtureRequest) -> str:
_t = getattr(request, 'param', None) or request.config.getoption('target', None)
if not _t:
raise ValueError(
'"target" shall either be defined in pytest.mark.parametrize '
'or be passed in command line by --target'
)
return _t

@pytest.fixture
@multi_dut_argument
def config(self, request: FixtureRequest) -> str:
return getattr(request, 'param', None) or 'default'

@pytest.fixture
@multi_dut_fixture
def build_dir(
self,
request: FixtureRequest,
app_path: str,
target: t.Optional[str],
config: t.Optional[str],
) -> str:
"""
Check local build dir with the following priority:
1. build_<target>_<config>
2. build_<target>
3. build_<config>
4. build
Returns:
valid build directory
"""
check_dirs = []
build_dir_arg = request.config.getoption('build_dir', None)
if build_dir_arg:
check_dirs.append(build_dir_arg)
if target is not None and config is not None:
check_dirs.append(f'build_{target}_{config}')
if target is not None:
check_dirs.append(f'build_{target}')
if config is not None:
check_dirs.append(f'build_{config}')
check_dirs.append('build')

for check_dir in check_dirs:
binary_path = os.path.join(app_path, check_dir)
if os.path.isdir(binary_path):
logging.info(f'found valid binary path: {binary_path}')
return check_dir

logging.warning('checking binary path: %s... missing... try another place', binary_path)

raise ValueError(
f'no build dir valid. Please build the binary via "idf.py -B {check_dirs[0]} build" and run pytest again'
)

@pytest.hookimpl(tryfirst=True)
def pytest_pycollect_makemodule(
self,
module_path: Path,
):
# no need to install third-party packages for collecting
# try to eliminate ModuleNotFoundError in test scripts
while True:
try:
_try_import(module_path)
except ModuleNotFoundError as e:
if res := _MODULE_NOT_FOUND_REGEX.search(e.msg):
# redirect_stderr somehow breaks the sys.stderr.write() method
# fix it when implement proper logging
pkg = res.group(1)
if sys.__stderr__:
sys.__stderr__.write(f'WARNING:Mocking missed package while collecting: {pkg}\n')
sys.modules[pkg] = MagicMock()
continue
else:
break

def pytest_collection_modifyitems(self, items):
for item in items:
if case := PytestCase.from_item(item, cli_target=self.cli_target):
self._all_items_to_cases_d[item] = case

# filter by target
if self.cli_target != 'all':
res = []
for item, case in self._all_items_to_cases_d.items():
if case.target_selector == self.cli_target:
res.append(item)
items[:] = res

# add them to self._testing_items
for item in items:
self._testing_items.add(item)
Loading

0 comments on commit 2a5c1c4

Please sign in to comment.