-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introduce landlock based sandboxing
Co-authored-by: Quentin Kaiser <[email protected]>
- Loading branch information
Showing
7 changed files
with
198 additions
and
4 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from unblob.processing import ExtractionConfig | ||
from unblob.sandbox import Sandbox | ||
|
||
|
||
@pytest.fixture | ||
def log_path(tmp_path): | ||
return tmp_path / "unblob.log" | ||
|
||
|
||
@pytest.fixture | ||
def extraction_config(extraction_config, tmp_path): | ||
extraction_config.extract_root = tmp_path / "extract" / "root" | ||
# parent has to exist | ||
extraction_config.extract_root.parent.mkdir() | ||
return extraction_config | ||
|
||
|
||
@pytest.fixture | ||
def sandbox(extraction_config: ExtractionConfig, log_path: Path): | ||
return Sandbox(extraction_config, log_path, None) | ||
|
||
|
||
def test_necessary_resources_can_be_created_in_sandbox( | ||
sandbox: Sandbox, extraction_config: ExtractionConfig, log_path: Path | ||
): | ||
directory_in_extract_root = extraction_config.extract_root / "path" / "to" / "dir" | ||
file_in_extract_root = directory_in_extract_root / "file" | ||
|
||
sandbox.run(extraction_config.extract_root.mkdir, parents=True) | ||
sandbox.run(directory_in_extract_root.mkdir, parents=True) | ||
|
||
sandbox.run(file_in_extract_root.touch) | ||
sandbox.run(file_in_extract_root.write_text, "file content") | ||
|
||
# log-file is already opened | ||
log_path.touch() | ||
sandbox.run(log_path.write_text, "log line") | ||
|
||
|
||
def test_access_outside_sandbox_is_not_possible(sandbox: Sandbox, tmp_path: Path): | ||
unrelated_dir = tmp_path / "unrelated" / "path" | ||
unrelated_file = tmp_path / "unrelated-file" | ||
|
||
with pytest.raises(PermissionError): | ||
sandbox.run(unrelated_dir.mkdir, parents=True) | ||
|
||
with pytest.raises(PermissionError): | ||
sandbox.run(unrelated_file.touch) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import ctypes | ||
import sys | ||
import threading | ||
from pathlib import Path | ||
from typing import Callable, Iterable, Optional, Type, TypeVar | ||
|
||
from structlog import get_logger | ||
from unblob_native.sandbox import ( | ||
AccessFS, | ||
SandboxError, | ||
restrict_access, | ||
) | ||
|
||
if sys.version_info >= (3, 10): | ||
from typing import ParamSpec | ||
else: | ||
from typing_extensions import ParamSpec | ||
|
||
from unblob.processing import ExtractionConfig | ||
|
||
logger = get_logger() | ||
|
||
P = ParamSpec("P") | ||
R = TypeVar("R") | ||
|
||
|
||
class Sandbox: | ||
"""Configures restricted file-systems to run functions in. | ||
When calling ``run()``, a separate thread will be configured with | ||
minimum required file-system permissions. All subprocesses spawned | ||
from that thread will honor the restrictions. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
config: ExtractionConfig, | ||
log_path: Path, | ||
report_file: Optional[Path], | ||
extra_restrictions: Iterable[AccessFS] = (), | ||
): | ||
self.restrictions = [ | ||
# Python, shared libraries, extractor binaries and so on | ||
AccessFS.read("/"), | ||
# Multiprocessing | ||
AccessFS.read_write("/dev/shm"), # noqa: S108 | ||
# Extracted contents | ||
AccessFS.read_write(config.extract_root), | ||
AccessFS.make_dir(config.extract_root.parent), | ||
AccessFS.read_write(log_path), | ||
*extra_restrictions, | ||
] | ||
|
||
if report_file: | ||
self.restrictions += [ | ||
AccessFS.read_write(report_file), | ||
AccessFS.make_reg(report_file.parent), | ||
] | ||
|
||
def run(self, callback: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R: | ||
"""Run callback with restricted filesystem access.""" | ||
exception = None | ||
result = None | ||
|
||
def _run_in_thread(callback, *args, **kwargs): | ||
nonlocal exception, result | ||
|
||
self._try_enter_sandbox() | ||
try: | ||
result = callback(*args, **kwargs) | ||
except BaseException as e: | ||
Check notice Code scanning / CodeQL Except block handles 'BaseException' Note
Except block directly handles BaseException.
|
||
exception = e | ||
|
||
thread = threading.Thread( | ||
target=_run_in_thread, args=(callback, *args), kwargs=kwargs | ||
) | ||
thread.start() | ||
|
||
try: | ||
thread.join() | ||
except KeyboardInterrupt: | ||
raise_in_thread(thread, KeyboardInterrupt) | ||
thread.join() | ||
|
||
if exception: | ||
raise exception # pyright: ignore[reportGeneralTypeIssues] | ||
return result # pyright: ignore[reportReturnType] | ||
|
||
def _try_enter_sandbox(self): | ||
try: | ||
restrict_access(*self.restrictions) | ||
except SandboxError: | ||
logger.warning( | ||
"Sandboxing FS access is unavailable on this system, skipping." | ||
) | ||
|
||
|
||
def raise_in_thread(thread: threading.Thread, exctype: Type) -> None: | ||
if thread.ident is None: | ||
raise RuntimeError("Thread is not started") | ||
|
||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc( | ||
ctypes.c_ulong(thread.ident), ctypes.py_object(exctype) | ||
) | ||
|
||
# success | ||
if res == 1: | ||
return | ||
|
||
# Need to revert the call to restore interpreter state | ||
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread.ident), None) | ||
|
||
# Thread could have exited since | ||
if res == 0: | ||
return | ||
|
||
# Something bad have happened | ||
raise RuntimeError("Could not raise exception in thread", thread.ident) |