diff --git a/shvatka/common/log_utils.py b/shvatka/common/log_utils.py new file mode 100644 index 00000000..1f9a5d95 --- /dev/null +++ b/shvatka/common/log_utils.py @@ -0,0 +1,6 @@ +from base64 import b64encode +from typing import Any + + +def obfuscate_sensitive(information: Any) -> str: + return b64encode(str(information).encode("utf8")).decode("utf8") diff --git a/shvatka/core/interfaces/dal/game_play.py b/shvatka/core/interfaces/dal/game_play.py index 4b4948fa..69ed93a4 100644 --- a/shvatka/core/interfaces/dal/game_play.py +++ b/shvatka/core/interfaces/dal/game_play.py @@ -54,6 +54,11 @@ async def save_key( ) -> dto.KeyTime: raise NotImplementedError + async def get_team_typed_keys( + self, game: dto.Game, team: dto.Team, level_number: int + ) -> list[dto.KeyTime]: + raise NotImplementedError + async def level_up(self, team: dto.Team, level: dto.Level, game: dto.Game) -> None: raise NotImplementedError diff --git a/shvatka/core/models/dto/scn/__init__.py b/shvatka/core/models/dto/scn/__init__.py index bc3bd059..a82302c0 100644 --- a/shvatka/core/models/dto/scn/__init__.py +++ b/shvatka/core/models/dto/scn/__init__.py @@ -25,6 +25,6 @@ PhotoHint, ContactHint, ) -from .level import LevelScenario, SHKey, BonusKey, HintsList +from .level import LevelScenario, SHKey, BonusKey, HintsList, Conditions from .parsed_zip import ParsedZip from .time_hint import TimeHint diff --git a/shvatka/core/models/dto/scn/action/__init__.py b/shvatka/core/models/dto/scn/action/__init__.py new file mode 100644 index 00000000..d9649dad --- /dev/null +++ b/shvatka/core/models/dto/scn/action/__init__.py @@ -0,0 +1,11 @@ +from .interface import WinCondition, Action, State, Decision, DecisionType, StateHolder +from .decisions import NotImplementedActionDecision, Decisions +from .keys import ( + KeyDecision, + KeyWinCondition, + TypedKeyAction, + TypedKeysState, + BonusKeyDecision, + KeyBonusCondition, +) +from .state_holder import InMemoryStateHolder diff --git a/shvatka/core/models/dto/scn/action/decisions.py b/shvatka/core/models/dto/scn/action/decisions.py new file mode 100644 index 00000000..5cda9e2a --- /dev/null +++ b/shvatka/core/models/dto/scn/action/decisions.py @@ -0,0 +1,49 @@ +from abc import abstractmethod +from dataclasses import dataclass +from typing import Literal, Sequence, overload + +from shvatka.core.models.dto.scn.action.interface import DecisionType, Decision + + +@dataclass +class NotImplementedActionDecision(Decision): + type: Literal[DecisionType.NO_ACTION] = DecisionType.NOT_IMPLEMENTED + + +class Decisions(Sequence[Decision]): + def __init__(self, decisions: list[Decision]): + self.decisions = decisions + + @overload + @abstractmethod + def __getitem__(self, index: int) -> Decision: + return self.decisions[index] + + @overload + @abstractmethod + def __getitem__(self, index: slice) -> Sequence[Decision]: + return self.decisions[index] + + def __getitem__(self, index): + return self.decisions[index] + + def __len__(self): + return len(self.decisions) + + def __iter__(self): + return iter(self.decisions) + + def get_significant(self) -> "Decisions": + return self.get_all_except(DecisionType.NOT_IMPLEMENTED, DecisionType.NO_ACTION) + + def get_implemented(self) -> "Decisions": + return self.get_all_except(DecisionType.NOT_IMPLEMENTED) + + def get_all(self, *type_: type) -> "Decisions": + return Decisions([d for d in self if isinstance(d, type_)]) + + def get_all_except(self, *type_: DecisionType) -> "Decisions": + return Decisions([d for d in self if d.type not in type_]) + + def get_all_only(self, *type_: DecisionType) -> "Decisions": + return Decisions([d for d in self if d.type in type_]) diff --git a/shvatka/core/models/dto/scn/action/interface.py b/shvatka/core/models/dto/scn/action/interface.py new file mode 100644 index 00000000..7a8fb4f9 --- /dev/null +++ b/shvatka/core/models/dto/scn/action/interface.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import enum +import typing +from typing import Protocol + + +class WinCondition(Protocol): + def check(self, action: Action, state_holder: StateHolder) -> Decision: + raise NotImplementedError + + +class Action(Protocol): + pass + + +class State(Protocol): + pass + + +T = typing.TypeVar("T") + + +class StateHolder(Protocol): + def get(self, state_class: type[T]) -> T: + raise NotImplementedError + + +class Decision(Protocol): + type: DecisionType + + +class DecisionType(enum.StrEnum): + NOT_IMPLEMENTED = enum.auto() + LEVEL_UP = enum.auto() + SIGNIFICANT_ACTION = enum.auto() + NO_ACTION = enum.auto() + BONUS_TIME = enum.auto() diff --git a/shvatka/core/models/dto/scn/action/keys.py b/shvatka/core/models/dto/scn/action/keys.py new file mode 100644 index 00000000..99963af9 --- /dev/null +++ b/shvatka/core/models/dto/scn/action/keys.py @@ -0,0 +1,136 @@ +import typing +from dataclasses import dataclass +from typing import Literal + +from shvatka.core.models import enums, dto +from . import StateHolder +from .decisions import NotImplementedActionDecision +from .interface import Action, State, Decision, WinCondition, DecisionType +from shvatka.core.models.dto.scn import BonusKey + +SHKey: typing.TypeAlias = str + + +@dataclass +class TypedKeyAction(Action): + key: SHKey + + +@dataclass +class TypedKeysState(State): + typed_correct: set[SHKey] + all_typed: set[SHKey] + + def is_duplicate(self, action: TypedKeyAction) -> bool: + return action.key in self.all_typed + + +@dataclass +class WrongKeyDecision(Decision): + duplicate: bool + key: str + type: Literal[DecisionType.NO_ACTION] = DecisionType.NO_ACTION + key_type: typing.Literal[enums.KeyType.wrong] = enums.KeyType.wrong + + +@dataclass +class KeyDecision(Decision): + type: DecisionType + key_type: enums.KeyType + duplicate: bool + key: SHKey + + def is_level_up(self) -> bool: + return self.type == DecisionType.LEVEL_UP + + def to_parsed_key(self) -> dto.ParsedKey: + return dto.ParsedKey( + type_=self.key_type, + text=self.key, + ) + + @property + def key_text(self) -> str: + return self.key + + +@dataclass +class KeyWinCondition(WinCondition): + keys: set[SHKey] + + def check(self, action: Action, state_holder: StateHolder) -> Decision: + if not isinstance(action, TypedKeyAction): + return NotImplementedActionDecision() + state = state_holder.get(TypedKeysState) + type_: DecisionType + if not self._is_correct(action): + return WrongKeyDecision(duplicate=state.is_duplicate(action), key=action.key) + if not state.is_duplicate(action): + if self._is_all_typed(action, state): + type_ = DecisionType.LEVEL_UP + else: + type_ = DecisionType.SIGNIFICANT_ACTION + else: + type_ = DecisionType.NO_ACTION + return KeyDecision( + type=type_, + key_type=enums.KeyType.simple if self._is_correct(action) else enums.KeyType.wrong, + duplicate=state.is_duplicate(action), + key=action.key, + ) + + def _is_correct(self, action: TypedKeyAction) -> bool: + return action.key in self.keys + + def _is_all_typed(self, action: TypedKeyAction, state: TypedKeysState) -> bool: + return self.keys == {*state.typed_correct, action.key} + + +@dataclass +class BonusKeyDecision(Decision): + type: DecisionType + key_type: enums.KeyType + duplicate: bool + key: BonusKey | None + + def to_parsed_key(self) -> dto.ParsedKey: + if self.type == DecisionType.BONUS_TIME: + return dto.ParsedBonusKey( + type_=enums.KeyType.bonus, + text=self.key.text, + bonus_minutes=self.key.bonus_minutes, + ) + else: + return dto.ParsedKey( + type_=enums.KeyType.wrong, + text=self.key.text, + ) + + @property + def key_text(self) -> str: + return self.key.text + + +@dataclass +class KeyBonusCondition(WinCondition): + keys: set[BonusKey] + + def check(self, action: Action, state_holder: StateHolder) -> Decision: + if not isinstance(action, TypedKeyAction): + return NotImplementedActionDecision() + state = state_holder.get(TypedKeysState) + bonus = self._get_bonus(action) + if bonus is None: + return WrongKeyDecision(duplicate=state.is_duplicate(action), key=action.key) + return BonusKeyDecision( + type=DecisionType.BONUS_TIME, + key_type=enums.KeyType.bonus, + duplicate=state.is_duplicate(action), + key=bonus, + ) + + def _get_bonus(self, action: TypedKeyAction) -> BonusKey | None: + for bonus_key in self.keys: + if action.key == bonus_key.text: + return bonus_key + return None diff --git a/shvatka/core/models/dto/scn/action/state_holder.py b/shvatka/core/models/dto/scn/action/state_holder.py new file mode 100644 index 00000000..5a102a3a --- /dev/null +++ b/shvatka/core/models/dto/scn/action/state_holder.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass + +from . import TypedKeysState +from .interface import StateHolder, T +from shvatka.core.models.dto import scn + + +@dataclass +class InMemoryStateHolder(StateHolder): + typed_correct: set[scn.SHKey] + all_typed: set[scn.SHKey] + + def get(self, state_class: type[T]) -> T: + if isinstance(state_class, TypedKeysState): + return TypedKeysState( + typed_correct=self.typed_correct, + all_typed=self.all_typed, + ) + else: + raise NotImplementedError(f"unknown state type {type(state_class)}") diff --git a/shvatka/core/models/dto/scn/level.py b/shvatka/core/models/dto/scn/level.py index c2cea195..435cc7fd 100644 --- a/shvatka/core/models/dto/scn/level.py +++ b/shvatka/core/models/dto/scn/level.py @@ -1,15 +1,28 @@ -import typing +import logging +from abc import abstractmethod from collections.abc import Sequence, Iterable -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import timedelta from typing import overload - +from shvatka.common.log_utils import obfuscate_sensitive from shvatka.core.utils import exceptions from .hint_part import AnyHint from .time_hint import TimeHint, EnumeratedTimeHint - -SHKey: typing.TypeAlias = str +from .action import ( + WinCondition, + Action, + Decision, + StateHolder, + DecisionType, + Decisions, + KeyDecision, + KeyBonusCondition, + NotImplementedActionDecision, +) +from .action.keys import SHKey, KeyWinCondition, TypedKeyAction, WrongKeyDecision + +logger = logging.getLogger(__name__) @dataclass(frozen=True) @@ -129,12 +142,75 @@ def __repr__(self): return repr(self.hints) +class Conditions(Sequence[WinCondition]): + def __init__(self, conditions: Sequence[WinCondition]): + self.validate(conditions) + self.conditions = conditions + + @staticmethod + def validate(conditions: Sequence[WinCondition]) -> None: + keys = set() + win_conditions = [] + for c in conditions: + if isinstance(c, KeyWinCondition): + win_conditions.append(c) + if keys.intersection(c.keys): + raise exceptions.LevelError( + text=f"keys already exists {keys.intersection(c.keys)}" + ) + keys.union(c.keys) + if isinstance(c, KeyBonusCondition): + if keys.intersection({k.text for k in c.keys}): + raise exceptions.LevelError( + text=f"keys already exists {keys.intersection(c.keys)}" + ) + keys.union({k.text for k in c.keys}) + if not win_conditions: + raise exceptions.LevelError(text="There is no win condition") + + def get_keys(self) -> set[str]: + result: set[SHKey] = set() + for condition in self.conditions: + if isinstance(condition, KeyWinCondition): + result.union(condition.keys) + return result + + def get_bonus_keys(self) -> set[BonusKey]: + result: set[BonusKey] = set() + for condition in self.conditions: + if isinstance(condition, KeyBonusCondition): + result.union(condition.keys) + return result + + @overload + @abstractmethod + def __getitem__(self, index: int) -> WinCondition: + return self.conditions[index] + + @overload + @abstractmethod + def __getitem__(self, index: slice) -> Sequence[WinCondition]: + return self.conditions[index] + + def __getitem__(self, index): + return self.conditions[index] + + def __len__(self): + return len(self.conditions) + + def __repr__(self): + return repr(self.conditions) + + @dataclass class LevelScenario: id: str time_hints: HintsList - keys: set[SHKey] = field(default_factory=set) - bonus_keys: set[BonusKey] = field(default_factory=set) + conditions: Conditions + + def __post_init__(self): + if not self.conditions: + raise exceptions.LevelError(text="no win conditions are present") def get_hint(self, hint_number: int) -> TimeHint: return self.time_hints[hint_number] @@ -145,11 +221,42 @@ def get_hint_by_time(self, time: timedelta) -> EnumeratedTimeHint: def is_last_hint(self, hint_number: int) -> bool: return len(self.time_hints) == hint_number + 1 - def get_keys(self) -> set[str]: - return self.keys + def check(self, action: Action, state: StateHolder) -> Decision: + decisions = Decisions([cond.check(action, state) for cond in self.conditions]) + implemented = decisions.get_implemented() + if not implemented: + return NotImplementedActionDecision() + if isinstance(action, TypedKeyAction): + key_decisions = implemented.get_all(KeyDecision, WrongKeyDecision) + if not key_decisions: + return NotImplementedActionDecision() + if not key_decisions.get_significant(): + assert all(d.type == DecisionType.NO_ACTION for d in key_decisions) + if duplicate_correct := key_decisions.get_all(KeyDecision): + if len(duplicate_correct) != 1: + logger.warning( + "more than one duplicate correct key decision %s", + obfuscate_sensitive(duplicate_correct), + ) + return duplicate_correct[0] + return key_decisions[0] + significant_key_decisions = key_decisions.get_significant() + if len(significant_key_decisions) != 1: + logger.warning( + "More than one significant key decision. " + "Will used first but it's not clear %s", + obfuscate_sensitive(significant_key_decisions), + ) + else: + return significant_key_decisions[0] + else: + return NotImplementedActionDecision() + + def get_keys(self) -> set[SHKey]: + return self.conditions.get_keys() def get_bonus_keys(self) -> set[BonusKey]: - return self.bonus_keys + return self.conditions.get_bonus_keys() def get_guids(self) -> list[str]: guids = [] diff --git a/shvatka/core/services/key.py b/shvatka/core/services/key.py index 6666135e..385fecef 100644 --- a/shvatka/core/services/key.py +++ b/shvatka/core/services/key.py @@ -1,13 +1,18 @@ +import logging from dataclasses import dataclass + from shvatka.core.interfaces.dal.game_play import GamePlayerDao -from shvatka.core.models import dto, enums -from shvatka.core.models.dto import scn +from shvatka.core.models import dto +from shvatka.core.models.dto.scn import action from shvatka.core.utils import exceptions from shvatka.core.utils.input_validation import is_key_valid from shvatka.core.utils.key_checker_lock import KeyCheckerFactory +logger = logging.getLogger(__name__) + + @dataclass class KeyProcessor: dao: GamePlayerDao @@ -25,49 +30,35 @@ async def submit_key( player: dto.Player, team: dto.Team, ) -> dto.InsertedKey: - is_level_up = False async with self.locker(team): level = await self.dao.get_current_level(team, self.game) - parsed_key = await self.parse_key(key, level) - saved_key = await self.dao.save_key( - key=parsed_key.text, - team=team, - level=level, - game=self.game, - player=player, - type_=parsed_key.type_, - is_duplicate=await self.is_duplicate(level=level, team=team, key=key), - ) - typed_keys = await self.dao.get_correct_typed_keys( + correct_keys = await self.dao.get_correct_typed_keys( level=level, game=self.game, team=team ) - if parsed_key.type_ == enums.KeyType.simple: - # add just now added key to typed, because no flush in dao - typed_keys.add(parsed_key.text) - if is_level_up := await self.is_level_up(typed_keys, level): - await self.dao.level_up(team=team, level=level, game=self.game) - await self.dao.commit() - return dto.InsertedKey.from_key_time(saved_key, is_level_up, parsed_key=parsed_key) - - async def get_bonus_value(self, key: str, level: dto.Level) -> float: - for bonus_key in level.get_bonus_keys(): - if bonus_key.text == key: - return bonus_key.bonus_minutes - raise AssertionError - - async def parse_key(self, key: str, level: dto.Level) -> dto.ParsedKey: - if key in level.get_bonus_keys_texts(): - return dto.ParsedBonusKey( - type_=enums.KeyType.bonus, - text=key, - bonus_minutes=await self.get_bonus_value(key, level), + all_typed = await self.dao.get_team_typed_keys( + self.game, team, level_number=level.number_in_game ) - if key in level.get_keys(): - return dto.ParsedKey(type_=enums.KeyType.simple, text=key) - return dto.ParsedKey(type_=enums.KeyType.wrong, text=key) - - async def is_duplicate(self, key: scn.SHKey, level: dto.Level, team: dto.Team) -> bool: - return await self.dao.is_key_duplicate(level, team, key) - - async def is_level_up(self, typed_keys: set[scn.SHKey], level: dto.Level) -> bool: - return typed_keys == level.get_keys() + state = action.InMemoryStateHolder( + typed_correct=correct_keys, + all_typed={k.text for k in all_typed}, + ) + decision = level.scenario.check( + action=action.TypedKeyAction(key=key), + state=state, + ) + if isinstance(decision, action.KeyDecision | action.BonusKeyDecision): + saved_key = await self.dao.save_key( + key=decision.key_text, + team=team, + level=level, + game=self.game, + player=player, + type_=decision.key_type, + is_duplicate=decision.duplicate, + ) + if is_level_up := decision.type == action.DecisionType.LEVEL_UP: + await self.dao.level_up(team=team, level=level, game=self.game) + await self.dao.commit() + return dto.InsertedKey.from_key_time( + saved_key, is_level_up, parsed_key=decision.to_parsed_key() + ) diff --git a/shvatka/infrastructure/db/dao/complex/game_play.py b/shvatka/infrastructure/db/dao/complex/game_play.py index 6c28e64f..c451b12f 100644 --- a/shvatka/infrastructure/db/dao/complex/game_play.py +++ b/shvatka/infrastructure/db/dao/complex/game_play.py @@ -120,6 +120,11 @@ async def save_key( is_duplicate=is_duplicate, ) + async def get_team_typed_keys( + self, game: dto.Game, team: dto.Team, level_number: int + ) -> list[dto.KeyTime]: + return await self.key_time.get_team_typed_keys(game, team, level_number) + async def level_up(self, team: dto.Team, level: dto.Level, game: dto.Game) -> None: assert level.number_in_game is not None await self.level_time.set_to_level(