diff --git a/.gitignore b/.gitignore index 4c9b6b7..76e7a0b 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,13 @@ __pycache__/ *.py[cod] *$py.class +# Cython-generated C/C++ files +*.c +*.cpp + +# Cython-generated compilation annotations +*.html + # C extensions *.so diff --git a/.pep8speaks.yml b/.pep8speaks.yml index c9ed0d2..7dd3a5c 100644 --- a/.pep8speaks.yml +++ b/.pep8speaks.yml @@ -9,6 +9,7 @@ pycodestyle: - E241 # multiple spaces after ':' - E704 # multiple statements on one line (def) - W503 # line break before binary operator + - E722 # do not use bare 'except' # - E402 # module level import not at top of file # - E731 # do not assign a lambda expression, use a def # - C406 # Unnecessary list literal - rewrite as a dict literal. diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..97dfb56 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python.linting.enabled": true, + "cSpell.language": "it-IT,en-GB" +} diff --git a/README.md b/README.md index 0953c43..08801ba 100644 --- a/README.md +++ b/README.md @@ -145,18 +145,39 @@ python -O src/nome-del-file.py ``` Di default, la libreria per leggere i dati è `PyROOT` (quando installata); altrimenti, viene utilizzata `uproot`. -Per forzare l'utilizzo di `uproot` anche quando `PyROOT` è installata, impostare la variabile d'ambiente `FORCE_UPROOT`. +Per forzare l'utilizzo di `uproot` anche quando `PyROOT` è installata, impostare la variabile d'ambiente `FORCE_UPROOT` (il valore assegnato non è importante, basta che in Python si auto-converta in `True` – per esempio, `1`, `42`, `__import__("math").pi` \[sconsigliato], o `True` stesso). +Per disabilitare `FORCE_UPROOT`, assegnare un valore che in Python si auto-converta in `False`, come `0`, `list()` \[sconsigliato] o `False` stesso. Alternativamente, rimuovere la variabile d'ambiente (assegnandole un valore nullo, `FORCE_UPROOT=`). + Su UNIX: ```bash -export FORCE_UPROOT=1 # Anche '=True' va bene +# Imposta la variabile d'ambiente per tutta la sessione +export FORCE_UPROOT=1 +# Esegui normalmente i programmi python src/file.py -``` +# Più avanti, per disattivarla, si dovrà rimuoverla... +export FORCE_UPROOT="" +# ... o impostarla a `False` +export FORCE_UPROOT=0 -O, per evitare di usare `export`: +# --- oppure ---- -```bash +# Imposta la variabile d'ambiente solo per questo comando FORCE_UPROOT=1 python src/file.py +# Dopo che il comando è stato eseguito, la variabile d'ambiente *non* è più impostata. +``` + +Su Windows: + +```powershell +# Imposta la variabile d'ambiente +set FORCE_UPROOT=1 +# Esegui normalmente i programmi +python src/file.py +# Più avanti, la si dovrà rimuovere... +set FORCE_UPROOT= +# ... o impostare a `False` +set FORCE_UPROOT=0 ``` ### Stagisti diff --git a/compile.py b/compile.py new file mode 100644 index 0000000..f7c18b4 --- /dev/null +++ b/compile.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Un programma di utility che compila in Cython i moduli richiesti. + + python compile.py [COMMAND] [ARGUMENTS] + python compile.py help + python compile.py commands +""" +import os +from pathlib import Path +import sys +from typing import NoReturn +import subprocess +import shutil + + +CYTHON_VERSION = "3.0.0a10" + + +def _cython_dep_error() -> NoReturn: + print(f"""\ +ERROR: No compatible Cython version found. + +Please install this Cython version: + pip install Cython={CYTHON_VERSION} +""", file=sys.stderr) + sys.exit(1) + + +try: + import cython + from Cython.Build.Cythonize import main as cythonize +except ModuleNotFoundError: + _cython_dep_error() +else: + if cython.__version__ != CYTHON_VERSION: + _cython_dep_error() + + +SRC = Path(__file__).parent / "src" +TARGETS = [f.stem for f in SRC.glob("*.py")] +PYTHON_FRAMES: bool = True + + +def list_targets() -> None: + """Ottieni una lista di tutti i moduli disponibili. + + python compile.py list + """ + print("all", *TARGETS, sep=", ") + + +def build(*targets: str) -> int: + """Compila con Cython i moduli specificati. + + python compile.py build *[TARGETS] + python compile.py build log root stagisti + """ + if "all" in targets: + return build(*TARGETS) + for target in targets: + if target not in TARGETS: + continue + sources = [str(f.resolve()) for f in [SRC / f"{target}.py", SRC / f"{target}.pxd"] if f.exists()] + print(f"--> Building {target} ({', '.join(sources)})") + try: + args = [ + "-3i", "--annotate-fullc", + "-j", str(os.cpu_count()), + # "-X", f"linetrace={PYTHON_FRAMES}", + # "-X", f"profile={PYTHON_FRAMES}", + "--lenient", + *sources, + ] + print(f"$ cythonize {' '.join(args)}") + cythonize(args) + except SystemExit as e: + return e.code + return 0 + + +def rm(*paths: str | Path): + """Elimina i file e le cartelle in `paths`.""" + for path in paths: + if not isinstance(path, Path): + path = Path(path) + if not path.exists(): + continue + print(f"Removing {path.relative_to(SRC.parent)}") + if path.is_dir(): + shutil.rmtree(path) + else: + os.unlink(path) + + +def clean(*targets) -> None: + """Rimuovi gli elementi creati durante la `build`. + + python compile.py clean *[TARGETS] + python compile.py clean root log + python compile.py clean all + python compile.py clean + """ + if not targets or "all" in targets: + rm( + *SRC.glob("*.c"), + *SRC.glob("*.html"), + *SRC.glob("*.so"), + *SRC.glob("*.pyd"), + SRC / "build", + ) + return + for target in targets: + rm( + SRC / f"{target}.c", + SRC / f"{target}.html", + *SRC.glob(f"{target}.*.so"), + *SRC.glob(f"build/lib.*/{target}.*.so"), + ) + + +RUN = r"""\ +print(f'\n--> Importing $$') +import $$ +func = getattr($$, 'main', getattr($$, 'test', None)) +print(f'\n--> $$ has been imported from {$$.__file__}') +if func: + print(f'--> Running $$.{func.__name__}()') + func() +""" + + +def run(*argv: str) -> int: + """Compila ed esegui il modulo dato con gli argomenti dati. + + python compile.py run *[OPZIONI PYTHON] [PROGRAMMA] *[ARGOMENTI/OPZIONI PROGRAMMA] + python compile.py run -O root -vv data.root + """ + args = list(argv) + target = "" + for arg in args: + if not arg.startswith("-"): + target = arg + break + if not target: + raise ValueError("A target must be specified!") + build(target) + os.chdir(SRC) + index = args.index(target) + args[index] = RUN.replace("$$", target) + args.insert(index, "-c") + args.insert(0, sys.executable) + return subprocess.run(args, check=False).returncode + + +def help(cmd: str | None = None, /) -> None: + """Get help for a given command. + + python compile.py help [COMMAND] + python compile.py help commands + """ + if cmd is None: + print(__doc__) + help("help") + else: + print(COMMANDS.get(cmd, help).__doc__) + + +def list_commands() -> None: + """List the available commands. + + python compile.py commands + """ + print(*COMMANDS, sep=" ") + + +COMMANDS = dict( + run=run, + build=build, + clean=clean, + list=list_targets, + help=help, + commands=list_commands, +) + + +def cli(argv: list[str]) -> int | None: + """Interfaccia da riga di comando.""" + if len(argv) < 1: + return help() + first = argv.pop(0) + if first in COMMANDS: + cmd = COMMANDS[first] + else: + cmd = build + argv = [first] + argv + return cmd(*argv) + + +if __name__ == "__main__": + sys.exit(cli(sys.argv[1:]) or 0) diff --git a/pyproject.toml b/pyproject.toml index 0ab736b..70398eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,8 +8,33 @@ ignore = [ "E221", # multiple spaces before operator "E241", # multiple spaces after ':' "E704", # multiple statements on one line (def) - "W503" # line break before binary operator + "E722", # bare except + "W503", # line break before binary operator ] in-place = true recursive = true aggressive = 3 + + +[tool.pylint.main] +max-line-length = 120 + +[tool.pylint."MESSAGES CONTROL"] +disable = [ + # Errors + # Warnings + "W0123", # use of eval + "W0621", # Redefining name '*' from outer scope + "W0622", # Redefining built-in '*' + "W0702", # No exception type(s) specified + "W1203", # Use %s formatting in logging functions + # Conventions + "C0103", # Variable name "*" doesn't conform to snake_case naming style + # Refactoring + "R0903", # too few public methods (*/2) + "R0912", # too many branches (*/12) + "R0914", # Too many local variables (*/15) + "R0915", # Too many statements (*/50) + "R1704", # Redefining argument with the local name '*' + "E0611", # No name * in module * +] diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..d1b2896 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[pycodestyle] +max-line-length = 120 +ignore = E221,E241,E704,E722,W503 +statistics = True + diff --git a/src/classe.py b/src/classe.py index 7da75bc..f414eea 100755 --- a/src/classe.py +++ b/src/classe.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Per comprendere il funzionamento delle classi.""" +# pylint: disable=protected-access,no-member from __future__ import annotations class Rettangolo: + """Classe che “simula” un rettangolo.""" + a: float b: float @@ -15,18 +18,22 @@ def __init__(self, a: float, b: float) -> None: @property def area(self) -> float: + """Calcola l'area del rettangolo.""" return self.a * self.b @property def perimetro(self) -> float: + """Calcola il perimetro del rettangolo.""" return (self.a + self.b) * 2 def __mul__(self, i: int | float) -> Rettangolo: + """Operazione di moltiplicazione.""" if not isinstance(i, (int, float)): return NotImplemented return Rettangolo(self.a * i, self.b * i) def __repr__(self) -> str: + """Rappresentazione dell'oggetto come stringa.""" return f"" @@ -38,19 +45,23 @@ def __init__(self, lato: float) -> None: super().__init__(lato, lato) def __repr__(self) -> str: + """Rappresentazione dell'oggetto come stringa.""" return f"" def __mul__(self, i: int | float) -> Quadrato: + """Operazione di moltiplicazione.""" # Va ridefinito perché deve ritornare un `Quadrato`, non un `Rettangolo`. return Quadrato(self.lato * i) @property def lato(self): + """Il lato di questo quadrato.""" return self.a # o `self.b`, in realtà è indifferente class Account: """Una classe per dimostrare l'utilizzo di `@property` e le variabili “private”.""" + __money: float def __init__(self) -> None: @@ -64,32 +75,34 @@ def money(self): def pay(self, price: float) -> None: """Paga (= rimuovi dal conto) €`price`.""" - # Senza `abs(...)` un prezzo negativo aumenterebbe i soldi sul conto. Così, il segno viene semplicemente ignorato. + # Senza `abs(...)` un prezzo negativo aumenterebbe i soldi sul conto. + # Così, il segno viene semplicemente ignorato. self.__money -= abs(price) def __repr__(self) -> str: + """Rappresentazione dell'oggetto come stringa.""" return f"" - -def privatevar_example(): +def private_var_example(): """Dimostrazione delle variabili private in Python.""" + # type: ignore # print(f"{x:.2f}") -> stampa a schermo `x` con 2 cifre decimali a = Account() print(f"€{a.money:.2f}") # €200.00 a.pay(3.14) print(f"€{a.money:.2f}") # €196.86 - #print(a.__money) # AttributeError! - #a.__money += 100 # AttributeError! + # print(a.__money) # AttributeError! + # a.__money += 100 # AttributeError! # Purtroppo, però, in Python le variabili “private” non sono veramente private... # per accedervi, utilizzare .___: - print(f"€{a._Account__money:.2f}") # €196.86 + print(f"€{a._Account__money:.2f}") # €196.86 a._Account__money += 100 # Modifico la variabile “privata” - print(f"€{a.money:.2f}") # €296.86 - + print(f"€{a.money:.2f}") # €296.86 def inheritance_example(): + """Dimostrazione dell'«albero» delle classi.""" a = Quadrato(3) print(f"{a.lato=}") print(f"{a.perimetro=}") @@ -102,15 +115,16 @@ def inheritance_example(): print(f"{Quadrato(10).area=}") - print(Quadrato(10)) # Scrive la stringa ritornata da `Quadrato.__repr__(...)` - print(Quadrato(10)*2) # Equivale a `Quadrato(10).__mul__(2)` - print(Quadrato(10)*Quadrato(10)) # TypeError! (`__mul__(...)`, ovvero l'operazione `*`, è definito solo per interi o decimali, non altri quadrati) + print(Quadrato(10)) # Scrive la stringa ritornata da `Quadrato.__repr__(...)` + print(Quadrato(10) * 2) # Equivale a `Quadrato(10).__mul__(2)` + print(Quadrato(10) * Quadrato(10)) # TypeError! + # (`__mul__(...)`, ovvero l'operazione `*`, è definito solo per interi o decimali, non altri quadrati) if __name__ == "__main__": print("~~~ classe.py ~~~") - # # # # # # # # # # # # # # # # # # # # # # # + # # # # # # # # # # # # # # # # # # # # # # # # Decommenta la riga di interesse qua sotto # - # # # # # # # # # # # # # # # # # # # # # # # - #privatevar_example() - #inheritance_example() + # # # # # # # # # # # # # # # # # # # # # # # + # privatevar_example() + # inheritance_example() diff --git a/src/ideal.pxd b/src/ideal.pxd new file mode 100644 index 0000000..84e4349 --- /dev/null +++ b/src/ideal.pxd @@ -0,0 +1,16 @@ +import cython + + +@cython.locals( + TOT=cython.ulonglong, + K=cython.ulonglong, + N_in=cython.ulonglong, + N_out=cython.ulonglong, + X=cython.ulonglong, + Y=cython.ulonglong, + v=cython.ulonglong, +) +cpdef double grid(unsigned long long N) + + +cpdef void main() diff --git a/src/ideal.py b/src/ideal.py index 1448673..645be42 100755 --- a/src/ideal.py +++ b/src/ideal.py @@ -1,17 +1,23 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import matplotlib.pyplot as plt -from math import pi as PI +"""Ciò che succederebbe con un dataset ideale.""" +from __future__ import annotations +import sys +# from math import pi as PI +# import matplotlib.pyplot as plt def grid(N): + """Calcola π sia per eccesso e per difetto su una griglia di lato `N`.""" TOT = N**2 - K = (N - 1) ** 2 + squares = [x**2 for x in range(N)] + # K = (N - 1) ** 2 + K = squares[-1] # l'ultimo valore di `squares` è in effetti K N_in = 0 N_out = 0 - for x in range(N): - for y in range(N): - v = x**2 + y**2 - K + for X in squares: + for Y in squares: + v = X + Y - K if v == 0: N_in += 1 N_out += 1 @@ -22,24 +28,30 @@ def grid(N): pim = N_in * 4 / TOT piM = (TOT - N_out) * 4 / TOT pi = (pim + piM) / 2 - print(N, pim, piM, pi, sep=" \t") + print(f"{N}\t{pim:01.15f}\t{piM:01.15f}\t{pi:01.15f}") return pi -def theoretical(N, case): - TOT = N**2 - N_in = int(PI * TOT / 4) - if case == 0: - print(N_in) - elif case == 1: - print(N_in * 4 / TOT) - print(abs(PI - N_in * 4 / TOT)) - return N_in +# def theoretical(N, case): +# TOT = N**2 +# N_in = int(PI * TOT / 4) +# if case == 0: +# print(N_in) +# elif case == 1: +# print(N_in * 4 / TOT) +# print(abs(PI - N_in * 4 / TOT)) +# return N_in -if __name__ == "__main__": - for N in range(1, 1024): - grid(N) +def main(): + """Main program.""" + N = 1 + while True: + try: + grid(N) + N += 1 + except KeyboardInterrupt: + sys.exit(0) # theoretical(N, case) # diff = [] # for i in range(500): @@ -48,3 +60,7 @@ def theoretical(N, case): # print(diff) # plt.plot(diff) # plt.show() + + +if __name__ == "__main__": + main() diff --git a/src/ideal.pyi b/src/ideal.pyi new file mode 100644 index 0000000..fcd3984 --- /dev/null +++ b/src/ideal.pyi @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + + +def grid(N: int) -> float: + ... + + +def main() -> None: + ... diff --git a/src/log.py b/src/log.py new file mode 100644 index 0000000..691bd64 --- /dev/null +++ b/src/log.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +# -*- coding : utf-8 -*- +"""Utility module: logging support.""" +from __future__ import annotations +from typing import Any, Iterator, cast +from logging import NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL +from contextlib import contextmanager, _GeneratorContextManager +from io import StringIO +import logging +import inspect +import time +import os +import sys + + +# Prova ad importare `rich`, se possibile +NO_RICH = bool(eval(os.environ.get("NO_RICH", "") or "0")) +RICH: bool +try: + if NO_RICH: + raise ModuleNotFoundError + import rich.console + import rich.markup + import rich.logging + import rich.highlighter +except ModuleNotFoundError: + RICH = False +else: + RICH = True + + +__all__ = [ + # Exported from `logging` + "NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", + # Defined here + "TIMESTAMP", "DEFAULT_LEVEL", "ICONS", "STYLES", + "ConsoleFormatter", "Logger", + "cli_configure", "getLogger", + "debug", "info", "warning", "error", "critical", "exception", "task", +] + + +def style(message: str, style: str) -> str: + """Apply the given `style` to `message` only if `rich` is available.""" + if RICH: + return f"[{style}]{message}[/]" + return message + + +def sprint(*values, sep: str = " ", end: str = "\n", style: str = ""): + """Print styled text to console.""" + console = rich.console.Console(highlight=False) + if RICH: + if style: + io = StringIO() + print(*values, sep=sep, end=end, file=io, flush=True) + io.seek(0) + console.print(f"[{style}]{io.read()}[/{style}]", end="") + else: + console.print(*values, sep=sep, end=end) + else: + print(*values, sep=sep, end=end, flush=True) + + +def getLogger(name: str | None = None, /, *, depth: int = 0) -> Logger: + """Get the logger associated with this given name. + + If no name is specified, get the logger of the module + that called this function. + """ + if not name: + try: + return getLogger( + inspect.stack()[1 + depth].frame.f_globals["__name__"] + if name is None else name + ) + except IndexError: + getLogger(__name__).critical( + "Could not resolve `__name__` from an outer frame.\n" + "There may be a problem with the interpreter frame stack, " + "most probably due to the caller module being Cython-compiled. " + "Please either switch from `(...)` to `getLogger(__name__).(...)` syntax, " + "or avoid Cython-compiling that module." + ) + sys.exit(1) + if name == "root": + name = "root_" + return cast(Logger, logging.getLogger(name)) + + +def taskLogger(module: str | None = None, /, id: str = "", *, depth: int = 0) -> Logger: + """Get the task logger for the module that's calling this function.""" + tl = getLogger(module, depth=1 + depth).getChild("task") + if id: + return tl.getChild(id) + return tl + + +TIMESTAMP: bool = True + +ICONS = { + NOTSET: "[ ]", + DEBUG: "[#]", + INFO: "[i]", + WARNING: "[!]", + ERROR: "[x]", + CRITICAL: "{x}", +} + +STYLES = { + NOTSET: "normal", + DEBUG: "dim", + INFO: "cyan", + WARNING: "yellow", + ERROR: "red1", + CRITICAL: "bold red", +} + +DEFAULT_LEVEL = INFO if __debug__ else WARNING # '-O' works like a '-q' + +if RICH: + MESSAGE_FORMAT = ( + "{x} {message}", + "[/][dim][bold]{took}[/bold][{asctime}]" + ) +else: + MESSAGE_FORMAT = ( + "{x} {message}", + "{took}[{asctime}]" + ) + +TASK_MESSAGE = style("--> {}", "bold") + +_setup_done: bool = False + + +class ConsoleFormatter(logging.Formatter): + """A customized logging formatter.""" + + def __init__(self, lfmt: str, rfmt: str, *args, **kwargs) -> None: + """Left and right formatter strings.""" + lfmt, rfmt = lfmt.replace("\0", ""), rfmt.replace("\0", "") + super().__init__(lfmt + "\0" + rfmt, *args, **kwargs) + + def format(self, record: logging.LogRecord) -> str: + """Correctly format `record`.""" + # Activate console markup + if RICH: + setattr(record, "markup", True) + # Make the `icon` available and escape it if necessary + icon = ICONS[record.levelno] + if RICH: + icon = rich.markup.escape(icon) + setattr(record, "x", icon) + # Fix `took` missing + if not hasattr(record, "took"): + setattr(record, "took", "") + # Apply indent + if hasattr(record, "indent"): + record.msg = " " * getattr(record, "indent") * 4 + record.msg + delattr(record, "indent") + # Format and right-align text + text = super().format(record) + if RICH: + text = f"[{STYLES[record.levelno]}]{text}[/]" + styles_len = len(text) - len(rich.markup.render(text)) if RICH else 0 + left, right = text.split("\0") + if right: + # Right-align text only if needed + width = os.get_terminal_size().columns # Terminal width + rows = left.split("\n") + first = rows[0] + if len(first) + 1 + len(right) - styles_len <= width: + # Don't add the right text if the left one is too long + first += f"{' '*(width - len(first) - len(right) + styles_len)}{right}" + return "\n".join([first, *rows[1:]]) + return left + "[/]" + + +class Logger(logging.Logger): + """An enhanced logger.""" + + __slots__ = ("_indent", "result", "_result_logged", "_timestamp") + _indent: int + result: str + _result_logged: bool + _timestamp: int | None + + def __init__(self, name: str, level: int | str = NOTSET) -> None: + super().__init__(name, level) + self._indent = 0 + self._task_reset() + + def _task_reset(self): + """Resetta gli attributi relativi alla `task`.""" + self.result = "" + self._result_logged = False + self._timestamp = None + + def makeRecord(self, *args, **kwargs) -> logging.LogRecord: + """Create a `logging.LogRecord` instance.""" + record = super().makeRecord(*args, **kwargs) + setattr(record, "indent", self._indent + getattr(record, "indent", 0)) + return record + + @contextmanager + def task(self, msg: str, level: int = INFO, id: str | None = None) -> Iterator[Logger]: + """Log the fact we're doing something.""" + # pylint: disable=protected-access + self.log(level, TASK_MESSAGE.format(msg)) # pylint: disable=logging-format-interpolation + tsk = self.getChild("task") + if id: + tsk = tsk.getChild(id) + tsk._indent = self._indent + 1 + tsk.save_timestamp() + try: + yield tsk + finally: + # Stampa il messaggio + if not tsk._result_logged: + tsk.done() + # Resetta questo logger + tsk._task_reset() + + def save_timestamp(self) -> None: + """Salva il tempo attuale in nanosecondi.""" + self._timestamp = time.time_ns() + + @staticmethod + def _repr_dt(dt: int) -> str: + # Converti il ∆t in un formato utile + if dt < 1_000: + return f"{dt} ns" + if dt < 1_000_000: + return f"{dt/1_000} µs" + if dt < 1_000_000_000: + return f"{dt/1_000_000} ms" + return time.strftime("%H:%M:%S", time.gmtime(dt / 1_000_000_000)) + + def done(self, result: str | None = None, level: int = INFO) -> None: + """Log a 'done (...)' message.""" + t = time.time_ns() + if self._timestamp: + extra = dict(took=f"took {self._repr_dt(t - self._timestamp)} ") + else: + extra = {} + result = self.result if result is None else result + self.log(level, f"done{f' ({result})' if result else ''}.", extra=extra) + self._result_logged = True + + def fail(self, result: str | None = None, level: int = ERROR) -> None: + """Log a 'fail (...)' message.""" + t = time.time_ns() + if self._timestamp: + extra = dict(took=f"took {self._repr_dt(t - self._timestamp)} ") + else: + extra = {} + result = self.result if result is None else result + self.log(level, f"failed{f' ({result})' if result else ''}.", extra=extra) + self._result_logged = True + + +def get_levels() -> list[int]: + """Get the installed levels, as a list, in severity ascending order.""" + name2level: dict[str, int] | None + if sys.version_info >= (3, 11): + name2level = logging.getLevelNamesMapping() # pylint: disable=no-member + else: + name2level = getattr(logging, "_nameToLevel", None) + return sorted(set( + name2level.values() if name2level + else map(logging.getLevelName, 'CRITICAL ERROR WARNING INFO DEBUG NOTSET'.split(" ")) + )) + + +def cli_configure() -> None: + """Set up `logging` based on command-line flags.""" + global _setup_done # pylint: disable=global-statement + if _setup_done: + return + levels = get_levels() + # Controlla le varie flags che vengono passate al programma + quietness = sys.argv.count("-q") - sys.argv.count("-v") + levels.index(DEFAULT_LEVEL) + sys.argv = [x for x in sys.argv if x not in ("-v", "-q")] + for arg in sys.argv.copy(): + if arg[0] == "-" and arg[1:] and set(arg[1:]) <= {"q", "v"}: + quietness += arg.count("q") - arg.count("v") + sys.argv.remove(arg) + # Determina il livello a partire dalla `silenziosità` richiesta + quietness = max(0, min(len(levels) - 1, quietness)) + level = levels[quietness] + # Configurazione + ch = rich.logging.RichHandler( + highlighter=rich.highlighter.NullHighlighter(), + show_level=False, + show_time=False, + rich_tracebacks=True, + show_path=False, + ) if RICH else logging.StreamHandler() + ch.setFormatter(ConsoleFormatter(*MESSAGE_FORMAT, style="{", datefmt="%Y-%m-%d %H:%M:%S")) + ch.setLevel(NOTSET) + logging.setLoggerClass(Logger) + root = logging.getLogger() + root.addHandler(ch) + root.setLevel(level) + _setup_done = True + + +def task(msg: str, level: int = INFO, id: str | None = "") -> _GeneratorContextManager[Logger]: + """Start logging a task.""" + return getLogger(depth=1).task(msg, level=level, id=id) + + +def debug(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log an debug message.""" + getLogger(depth=1).debug(msg, *args, extra=extra, **kwargs) + + +def info(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log an information.""" + getLogger(depth=1).info(msg, *args, extra=extra, **kwargs) + + +def warning(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log a warning.""" + getLogger(depth=1).warning(msg, *args, extra=extra, **kwargs) + + +def error(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log an error.""" + getLogger(depth=1).error(msg, *args, extra=extra, **kwargs) + + +def critical(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log an error that causes the program's termination.""" + getLogger(depth=1).critical(msg, *args, extra=extra, **kwargs) + + +def exception(msg: Any, *args: Any, extra: dict[str, Any] | None = None, **kwargs) -> None: + """Log an exception.""" + getLogger(depth=1).exception(msg, *args, extra=extra, **kwargs) + + +if not eval(os.environ.get("NO_AUTO_LOGGING_CONFIG", "0") or "0"): + cli_configure() + # Silenzia i messaggi di debug di alcune librerie + getLogger("matplotlib").setLevel(WARNING) + + +def main(): + cli_configure() + L = getLogger(__name__) + L.critical("Critical") + L.error("Error") + L.warning("Warning") + L.info("Info") + L.debug("Debug") + with L.task("Null task #1") as computation: + pass + with L.task("Null task #2") as computation: + computation.done() + with L.task("Null task #3") as computation: + computation.done("explicit") + with L.task("Null task #4") as computation: + computation.fail("explicit") + with L.task("Null task #5") as computation: + computation.result = "custom result" + with L.task("Sleep task #1 (1s)") as computation: + time.sleep(1) + computation.fail() + with L.task("Sleep task #2 (3s, loop)") as computation: + for _ in range(3): + time.sleep(1) + computation.done() + with L.task("Sleep task #3 (3s, loop, log at every iteration)") as computation: + for _ in range(3): + time.sleep(1) + computation.info("Just slept 1s.") + computation.done() + L.debug("About to define function `_foo` with `@task` decorator") + + @L.task("Sleep task #4 (3s, via function)") + def _foo(x: int) -> None: + for __ in range(x): + time.sleep(1) + + L.debug("After defining function `_foo` with `@task` decorator") + _foo(3) + + +if __name__ == "__main__": + main() diff --git a/src/pi.pxd b/src/pi.pxd new file mode 100755 index 0000000..0e2d8b0 --- /dev/null +++ b/src/pi.pxd @@ -0,0 +1,63 @@ +import cython + +cdef: + long K + object SRC + object L + bint UNICODE_BOX + + +@cython.locals( + BUG=bint, + rargv=list, +) +cdef bint bug(bint default) + + +@cython.locals(_mode=int) +cdef int mode() + + +@cython.locals( + s=str, + j=int, + c=str +) +cdef str style_pi(str pi, int i, str OK, str K0, str KO) + + +@cython.locals( + width=int, + title=str, + BUG=bint, + MODE=bint, + TRG=object, + LEN=int, + N_in=int, + x_in=list, + x_out=list, + y_in=list, + y_out=list, + pi_array=list, + squares=list, + i=int, + x=int, + y=int, + pi=object, + l=int, + spi=str, + sPI=str, + digit=str, + DIGIT=str, + PI_STYLE=str, + OK_STYLE=str, + K0_STYLE=str, + KO_STYLE=str, + UL=str, + UR=str, + DL=str, + DR=str, + H=str, + V=str, +) +cpdef void main() diff --git a/src/pi.py b/src/pi.py index 7f5c679..c74c666 100755 --- a/src/pi.py +++ b/src/pi.py @@ -1,43 +1,48 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Utilizza il TRNG per stimare π tramite il metodo Monte Carlo.""" -from rand import TrueRandomGenerator -import matplotlib.pyplot as plt +import os +import sys +import random from math import pi as PI from pathlib import Path -import random -import sys -import os +import matplotlib.pyplot as plt +from rand import TrueRandomGenerator +from log import getLogger, style, sprint # Costanti K = 255**2 SRC = Path(__file__).parent # Cartella di questo file +L = getLogger(__name__) # Logger per questo file + +# Se l'output è formattato male, imposta questa flag a `False` +UNICODE_BOX: bool = True # False def bug(default: bool, /) -> bool: """Determina se è stato attivato il “bug” da riga di comando.""" - # $ python pi.py # --> di default + # $ python pi.py # --> di default # $ python pi.py --bug # --> attivo - # $ python pi.py --no-bug # --> disattivato - # $ python pi.py --no-bug --bug # --> attivo (--bug sovrascrive --no-bug) - # $ python pi.py --bug --no-bug # --> disattivato (--no-bug sovrascrive --bug) + # $ python pi.py --no-bug # --> disattivato + # $ python pi.py --no-bug --bug # --> attivo (--bug sovrascrive --no-bug) + # $ python pi.py --bug --no-bug # --> disattivato (--no-bug sovrascrive --bug) if "--bug" in sys.argv: if "--no-bug" in sys.argv: - BUG = sys.argv[::-1].index("--bug") < sys.argv[::-1].index("--no-bug") + rargv = sys.argv[::-1] # Crea una copia invertita di `sys.argv` + BUG = rargv.index("--bug") < rargv.index("--no-bug") sys.argv = [x for x in sys.argv if x not in ("--no-bug", "--bug")] return BUG - else: - sys.argv = [x for x in sys.argv if x != "--bug"] - return True - elif "--no-bug" in sys.argv: + sys.argv = [x for x in sys.argv if x != "--bug"] + return True + if "--no-bug" in sys.argv: sys.argv = [x for x in sys.argv if x != "--no-bug"] return False return default def mode() -> int: - """Determina l'algoritmo da utilizzare""" + """Determina l'algoritmo da utilizzare.""" # Controlla se l'algoritmo è stato selezionato da riga di comando. # Struttura del vettore "sys.argv": # $ python /path/to/script.py a1 a2 a3 @@ -45,8 +50,8 @@ def mode() -> int: if len(sys.argv) > 1: # Ci sono almeno 2 valori in sys.argv, quindi è stato inserito almeno un argomento try: - _mode = int(sys.argv[1]) - except BaseException: + _mode = int(eval(sys.argv[1])) + except: # Gestione errori: se il modo selezionato dalla riga di comando # non è valido, continua con la selezione interattiva pass @@ -55,61 +60,58 @@ def mode() -> int: if 0 <= _mode <= 3: # Valido return _mode - else: - # Invalido: continua con la selezione interattiva - pass + # Non valido: continua con la selezione interattiva # Selezione interattiva dell'algoritmo - print(f""" ->>> Choose an algorithm: + print(""" +>>> Please choose an algorithm: [0] Interpret data as sequential (x, y) points. [1] Interpret data as adjacent/linked (x, y) points. [2] Generate every possible (x, y) combination. [3] Use pseudo-random (x, y) points.\ """) # Richiede all'utente l'algoritmo da utilizzare (il valore di "_mode") - _mode: int + _mode: int = 0 while True: try: - _mode = int(input("> ")) + _mode = int(eval(input("> "))) # Gestione errori: per terminare il programma except (KeyboardInterrupt, EOFError, OSError): sys.exit(0) # Gestione errori: input non intero (chiede nuovamente) - except BaseException: - print("[!] Please type in an integer (0|1|2|3)!") + except: + L.warning("Algorithm index has to be an integer (0|1|2|3)!") continue # Numero intero: ok else: # Troppo grande o troppo piccolo (chiede nuovamente) if _mode > 3 or _mode < 0: - print("[!] Invalid integer (has to be in [0, 3])!") + L.warning(f"Invalid integer `{_mode}` (has to be in [0, 3])!") continue # Tutto ok: "_mode" è impostato e si continua col programma return _mode # questo 'return' interrompe il ciclo 'while' e ritorna il valore di '_mode' -# Calcolo di π con metodo Monte Carlo e numeri casuali generati con TrueRandomGenerator +# Funzione principale def main(): + """Calcola π tramite il metodo Monte Carlo, utilizzando il nostro TRNG.""" # Stampa il titolo width = os.get_terminal_size().columns title = " Monte Carlo Method π Approximator " - around = "=" * (max(0, width - len(title)) // 2) - print(around, title, around, sep="") + sprint(f"{title:=^{width}}", style="bold") # see https://pyformat.info/ for why this works - # Determina il valore di "BUG", tenendo conto della riga di comando + # Determina il valore di `BUG`, tenendo conto delle flag da riga di comando BUG = bug(True) # Di default è attivo - if __debug__: - # Comunica che BUG è attivo (per sicurezza) - print(f"[i] BUG is {'en' if BUG else 'dis'}abled.") + # Comunica se BUG è attivo (per sicurezza) + L.info(f"BUG is {'en' if BUG else 'dis'}abled.") # Determina l'algoritmo da utilizzare - _mode: int = mode() # Usa la funzione sopra definita - print(f"[i] Using algorithm [{_mode}].") # Stampa l'algoritmo, per sicurezza + MODE: int = mode() # Usa la funzione sopra definita + L.info(f"Using algorithm [{MODE}].") # Stampa l'algoritmo, per sicurezza # Inizializzazione TRG = TrueRandomGenerator(bug=BUG) # Il nostro generatore - LEN = TRG.nRandomNumbers # Numero di valori casuali disponibili + LEN = TRG.n_random_numbers # Numero di valori casuali disponibili N_in: int = 0 # Numero di coordinate casuali all'interno del cerchio # noqa x_in: list[int] = [] # Lista delle coordinate x all'interno del cerchio # noqa y_in: list[int] = [] # Lista delle coordinate y all'interno del cerchio # noqa @@ -118,16 +120,19 @@ def main(): pi_array: list[float] = [] # Lista delle stime di π nel tempo pi: float = 0 # Stima di π, ricalcolata ad ogni iterazione + # Pre-calcolo dei quadrati, per ottimizzazione + squares = [x**2 for x in TRG.random_numbers] + # ------------------------- Metodo 1: base, O(n) -------------------------- - if _mode == 0: + if MODE == 0: for i in range(LEN // 2): # Generazione di coordinate con due numeri casuali sequenziali x = TRG.random_number() y = TRG.random_number() - # Se il punto di coordinate (x, y) appartiene al 1/4 di cerchio di raggio 255: + # Se il punto di coordinate (x, y) appartiene cerchio di raggio 255: if x**2 + y**2 <= K: - N_in = N_in + 1 # incrementa il numero di coordinate all'interno, + N_in += 1 # incrementa il numero di coordinate all'interno, x_in.append(x) # salva la coordinata x nella lista dedicata, y_in.append(y) # salva la coordinata y nella lista dedicata. else: # Altrimenti, le coordinate (x, y) non appartengono al cerchio: @@ -149,11 +154,11 @@ def main(): plt.show() # -------------- Metodo 2: coppie di valori adiacenti, O(n) --------------- - elif _mode == 1: + elif MODE == 1: y = TRG.random_number() # Assegnazione valore di default (pre-ciclo) - for i in range(LEN - 1): - x = y # Assegnazione a "x" del numero casuale "y" precedentemente utilizzato - y = TRG.random_number() # Creazione nuovo numero casuale + for i in range(LEN): + # L'`y` di prima diventa il nuovo `x`, mentre `y` diventa un nuovo numero casuale + x, y = y, TRG.random_number() if x**2 + y**2 <= K: # Analogo al metodo 1 N_in = N_in + 1 x_in.append(x) @@ -173,19 +178,16 @@ def main(): # Disegna l'andamento della stima di π in funzione del numero di coordinate plt.plot(pi_array) - plt.plot([PI] * (LEN - 1), linestyle="dashed") + plt.plot([PI] * LEN, linestyle="dashed") plt.show() # ------------ Metodo 3: tutte le coordinate possibili, O(n^2) ------------ - elif _mode == 2: - # Pre-calcolo dei quadrati, per ottimizzazione - nums = [b**2 for b in TRG.randomNumbers] - + elif MODE == 2: # `enumerate([a, b, c]) -> [(0, a), (1, b), (2, c)]` # Questo ciclo scorre gli elementi (`x`) del vettore `nums`, # associando a ciascuno il proprio indice (`i`) - for i, x in enumerate(nums): - for y in nums: + for i, x in enumerate(squares): + for y in squares: if x + y <= K: # Analogo al metodo 1 N_in += 1 # Stima di π @@ -231,25 +233,60 @@ def main(): plt.show() # --- Stampa la stima finale di π --- - # Converti i numeri in stringhe, rimuovendo il punto decimale (non conta come cifra uguale/diversa) - spi = str(pi).replace(".", "") - SPI = str(PI).replace(".", "") - L = len(SPI) # Per velocizzare i calcoli + # Per velocizzare i calcoli + l = len(str(PI)) - 1 # -1 perché ignoriamo il `.` + # + spi = f"{pi:01.{l-1}f}" + sPI = f"{PI:01.{l-1}f}" # Conta quante cifre sono corrette i = 0 - for i in range(L): - if SPI[i] != spi[i]: + for i, (digit, DIGIT) in enumerate(zip(spi.replace(".", ""), sPI.replace(".", ""))): + if DIGIT != digit: break # Stampa i valori in un riquadro - print(f"""\ -,{'-'*(L+7)}, -| π ≈ {pi} | -| π = {PI} | -| {'+' if i else '^'}-{'+'*(i-1) if i else ''}{'^' if i else ''}{'~'*(L-i-1)} | -'{'-'*(L+7)}'\ + PI_STYLE = "green" # il valore vero di π + OK_STYLE = "bold green" # le cifre corrette + K0_STYLE = "bold yellow" # la prima cifra errata + KO_STYLE = "bright_red" # le altre cifre errate + # Margini del riquadro + UL = "┌" if UNICODE_BOX else "," + UR = "┐" if UNICODE_BOX else "," + DL = "└" if UNICODE_BOX else "'" + DR = "┘" if UNICODE_BOX else "'" + H = "─" if UNICODE_BOX else "-" + V = "│" if UNICODE_BOX else "|" + sprint(f""" +{UL}{H*(l+7)}{UR} +{V} π ≈ {style_pi(spi, i, OK_STYLE, K0_STYLE, KO_STYLE)} {V} +{V} π = {style_pi(sPI, i, PI_STYLE, OK_STYLE, PI_STYLE)} {V} +{V} { + style('+', OK_STYLE) if i else style('^', K0_STYLE) +}-{ + style('+', OK_STYLE)*(i-1) if i else '' +}{ + style('^', K0_STYLE) if i else '' +}{ + style('~', KO_STYLE)*(l-i-1) +} {V} +{DL}{H*(l+7)}{DR} """) +def style_pi(pi: str, i: int, OK: str, K0: str, KO: str) -> str: + """Colora `pi` in base al numero di cifre corrette (`i`) e agli stili specificati.""" + s = "" + for j, c in enumerate(pi.replace(".", "")): + if j < i: + s += style(c, OK) + elif j == i: + s += style(c, K0) + else: + s += style(c, KO) + if j == 0: + s += "." + return s + + # Chiama "main()" quando il programma viene eseguito direttamente if __name__ == "__main__": main() diff --git a/src/rand.py b/src/rand.py index 9266522..c6b4015 100755 --- a/src/rand.py +++ b/src/rand.py @@ -5,10 +5,12 @@ from pathlib import Path from typing import Literal, NamedTuple, overload from enum import Flag, auto +from log import getLogger import root # Determina la cartella dove si trova questo file SRC = Path(__file__).parent +L = getLogger(__name__) # Il logger associato a questo modulo class Event(NamedTuple): @@ -31,12 +33,12 @@ class TrueRandomGenerator: # --- Variabili d'istanza --- # pubbliche - deltaTs: list[int] # Differenze dei tempi - randomBits: list[int] # Bit (0|1) casuali - randomNumbers: list[int] # Numeri casuali (da 0 a 255) - nRandomNumbers: int # Numero di numeri casuali + delta_times: list[int] # Differenze dei tempi + random_bits: list[int] # Bit (0|1) casuali + random_numbers: list[int] # Numeri casuali (da 0 a 255) + n_random_numbers: int # Numero di numeri casuali # protette - _i: int # Indice per il metodo `random_number()` + _i: int # Indice per il metodo `random_number()` # --- Metodo di inizializzazione --- @@ -75,8 +77,8 @@ def __init__( # Se invece nemmeno `file=` è stato specificato, non usare alcun file files = ([] if file is None else [file]) if files is None else files.copy() # Apri i file in `files` e leggi l'albero "Data_R", aggiungendo i dati a `t` - for f in files: - events += root.read(f, "Data_R", cls=Event) + for file in files: + events += root.read(file, "Data_R", cls=Event) # Se non ci sono abbastanza eventi, riporta un errore e termina il programma if len(events) < 9: raise ValueError( @@ -84,105 +86,97 @@ def __init__( ) # --- 1. Calcolo delle differenze dei tempi tra coppie di tempi adiacenti --- - if __debug__: - print("--> Calculating time differences") - self.deltaTs = [] - for i in range(1, len(events)): - # `dT` = (tempo dell'`i`-esimo evento) - (tempo dell'`i-1`-esimo evento) - dT = events[i].Timestamp - events[i - 1].Timestamp - # Salva `dT` nel vettore dedicato - self.deltaTs.append(dT) - if __debug__: - print(" done.") + with L.task("Calculating time differences"): - # --- 2. Generazione dei bit casuali --- - if __debug__: - print("--> Generating random bits") - # Applicazione del metodo (statico) `self._rand(...)` alle - # differenze dei tempi e salvataggio nel vettore `self.bits` - self.randomBits = list(map(self._rand, self.deltaTs)) - if __debug__: - print(" done.") + self.delta_times = [] + for i in range(1, len(events)): + # ∆t = (tempo dell'`i`-esimo evento) - (tempo dell'`i-1`-esimo evento) + delta_time = events[i].Timestamp - events[i - 1].Timestamp + # Salva ∆t (`delta_time`) nel vettore dedicato + self.delta_times.append(delta_time) - if __debug__: - print("--> Generating random numbers") + # --- 2. Generazione dei bit casuali --- + with L.task("Generating random bits"): + # Applicazione del metodo (statico) `self._rand(...)` alle + # differenze dei tempi e salvataggio nel vettore `self.bits` + self.random_bits = list(map(self._rand, self.delta_times)) # --- 3. Generazione dei numeri casuali (da 0 a 255) --- - self.randomNumbers = [] - randomNumbers_b = [] - # Inizializza un vettore di lunghezza 8 (pieno di zeri) - byte = [0] * 8 - - if _BYTES_GENERATION_METHOD == 0: - # -------------------- Metodo 1 -------------------- - # = ⌊ / 8 ⌋ ('//' è la divisione intera) - nbytes = len(self.randomBits) // 8 - for i in range(nbytes): - for j in range(8): - # Prendi 8 elementi da `self.randomBits` e salvali in `byte` - byte[j] = self.randomBits[i * 8 + j] - - # Converti `byte` in un numero da 0 a 255 tramite il metodo (statico) `_conv()`; - # salva poi il risultato nella variabile di istanza. - self.randomNumbers.append(self._conv(byte)) - # Se il `bug` è attivo, rifallo con il metodo (statico) `_conv2()` - if bug: - randomNumbers_b.append(self._conv2(byte)) - - else: - # -------------------- Metodo 2 -------------------- - for i in range(len(self.randomBits)): - # Copia l'`i`-esimo bit nell'(`i` mod 8)-esima cella di `byte` - byte[i % 8] = self.randomBits[i] - if i % 8 == 7: - # Il byte è completo: convertilo in numero decimale e salvalo - self.randomNumbers.append(self._conv(byte)) + with L.task("Generating random numbers"): + + self.random_numbers = [] + random_numbers_b = [] + # Inizializza un vettore di lunghezza 8 (pieno di zeri) + byte = [0] * 8 + + if _BYTES_GENERATION_METHOD == 0: + # -------------------- Metodo 1 -------------------- + # = ⌊ / 8 ⌋ ('//' è la divisione intera) + n_bytes = len(self.random_bits) // 8 + for i in range(n_bytes): + for j in range(8): + # Prendi 8 elementi da `self.randomBits` e salvali in `byte` + byte[j] = self.random_bits[i * 8 + j] + + # Converti `byte` in un numero da 0 a 255 tramite il metodo (statico) `_conv()`; + # salva poi il risultato nella variabile di istanza. + self.random_numbers.append(self._conv(byte)) + # Se il `bug` è attivo, rifallo con il metodo (statico) `_conv2()` if bug: - randomNumbers_b.append(self._conv2(byte)) - - if bug: - self.randomNumbers += randomNumbers_b - - if __debug__: - print(" done.") + random_numbers_b.append(self._conv2(byte)) + + else: + # -------------------- Metodo 2 -------------------- + for i, bit in enumerate(self.random_bits): + # Copia l'`i`-esimo bit nell'(`i` mod 8)-esima cella di `byte` + byte[i % 8] = bit + if i % 8 == 7: + # Il byte è completo: convertilo in numero decimale e salvalo + self.random_numbers.append(self._conv(byte)) + if bug: + random_numbers_b.append(self._conv2(byte)) + + if bug: + self.random_numbers += random_numbers_b # Salva la lunghezza di "self.randomNumbers" per un accesso più rapido - self.nRandomNumbers = len(self.randomNumbers) + self.n_random_numbers = len(self.random_numbers) # Dichiara la variabile d'istanza che tiene traccia del punto a cui siamo arrivati a leggere i byte casuali self._i = 0 - # Metodo statico: genera un bit dal paramentro "n" + # Metodo statico: genera un bit dal numero che gli viene passato @staticmethod - def _rand(n: int) -> int: - return n % 2 + def _rand(num: int) -> int: + return num % 2 - # Metodo statico: converte il vettore di bit "v" in numero decimale + # Metodo statico: converte il vettore di bit `byte` in numero decimale @staticmethod - def _conv(v: list[int]) -> int: - # indici di `v` (`7-i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] - # esponenti di 2 (`i`) : [ 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 ] - sum = 0 + def _conv(byte: list[int]) -> int: + # indici di `byte` (`7-i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] + # esponenti di 2 (`i`) : [ 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 ] + num = 0 for i in range(8): - sum += v[7 - i] * 2**i - return sum + num += byte[7 - i] * 2**i + return num - # Metodo statico: converte fasullamente il vettore di bit "v" in numero decimale + # Metodo statico: converte in modo errato il vettore di bit `byte` in numero decimale @staticmethod - def _conv2(v: list[int]) -> int: - # indici di `v` (`i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] - # esponenti di 2 (`i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] - sum = 0 + def _conv2(byte: list[int]) -> int: + # indici di `byte` (`i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] + # esponenti di 2 (`i`): [ 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 ] + num = 0 for i in range(8): - sum += v[i] * 2**i # <-- il bug è qui, i pesi dei bit sono in ordine inverso - return sum + num += byte[i] * 2**i # <-- il bug è qui, i pesi dei bit sono in ordine inverso + return num # Metodo: restituisce un numero casuale tra 0 e 255 (ogni volta diverso: scorre ciclicamente lungo i byte casuali) def random_number(self) -> int: - n = self.randomNumbers[self._i] + """Restituisce un numero casuale da 0 a 255.""" + num = self.random_numbers[self._i] # Incremento dell'indice, torna a 0 se si raggiunge l'ultimo numero casuale disponibile - self._i = (self._i + 1) % self.nRandomNumbers - return n + self._i = (self._i + 1) % self.n_random_numbers + return num # Classe che contiene le flag per scegliere cosa mostrare nei grafici @@ -195,7 +189,7 @@ class PLOT(Flag): # Distribuzione dei bit BITS_DISTRIBUTION = auto() # Distribuzione dei byte - BYTES_DISTRIBUTION = auto() # isogramma principale + BYTES_DISTRIBUTION = auto() # istogramma principale BYTES_DISTRIBUTION_LOCAL_MEANS = auto() # medie locali @@ -212,90 +206,96 @@ class PLOT(Flag): # Funzione per calcolare le medie locali (ciclicamente) -def cyclic_local_means(v: list[int], spread: int = 5) -> list[float]: - # 'v' è il vettore con i dati - # 'spread' è quanti valori prendere +def cyclic_local_means(data: list[int], spread: int = 5) -> list[float]: + """Calcola ciclicamente le medie locali del vettore `data`, con lo `spread` specificato. + + Esempio + ------- + >>> for i in range(1, 7): + ... print(f"spread={i} --> {cyclic_local_means(list(range(6)), spread=i)}") + spread=1 --> [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + spread=2 --> [0.5, 1.5, 2.5, 3.5, 4.5, 2.5] + spread=3 --> [2.0, 1.0, 2.0, 3.0, 4.0, 3.0] + spread=4 --> [2.0, 1.5, 2.5, 3.5, 3.0, 2.5] + spread=5 --> [2.4, 2.2, 2.0, 3.0, 2.8, 2.6] + spread=6 --> [2.5, 2.5, 2.5, 2.5, 2.5, 2.5] + """ left = (spread - 1) // 2 - L = len(v) - return [sum([v[(i + j - left) % L] for j in range(spread)]) / spread for i in range(L)] + length = len(data) + return [sum(data[(i + j - left) % length] for j in range(spread)) / spread for i in range(length)] # Funzione per testare il generatore def test(): + """Testa il generatore di numeri veramente casuali.""" + # La libreria `matplotlib` serve soltanto qua: importarla all'inizio di tutto il programma è sconveniente - import matplotlib.pyplot as plt + import matplotlib.pyplot as plt # pylint: disable=import-outside-toplevel gen = TrueRandomGenerator() # Salva alcuni valori utili nel namespace locale # per velocizzare l'accesso - bits = gen.randomBits - nums = gen.randomNumbers - - _PLOT_ITEM_MESSAGE = " * {}" - if __debug__ and TO_PLOT: - print("--> Plotting required items:") - - # ------------------------ Differenze di tempo ------------------------- - if PLOT.TIME_DELTAS in TO_PLOT: - if __debug__: - print(_PLOT_ITEM_MESSAGE.format(PLOT.TIME_DELTAS)) - plt.hist(gen.deltaTs, bins=500) - plt.yscale("log") - plt.xlabel("Time difference between two conecutive events [Digitizer Clock Periods]") - plt.ylabel("Counts") - plt.title("Time difference between two conecutive events") - plt.show() - - # ------------------------ Distribuzione dei bit ------------------------- - if PLOT.BITS_DISTRIBUTION in TO_PLOT: - if __debug__: - print(_PLOT_ITEM_MESSAGE.format(PLOT.BITS_DISTRIBUTION)) - # print(len(gen.deltaT)) # stampa il numero di deltaT disponibili - # print(*gen.randomNumbers, sep="\n") # stampa numeri casuali disponibili - # # Confronta frequenze di 0 e 1 in bits - # n0 = gen.randomBits.count(0) - # print(n0/len(bits), (nbits-n0)/len(bits)) - plt.hist(bits, bins=2) # istogramma per confrontare 0 e 1 (i bit) - plt.xlabel("Bit") - plt.ylabel("Counts") - plt.ylim(bottom=0) - plt.title("Bits distribution") - plt.show() - - # ------------------------ Distribuzione dei byte ------------------------- - - if PLOT.BYTES_DISTRIBUTION in TO_PLOT: - if __debug__: - print(_PLOT_ITEM_MESSAGE.format(PLOT.BYTES_DISTRIBUTION)) - # Numeri casuali - plt.hist( - nums, - bins=256, - alpha=0.75 if PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT else 1, - ) - - if PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT: - if __debug__: - print(_PLOT_ITEM_MESSAGE.format(PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS)) - # Conta quanti numeri casuali vengono generati in base al loro valore: - # `plt.hist()` lo fa in automatico, ma poiché dobbiamo fare le medie - # locali abbiamo bisogno di ottenere questi conteggi “manualmente” - vals = [0] * 256 - for x in nums: - vals[x] += 1 - # Disegna le medie locali - plt.plot(cyclic_local_means(vals, spread=32)) - - if PLOT.BYTES_DISTRIBUTION in TO_PLOT or PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT: - plt.xlabel("Bytes") - plt.ylabel("Counts") - plt.ylim(0, 85) - plt.title("Bytes distribution") - plt.show() - - if __debug__: - print(" done.") + bits = gen.random_bits + nums = gen.random_numbers + + if TO_PLOT: + with L.task("Plotting required items") as plotting: + _plot_item_message: str = " * {}" + + # ------------------------ Differenze di tempo ------------------------- + if PLOT.TIME_DELTAS in TO_PLOT: + plotting.info(_plot_item_message.format(PLOT.TIME_DELTAS)) + plt.hist(gen.delta_times, bins=500) + plt.yscale("log") + plt.xlabel("Time difference between two conecutive events [Digitizer Clock Periods]") + plt.ylabel("Counts") + plt.title("Time difference between two conecutive events") + plt.show() + + # ------------------------ Distribuzione dei bit ------------------------- + if PLOT.BITS_DISTRIBUTION in TO_PLOT: + plotting.info(_plot_item_message.format(PLOT.BITS_DISTRIBUTION)) + # print(len(gen.deltaT)) # stampa il numero di deltaT disponibili + # print(*gen.randomNumbers, sep="\n") # stampa numeri casuali disponibili + # # Confronta frequenze di 0 e 1 in bits + # n0 = gen.randomBits.count(0) + # print(n0/len(bits), (len(bits)-n0)/len(bits)) + plt.hist(bits, bins=2) # istogramma per confrontare 0 e 1 (i bit) + plt.xlabel("Bit") + plt.ylabel("Counts") + plt.ylim(bottom=0) + plt.title("Bits distribution") + plt.show() + + # ------------------------ Distribuzione dei byte ------------------------- + + if PLOT.BYTES_DISTRIBUTION in TO_PLOT: + plotting.info(_plot_item_message.format(PLOT.BYTES_DISTRIBUTION)) + # Numeri casuali + plt.hist( + nums, + bins=256, + alpha=0.75 if PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT else 1, + ) + + if PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT: + plotting.info(_plot_item_message.format(PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS)) + # Conta quanti numeri casuali vengono generati in base al loro valore: + # `plt.hist()` lo fa in automatico, ma poiché dobbiamo fare le medie + # locali abbiamo bisogno di ottenere questi conteggi “manualmente” + vals = [0] * 256 + for x in nums: + vals[x] += 1 + # Disegna le medie locali + plt.plot(cyclic_local_means(vals, spread=32)) + + if PLOT.BYTES_DISTRIBUTION in TO_PLOT or PLOT.BYTES_DISTRIBUTION_LOCAL_MEANS in TO_PLOT: + plt.xlabel("Bytes") + plt.ylabel("Counts") + plt.ylim(0, 85) + plt.title("Bytes distribution") + plt.show() # Chiama "test()" quando il programma viene eseguito direttamente diff --git a/src/root.pxd b/src/root.pxd new file mode 100644 index 0000000..1c712d8 --- /dev/null +++ b/src/root.pxd @@ -0,0 +1,24 @@ +import cython +cdef: + bint FORCE_UPROOT + bint ROOT + + +@cython.locals( + data=list, + vals=dict, + f=object, + t=object, + x=object, + raw_data=dict, + branches=dict, + attr=str, + i=cython.long, +) +cdef list[object] _read( + object file, + type cls, + str tree, + list attributes, + list list_conv, +) diff --git a/src/root.py b/src/root.py index b7e6ebf..c7ec2b6 100644 --- a/src/root.py +++ b/src/root.py @@ -1,44 +1,130 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +# pylint: disable=no-member,used-before-assignment """Modulo che utilizza PyROOT (se installato), o uproot come backend.""" from __future__ import annotations +from typing import Any, NamedTuple, Sequence, TypeVar, get_origin, get_type_hints, overload from collections import namedtuple from pathlib import Path -from typing import Any, NamedTuple, Sequence, TypeVar, get_origin, get_type_hints, overload +import sys import os +from log import getLogger + + +L = getLogger(__name__) -# ----- 1. Importa la libreria corretta ------ # +# ----- 1. Importa la libreria corretta ------ # # Variabile che controlla la libreria da usare: True per PyROOT, False per uproot. -# Il valore iniziale è determinato a partire variabile d'ambiente FORCE_UPROOT. -ROOT: bool = not eval(os.environ.get("FORCE_UPROOT", "0") or "0") +# Il valore iniziale è determinato a partire dalla variabile d'ambiente `FORCE_UPROOT`. +FORCE_UPROOT = eval(os.environ.get("FORCE_UPROOT", "0") or "0") # Prova a importare PyROOT; se fallisci, prova con uproot. # Imposta la variabile `ROOT` di conseguenza. +ROOT: bool try: - if not ROOT: + L.debug(f"Environment variable `FORCE_UPROOT` is {'' if FORCE_UPROOT else 'not '}set.") + if FORCE_UPROOT: raise ModuleNotFoundError + L.debug("Trying to import `PyROOT`") import ROOT as PyROOT except ModuleNotFoundError: - # Non c'è PyROOT: usiamo uproot - import uproot - - ROOT = False + try: + # Non c'è PyROOT: usiamo uproot + L.debug("Trying to import `uproot`") + import uproot + except ModuleNotFoundError: + # Non c'è né PyROOT né uproot: + L.critical("No ROOT backend available: please install either PyROOT (`root`) or `uproot`.") + sys.exit(1) + else: + ROOT = False else: # nessun errore: PyROOT c'è. ROOT = True -if __debug__: - print(f"[i] ROOT backend: {'PyROOT' if ROOT else 'uproot'}") +L.info(f"ROOT backend: {'PyROOT' if ROOT else 'uproot'}") +# ----- 2. Definisci la funzione di lettura ------ # -# ----- 2. Definisci la funzione di lettura ------ # _T = TypeVar("_T", bound=NamedTuple) +def _read( + file: str | Path, + cls: type[_T], + tree: str, + attributes: list[str], + list_conv: list[str], +) -> list[_T]: + # Inizializzazione variabili + file = str(Path(file).expanduser().resolve()) + data: list[_T] = [] # Questo sarà il risultato della funzione + # In `vals` vengono salvati i parametri da passare alla classe nella costruzione dell'oggetto + vals: dict[str, Any] = {} + + with L.task(f"Reading tree {tree!r} from file {file!r}...") as reading: + + if ROOT: # --- PyROOT --- + # Termina il loop degli eventi di PyROOT, in modo che non interferisca con matplotlib + PyROOT.keeppolling = 0 # type: ignore + # Apri il file + f = PyROOT.TFile(file) # type: ignore + # Leggi l'albero + t = f.Get(tree) + # Leggi e salva i dati di interesse + for x in t: + vals.clear() # Svuota i parametri + for attr in attributes: + # Converti l'attributo in lista ove necessario + if attr in list_conv: + vals[attr] = [*getattr(x, attr)] + else: + vals[attr] = getattr(x, attr) + # Crea l'oggetto e aggiungilo a `data` + data.append(cls(**vals)) # type: ignore + # Chiudi il file + f.Close() + + else: # --- uproot --- + + # Mappa vuota per i dati grezzi + # (associa al nome dell'attributo la lista dei valori, ancora da combinare negli oggetti) + raw_data: dict[str, Any] = {} + # Apri l'albero `tree` dal file `file` + with uproot.open(f"{file}:{tree}") as t: + # Salva i “rami” come mappa + branches = dict(t.iteritems()) + for attr in attributes: + # Converti l'attributo in lista ove necessario + if attr in list_conv: + raw_data[attr] = list(map(list, branches[attr].array())) + else: + raw_data[attr] = list(branches[attr].array()) + + # Converti i dati grezzi in lista di oggetti: + # scorri gli indici e associa gli attributi corrispondenti, creando l'oggetto + # + # i: 0 1 2 3 ... + # | | | | + # V V V V + # attr0: x00 x01 x02 x03 ... ¯| + # attr1: x10 x11 x12 x13 ... |--> raw_data + # attr2: x20 x21 x22 x23 ... _| + # | | | | + # V V V V + # data: ### ### ### ### ... + # + for i in range(len(raw_data[attributes[0]])): + data.append(cls(**{name: val[i] for name, val in raw_data.items()})) # type: ignore + + reading.result = f"read {len(data)} items" + return data + + # O si specifica la classe tramite il parametro `cls`... @overload def read(file: Path | str, tree: str, /, *, cls: type[_T]) -> list[_T]: @@ -121,84 +207,34 @@ def read( if issubclass(get_origin(t) or t, list) ] - # Inizializzazione variabili - file = str(Path(file).expanduser().resolve()) - data: list[_T] = [] # Questo sarà il risultato della funzione - # In `vals` vengono salvati i parametri da passare alla classe nella costruzione dell'oggetto - vals: dict[str, Any] = {} - - if __debug__: - print(f"--> Reading tree {tree!r} from file {file!r}") - - if ROOT: # --- PyROOT --- - # Termina il loop degli eventi di PyROOT, in modo che non interferisca con matplotlib - PyROOT.keeppolling = 0 # type: ignore - # Apri il file - f = PyROOT.TFile(file) # type: ignore - # Leggi l'albero - t = f.Get(tree) - # Leggi e salva i dati di interesse - for x in t: - vals.clear() # Svuota i parametri - for attr in attributes: - # Converti l'attributo in lista ove necessario - if attr in list_conv: - vals[attr] = [*getattr(x, attr)] - else: - vals[attr] = getattr(x, attr) - # Crea l'oggetto e aggiungilo a `data` - data.append(cls(**vals)) # type: ignore - # Chiudi il file - f.Close() - - else: # --- uproot --- - - # Mappa vuota per i dati grezzi - # (associa al nome dell'attributo la lista dei valori, ancora da combinare negli oggetti) - raw_data: dict[str, Any] = {} - # Apri l'albero `tree` dal file `file` - with uproot.open(f"{file}:{tree}") as t: - # Salva i “rami” come mappa - branches = {k: v for k, v in t.iteritems()} - for attr in attributes: - # Converti l'attributo in lista ove necessario - if attr in list_conv: - raw_data[attr] = list(map(list, branches[attr].array())) - else: - raw_data[attr] = list(branches[attr].array()) - - # Converti i dati grezzi in lista di oggetti: - # scorri gli indici e associa gli attributi corrispondenti, creando l'oggetto - # - # i: 0 1 2 3 ... - # | | | | - # V V V V - # attr0: x00 x01 x02 x03 ... ¯| - # attr1: x10 x11 x12 x13 ... |--> raw_data - # attr2: x20 x21 x22 x23 ... _| - # | | | | - # V V V V - # data: ### ### ### ### ... - # - for i in range(len(raw_data[attributes[0]])): - vals.clear() - for attr in raw_data: - vals[attr] = raw_data[attr][i] - data.append(cls(**vals)) # type: ignore - if __debug__: - print(f" done (read {len(data)} items).") - return data + return _read(file, cls, tree, list(attributes), list_conv) # type: ignore # "Esporta" i simboli di interesse __all__ = ["read"] -if __name__ == "__main__": - # Test +def test(): + """Testa il funzionamento di `read()`""" + class Event(NamedTuple): + """Rappresenta un evento.""" + + # Cosa leggere Timestamp: int Samples: list[int] - data = read("src/fondo.root", "Data_R", cls=Event) + SRC = Path(__file__).parent + DEFAULT = SRC / "fondo.root" + if len(sys.argv) > 1: + file = Path(sys.argv[1]) + if not file.exists(): + file = DEFAULT + else: + file = DEFAULT + data = read(file, "Data_R", cls=Event) assert isinstance(data[0], Event) + + +if __name__ == "__main__": + test() diff --git a/src/spettro.py b/src/spettro.py index 3303994..c98ffee 100755 --- a/src/spettro.py +++ b/src/spettro.py @@ -1,13 +1,17 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Analisi dello spettro del segnale.""" from __future__ import annotations from pathlib import Path from typing import Literal, NamedTuple import matplotlib.pyplot as plt import root +from log import getLogger, taskLogger # --- Costanti --- +# Logger per questo programma +L = getLogger(__name__) # Distanza temporale tra due samples T: float = 4 # µs # Numero di samples da prendere per calcolare la baseline @@ -32,12 +36,13 @@ class Event(NamedTuple): # --- Utility ---- -# Calcolo della media degli elementi contenuti nel vettore "v" -def mean(v): +def mean(v: list[float] | list[int]) -> float: + """Calcola la media degli elementi nel vettore `v`""" return sum(v) / len(v) # Calcolo delle aree per ogni evento +@L.task(f"Calculating {'BASELINES and ' if BASELINE_CALC_MODE == 1 else ''}areas") def aree( events: list[Event], BASELINE: float | None = None, @@ -45,22 +50,18 @@ def aree( min_samples: int = 0, max_samples: int | None = None, ) -> list[float]: + """Calcola l'area di ogni evento.""" + logger = taskLogger(__name__) + logger.debug(f"{max_area=}, samples range = [{min_samples}, {max_samples}]") - if __debug__: - print( - f"--> calculating {'BASELINES and ' if BASELINE_CALC_MODE == 1 else ''}" - f"areas({f'BASELINE={BASELINE}, ' if BASELINE_CALC_MODE == 0 else ''}" - f"{max_area=}, samples range = [{min_samples}, {max_samples}])" - ) - - aree: list[float] = [] + aree_calcolate: list[float] = [] for event in events: # Se necessario, calcola la BASELINE per questo evento if BASELINE_CALC_MODE == 1: BASELINE = mean(event.Samples[:BASELINE_CALC_N]) assert BASELINE is not None - # Estrazione dei samples dell'evento tra "min_samples" e "max_samples" + # Estrazione dei samples dell'evento tra `min_samples` e `max_samples` samples = event.Samples[min_samples:max_samples] # Calcolo dell'area: @@ -69,41 +70,34 @@ def aree( # Se non sono stati impostati limiti all'area o area < del limite ... if max_area is None or temp_area < max_area: - # ... salva l'area nel vettore "Aree" - aree.append(temp_area) + # ... salva l'area nel vettore `aree_calcolate` + aree_calcolate.append(temp_area) - if __debug__: - print(" done.") - return aree + return aree_calcolate # --- Programma principale ---- -# Funzione principale def main(): - if __debug__: - print("START") + """Funzione principale.""" # ----------------------------- Apertura file ----------------------------- SRC = Path(__file__).parent t = root.read(SRC / "data.root", "Data_R", cls=Event) # ------------------------ Calcolo della baseline ------------------------- + BASELINE = None if BASELINE_CALC_MODE == 0: - if __debug__: - print("--> calculating baseline") - medie = [] - for event in t: - # Calcola della media dei primi `BASELINE_CALC_N` samples richiamando la funzione "mean" - # Salva la media nel vettore "medie" - medie.append(mean(event.Samples[:BASELINE_CALC_N])) - # Salva la media del vettore "medie" come "BASELINE" - if __debug__: - print(" done.") - BASELINE = mean(medie) - # BASELINE = 13313.683338704632 # già calcolata, all'occorrenza - else: - BASELINE = None + with L.task("Calculating baseline...") as calc: + medie = [] + for event in t: + # Calcola della media dei primi `BASELINE_CALC_N` samples richiamando la funzione "mean" + # Salva la media nel vettore "medie" + medie.append(mean(event.Samples[:BASELINE_CALC_N])) + # Salva la media del vettore "medie" come "BASELINE" + BASELINE = mean(medie) + # BASELINE = 13313.683338704632 # già calcolata, all'occorrenza + calc.result = f"it's {BASELINE}" # ---------------------- Calibrazione spettro in keV ---------------------- X1 = 118900 # picco a 1436 keV @@ -144,6 +138,6 @@ def calibrate(x): plt.show() -# Chiama "main()" quando il programma viene eseguito direttamente +# Chiama `main()` quando il programma viene eseguito direttamente if __name__ == "__main__": main() diff --git a/src/stagisti.py b/src/stagisti.py index adea725..777afb0 100755 --- a/src/stagisti.py +++ b/src/stagisti.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +"""Distribuisci le parti della presentazione casualmente fra gli stagisti.""" from rand import TrueRandomGenerator g = TrueRandomGenerator() @@ -9,6 +10,6 @@ risultati = [] while len(risultati) < 4: stagista = stagisti[g.random_number() % 4] - if not stagista in risultati: + if stagista not in risultati: risultati.append(stagista) print(risultati)