diff --git a/jac-cloud/jac_cloud/core/architype.py b/jac-cloud/jac_cloud/core/architype.py index c893c7cbb..7679b1f7d 100644 --- a/jac-cloud/jac_cloud/core/architype.py +++ b/jac-cloud/jac_cloud/core/architype.py @@ -9,6 +9,7 @@ is_dataclass, ) from enum import Enum +from functools import cached_property from os import getenv from pickle import dumps as pdumps from re import IGNORECASE, compile @@ -18,6 +19,7 @@ ClassVar, Iterable, Mapping, + Type, TypeVar, cast, get_args, @@ -35,12 +37,12 @@ DSFunc, EdgeAnchor as _EdgeAnchor, EdgeArchitype as _EdgeArchitype, + JID, NodeAnchor as _NodeAnchor, NodeArchitype as _NodeArchitype, ObjectAnchor as _ObjectAnchor, ObjectArchitype as _ObjectArchitype, Permission as _Permission, - TANCH, WalkerAnchor as _WalkerAnchor, WalkerArchitype as _WalkerArchitype, ) @@ -56,14 +58,20 @@ from ..jaseci.utils import logger MANUAL_SAVE = getenv("MANUAL_SAVE") -GENERIC_ID_REGEX = compile(r"^(n|e|w|o):([^:]*):([a-f\d]{24})$", IGNORECASE) -NODE_ID_REGEX = compile(r"^n:([^:]*):([a-f\d]{24})$", IGNORECASE) -EDGE_ID_REGEX = compile(r"^e:([^:]*):([a-f\d]{24})$", IGNORECASE) -WALKER_ID_REGEX = compile(r"^w:([^:]*):([a-f\d]{24})$", IGNORECASE) -OBJECT_ID_REGEX = compile(r"^o:([^:]*):([a-f\d]{24})$", IGNORECASE) +JID_REGEX = compile(r"^(n|e|w|o):([^:]*):([a-f\d]{24})$", IGNORECASE) T = TypeVar("T") TBA = TypeVar("TBA", bound="BaseArchitype") +_ANCHOR = TypeVar("_ANCHOR", "NodeAnchor", "EdgeAnchor", "WalkerAnchor", "ObjectAnchor") +_C_ANCHOR = TypeVar( + "_C_ANCHOR", + "NodeAnchor", + "EdgeAnchor", + "WalkerAnchor", + "ObjectAnchor", + covariant=True, +) + def asdict_factory(data: Iterable[tuple]) -> dict[str, Any]: """Parse dataclass to dict.""" @@ -296,7 +304,7 @@ class Access(_Access): def serialize(self) -> dict[str, object]: """Serialize Access.""" return { - "anchors": {key: val.name for key, val in self.anchors.items()}, + "anchors": {str(key): val.name for key, val in self.anchors.items()}, } @classmethod @@ -304,7 +312,9 @@ def deserialize(cls, data: dict[str, Any]) -> "Access": """Deserialize Access.""" anchors = cast(dict[str, str], data.get("anchors")) return Access( - anchors={key: AccessLevel[val] for key, val in anchors.items()}, + anchors={ + JacCloudJID(key): AccessLevel[val] for key, val in anchors.items() + }, ) @@ -338,14 +348,59 @@ class AnchorState: connected: bool = False -@dataclass(eq=False, repr=False, kw_only=True) +@dataclass(eq=False, kw_only=True) +class JacCloudJID(JID[_C_ANCHOR]): + """Jaclang ID Implementation.""" + + def __init__( + self, + id: str | ObjectId | None = None, + type: Type[_C_ANCHOR] | None = None, + name: str = "", + ) -> None: + """Override JID initializer.""" + match id: + case str(): + if matched := JID_REGEX.search(id): + self.id = ObjectId(matched.group(3)) + self.name = matched.group(2) + # currently no way to base hinting on string regex! + match matched.group(1).lower(): + case "n": + self.type = NodeAnchor # type: ignore [assignment] + case "e": + self.type = EdgeAnchor # type: ignore [assignment] + case "w": + self.type = WalkerAnchor # type: ignore [assignment] + case _: + self.type = ObjectAnchor # type: ignore [assignment] + return + raise ValueError("Not a valid JID format!") + case ObjectId(): + self.id = id + case None: + self.id = ObjectId() + case _: + raise ValueError("Not a valid id for JID!") + + if type is None: + raise ValueError("Type is required from non string JID!") + self.type = type + self.name = name + + def __hash__(self) -> int: + """Return default hasher.""" + return hash(self.__cached_repr__) + + +@dataclass(eq=False, kw_only=True) class BaseAnchor: """Base Anchor.""" architype: "BaseArchitype" name: str = "" id: ObjectId = field(default_factory=ObjectId) - root: ObjectId | None = None + root: JacCloudJID["NodeAnchor"] | None = None access: Permission state: AnchorState @@ -354,33 +409,21 @@ class Collection(BaseCollection["BaseAnchor"]): pass - @property - def ref_id(self) -> str: - """Return id in reference type.""" - return f"{self.__class__.__name__[:1].lower()}:{self.name}:{self.id}" + @cached_property + def jid(self: _ANCHOR) -> JacCloudJID[_ANCHOR]: # type: ignore[misc] + """Get JID representation.""" + jid = JacCloudJID[_ANCHOR]( + self.id, + self.__class__, + ( + "" + if isinstance(self.architype, (GenericEdge, Root)) + else self.architype.__class__.__name__ + ), + ) + jid.anchor = self - @staticmethod - def ref(ref_id: str) -> "BaseAnchor | Anchor": - """Return ObjectAnchor instance if .""" - if match := GENERIC_ID_REGEX.search(ref_id): - cls: type[BaseAnchor] - - match match.group(1): - case "n": - cls = NodeAnchor - case "e": - cls = EdgeAnchor - case "w": - cls = WalkerAnchor - case "o": - cls = ObjectAnchor - case _: - raise ValueError(f"[{ref_id}] is not a valid reference!") - anchor = object.__new__(cls) - anchor.name = str(match.group(2)) - anchor.id = ObjectId(match.group(3)) - return anchor - raise ValueError(f"[{ref_id}] is not a valid reference!") + return jid #################################################### # QUERY OPERATIONS # @@ -452,33 +495,6 @@ def disconnect_edge(self, anchor: Anchor) -> None: # POPULATE OPERATIONS # #################################################### - def is_populated(self) -> bool: - """Check if populated.""" - return "architype" in self.__dict__ - - def make_stub(self: "BaseAnchor | TANCH") -> "BaseAnchor | TANCH": - """Return unsynced copy of anchor.""" - if self.is_populated(): - unloaded = object.__new__(self.__class__) - # this will be refactored on abstraction - unloaded.name = self.name # type: ignore[attr-defined] - unloaded.id = self.id # type: ignore[attr-defined] - return unloaded # type: ignore[return-value] - return self - - def populate(self) -> None: - """Retrieve the Architype from db and return.""" - from .context import JaseciContext - - jsrc = JaseciContext.get().mem - - if anchor := jsrc.find_by_id(self): - self.__dict__.update(anchor.__dict__) - else: - raise ValueError( - f"{self.__class__.__name__} [{self.ref_id}] is not a valid reference!" - ) - def build_query( self, bulk_write: BulkWrite, @@ -557,15 +573,15 @@ def update(self, bulk_write: BulkWrite, propagate: bool = False) -> None: ############################################################ # POPULATE ADDED EDGES # ############################################################ - added_edges: set[BaseAnchor | Anchor] = ( + added_edges: set[BaseAnchor] = ( changes.get("$addToSet", {}).get("edges", {}).get("$each", []) ) if added_edges: _added_edges = [] for anchor in added_edges: if propagate: - anchor.build_query(bulk_write) # type: ignore[operator] - _added_edges.append(anchor.ref_id) + anchor.build_query(bulk_write) + _added_edges.append(str(anchor.jid)) changes["$addToSet"]["edges"]["$each"] = _added_edges else: changes.pop("$addToSet", None) @@ -575,17 +591,16 @@ def update(self, bulk_write: BulkWrite, propagate: bool = False) -> None: ############################################################ # POPULATE REMOVED EDGES # ############################################################ - pulled_edges: set[BaseAnchor | Anchor] = ( + pulled_edges: set[BaseAnchor] = ( changes.get("$pull", {}).get("edges", {}).get("$in", []) ) if pulled_edges: _pulled_edges = [] for anchor in pulled_edges: - # will be refactored on abstraction - if propagate and anchor.state.deleted is not True: # type: ignore[attr-defined] - anchor.state.deleted = True # type: ignore[attr-defined] - bulk_write.del_edge(anchor.id) # type: ignore[attr-defined, arg-type] - _pulled_edges.append(anchor.ref_id) + if propagate and anchor.state.deleted is not True: + anchor.state.deleted = True + bulk_write.del_edge(anchor.id) + _pulled_edges.append(str(anchor.jid)) if added_edges: # Isolate pull to avoid conflict with addToSet @@ -618,26 +633,19 @@ def has_changed(self) -> int: def sync_hash(self) -> None: """Sync current serialization hash.""" - if is_dataclass(architype := self.architype) and not isinstance( - architype, type - ): - self.state.context_hashes = { - key: hash(val if isinstance(val, bytes) else dumps(val)) - for key, val in architype.__serialize__().items() # type:ignore[attr-defined] # mypy issue - } - self.state.full_hash = hash(pdumps(self.serialize())) + self.state.context_hashes = { + key: hash(val if isinstance(val, bytes) else dumps(val)) + for key, val in self.architype.__serialize__().items() + } + self.state.full_hash = hash(pdumps(self.serialize())) # ---------------------------------------------------------------------- # def report(self) -> dict[str, object]: """Report Anchor.""" return { - "id": self.ref_id, - "context": ( - self.architype.__serialize__() # type:ignore[attr-defined] # mypy issue - if is_dataclass(self.architype) and not isinstance(self.architype, type) - else {} - ), + "id": str(self.jid), + "context": self.architype.__serialize__(), } def serialize(self) -> dict[str, object]: @@ -645,35 +653,18 @@ def serialize(self) -> dict[str, object]: return { "_id": self.id, "name": self.name, - "root": self.root, + "root": str(self.root) if self.root else None, "access": self.access.serialize(), - "architype": ( - self.architype.__serialize__() # type:ignore[attr-defined] # mypy issue - if is_dataclass(self.architype) and not isinstance(self.architype, type) - else {} - ), + "architype": self.architype.__serialize__(), } - def __repr__(self) -> str: - """Override representation.""" - if self.is_populated(): - attrs = "" - for f in fields(self): - if f.name in self.__dict__: - attrs += f"{f.name}={self.__dict__[f.name]}, " - attrs = attrs[:-2] - else: - attrs = f"name={self.name}, id={self.id}" - - return f"{self.__class__.__name__}({attrs})" - -@dataclass(eq=False, repr=False, kw_only=True) -class NodeAnchor(BaseAnchor, _NodeAnchor): # type: ignore[misc] +@dataclass(eq=False, kw_only=True) +class NodeAnchor(BaseAnchor, _NodeAnchor): # type:ignore[misc] """Node Anchor.""" architype: "NodeArchitype" - edges: list["EdgeAnchor"] # type: ignore[assignment] + edges: list[JacCloudJID["EdgeAnchor"]] # type:ignore[assignment] class Collection(BaseCollection["NodeAnchor"]): """NodeAnchor collection interface.""" @@ -695,7 +686,14 @@ def __document__(cls, doc: Mapping[str, Any]) -> "NodeAnchor": anchor = NodeAnchor( architype=architype, id=doc.pop("_id"), - edges=[e for edge in doc.pop("edges") if (e := EdgeAnchor.ref(edge))], + root=( + JacCloudJID[NodeAnchor](root) if (root := doc.pop("root")) else None + ), + edges=[ + e + for edge in doc.pop("edges") + if (e := JacCloudJID[EdgeAnchor](edge)) + ], access=Permission.deserialize(doc.pop("access")), state=AnchorState(connected=True), persistent=True, @@ -705,30 +703,22 @@ def __document__(cls, doc: Mapping[str, Any]) -> "NodeAnchor": anchor.sync_hash() return anchor - @classmethod - def ref(cls, ref_id: str) -> "NodeAnchor": - """Return NodeAnchor instance if existing.""" - if match := NODE_ID_REGEX.search(ref_id): - anchor = object.__new__(cls) - anchor.name = str(match.group(1)) - anchor.id = ObjectId(match.group(2)) - return anchor - raise ValueError(f"[{ref_id}] is not a valid reference!") - def insert( self, bulk_write: BulkWrite, ) -> None: """Append Insert Query.""" - for edge in self.edges: - edge.build_query(bulk_write) + for jid in self.edges: + if anchor := jid.anchor: + anchor.build_query(bulk_write) bulk_write.operations[NodeAnchor].append(InsertOne(self.serialize())) def delete(self, bulk_write: BulkWrite) -> None: """Append Delete Query.""" - for edge in self.edges: - edge.delete(bulk_write) + for jid in self.edges: + if anchor := jid.anchor: + anchor.delete(bulk_write) pulled_edges: set[EdgeAnchor] = self._pull.get("edges", {}).get("$in", []) for edge in pulled_edges: @@ -740,18 +730,17 @@ def serialize(self) -> dict[str, object]: """Serialize Node Anchor.""" return { **super().serialize(), - "edges": [edge.ref_id for edge in self.edges], + "edges": [str(edge) for edge in self.edges], } -@dataclass(eq=False, repr=False, kw_only=True) +@dataclass(eq=False, kw_only=True) class EdgeAnchor(BaseAnchor, _EdgeAnchor): # type: ignore[misc] """Edge Anchor.""" architype: "EdgeArchitype" - source: NodeAnchor - target: NodeAnchor - is_undirected: bool + source: JacCloudJID[NodeAnchor] + target: JacCloudJID[NodeAnchor] class Collection(BaseCollection["EdgeAnchor"]): """EdgeAnchor collection interface.""" @@ -772,8 +761,11 @@ def __document__(cls, doc: Mapping[str, Any]) -> "EdgeAnchor": anchor = EdgeAnchor( architype=architype, id=doc.pop("_id"), - source=NodeAnchor.ref(doc.pop("source")), - target=NodeAnchor.ref(doc.pop("target")), + root=( + JacCloudJID[NodeAnchor](root) if (root := doc.pop("root")) else None + ), + source=JacCloudJID[NodeAnchor](doc.pop("source")), + target=JacCloudJID[NodeAnchor](doc.pop("target")), access=Permission.deserialize(doc.pop("access")), state=AnchorState(connected=True), persistent=True, @@ -783,32 +775,22 @@ def __document__(cls, doc: Mapping[str, Any]) -> "EdgeAnchor": anchor.sync_hash() return anchor - @classmethod - def ref(cls, ref_id: str) -> "EdgeAnchor": - """Return EdgeAnchor instance if existing.""" - if match := EDGE_ID_REGEX.search(ref_id): - anchor = object.__new__(cls) - anchor.name = str(match.group(1)) - anchor.id = ObjectId(match.group(2)) - return anchor - raise ValueError(f"{ref_id}] is not a valid reference!") - def insert(self, bulk_write: BulkWrite) -> None: """Append Insert Query.""" - if source := self.source: + if source := self.source.anchor: source.build_query(bulk_write) - if target := self.target: + if target := self.target.anchor: target.build_query(bulk_write) bulk_write.operations[EdgeAnchor].append(InsertOne(self.serialize())) def delete(self, bulk_write: BulkWrite) -> None: """Append Delete Query.""" - if source := self.source: + if source := self.source.anchor: source.build_query(bulk_write) - if target := self.target: + if target := self.target.anchor: target.build_query(bulk_write) bulk_write.del_edge(self.id) @@ -817,13 +799,13 @@ def serialize(self) -> dict[str, object]: """Serialize Node Anchor.""" return { **super().serialize(), - "source": self.source.ref_id if self.source else None, - "target": self.target.ref_id if self.target else None, + "source": str(self.source) if self.source else None, + "target": str(self.target) if self.target else None, "is_undirected": self.is_undirected, } -@dataclass(eq=False, repr=False, kw_only=True) +@dataclass(eq=False, kw_only=True) class WalkerAnchor(BaseAnchor, _WalkerAnchor): # type: ignore[misc] """Walker Anchor.""" @@ -832,7 +814,6 @@ class WalkerAnchor(BaseAnchor, _WalkerAnchor): # type: ignore[misc] next: list[Anchor] = field(default_factory=list) returns: list[Any] = field(default_factory=list) ignores: list[Anchor] = field(default_factory=list) - disengaged: bool = False class Collection(BaseCollection["WalkerAnchor"]): """WalkerAnchor collection interface.""" @@ -853,6 +834,9 @@ def __document__(cls, doc: Mapping[str, Any]) -> "WalkerAnchor": anchor = WalkerAnchor( architype=architype, id=doc.pop("_id"), + root=( + JacCloudJID[NodeAnchor](root) if (root := doc.pop("root")) else None + ), access=Permission.deserialize(doc.pop("access")), state=AnchorState(connected=True), persistent=True, @@ -862,16 +846,6 @@ def __document__(cls, doc: Mapping[str, Any]) -> "WalkerAnchor": anchor.sync_hash() return anchor - @classmethod - def ref(cls, ref_id: str) -> "WalkerAnchor": - """Return EdgeAnchor instance if existing.""" - if match := WALKER_ID_REGEX.search(ref_id): - anchor = object.__new__(cls) - anchor.name = str(match.group(1)) - anchor.id = ObjectId(match.group(2)) - return anchor - raise ValueError(f"{ref_id}] is not a valid reference!") - def insert( self, bulk_write: BulkWrite, @@ -884,7 +858,7 @@ def delete(self, bulk_write: BulkWrite) -> None: bulk_write.del_walker(self.id) -@dataclass(eq=False, repr=False, kw_only=True) +@dataclass(eq=False, kw_only=True) class ObjectAnchor(BaseAnchor, _ObjectAnchor): # type: ignore[misc] """Object Anchor.""" @@ -910,6 +884,9 @@ def __document__(cls, doc: Mapping[str, Any]) -> "ObjectAnchor": anchor = ObjectAnchor( architype=architype, id=doc.pop("_id"), + root=( + JacCloudJID[NodeAnchor](root) if (root := doc.pop("root")) else None + ), access=Permission.deserialize(doc.pop("access")), state=AnchorState(connected=True), persistent=True, @@ -919,16 +896,6 @@ def __document__(cls, doc: Mapping[str, Any]) -> "ObjectAnchor": anchor.sync_hash() return anchor - @classmethod - def ref(cls, ref_id: str) -> "ObjectAnchor": - """Return NodeAnchor instance if existing.""" - if match := NODE_ID_REGEX.search(ref_id): - anchor = object.__new__(cls) - anchor.name = str(match.group(1)) - anchor.id = ObjectId(match.group(2)) - return anchor - raise ValueError(f"[{ref_id}] is not a valid reference!") - def insert( self, bulk_write: BulkWrite, @@ -947,7 +914,7 @@ class BaseArchitype: __jac_classes__: dict[str, type["BaseArchitype"]] __jac_hintings__: dict[str, type] - __jac__: Anchor + __jac__: BaseAnchor def __serialize__(self) -> dict[str, Any]: """Process default serialization.""" diff --git a/jac-cloud/jac_cloud/core/context.py b/jac-cloud/jac_cloud/core/context.py index 803254265..025033b98 100644 --- a/jac-cloud/jac_cloud/core/context.py +++ b/jac-cloud/jac_cloud/core/context.py @@ -16,6 +16,7 @@ Anchor, AnchorState, BaseArchitype, + JacCloudJID, NodeAnchor, Permission, Root, @@ -29,8 +30,8 @@ SUPER_ROOT_ID = ObjectId("000000000000000000000000") PUBLIC_ROOT_ID = ObjectId("000000000000000000000001") -SUPER_ROOT = NodeAnchor.ref(f"n::{SUPER_ROOT_ID}") -PUBLIC_ROOT = NodeAnchor.ref(f"n::{PUBLIC_ROOT_ID}") +SUPER_ROOT_JID = JacCloudJID[NodeAnchor](f"n::{SUPER_ROOT_ID}") +PUBLIC_ROOT_JID = JacCloudJID[NodeAnchor](f"n::{PUBLIC_ROOT_ID}") RT = TypeVar("RT") @@ -69,7 +70,7 @@ def close(self) -> None: self.mem.close() @staticmethod - def create(request: Request, entry: NodeAnchor | None = None) -> "JaseciContext": # type: ignore[override] + def create(request: Request, entry: str | None = None) -> "JaseciContext": # type: ignore[override] """Create JacContext.""" ctx = JaseciContext() ctx.base = ExecutionContext.get() @@ -78,7 +79,9 @@ def create(request: Request, entry: NodeAnchor | None = None) -> "JaseciContext" ctx.reports = [] ctx.status = 200 - if not isinstance(system_root := ctx.mem.find_by_id(SUPER_ROOT), NodeAnchor): + if not isinstance( + system_root := ctx.mem.find_by_id(SUPER_ROOT_JID), NodeAnchor + ): system_root = NodeAnchor( architype=object.__new__(Root), id=SUPER_ROOT_ID, @@ -90,7 +93,7 @@ def create(request: Request, entry: NodeAnchor | None = None) -> "JaseciContext" system_root.architype.__jac__ = system_root NodeAnchor.Collection.insert_one(system_root.serialize()) system_root.sync_hash() - ctx.mem.set(system_root.id, system_root) + ctx.mem.set(system_root.jid, system_root) ctx.system_root = system_root @@ -99,7 +102,7 @@ def create(request: Request, entry: NodeAnchor | None = None) -> "JaseciContext" ctx.mem.set(_root.id, _root) else: if not isinstance( - public_root := ctx.mem.find_by_id(PUBLIC_ROOT), NodeAnchor + public_root := ctx.mem.find_by_id(PUBLIC_ROOT_JID), NodeAnchor ): public_root = NodeAnchor( architype=object.__new__(Root), @@ -110,13 +113,13 @@ def create(request: Request, entry: NodeAnchor | None = None) -> "JaseciContext" edges=[], ) public_root.architype.__jac__ = public_root - ctx.mem.set(public_root.id, public_root) + ctx.mem.set(public_root.jid, public_root) ctx.root = public_root if entry: - if not isinstance(entry_node := ctx.mem.find_by_id(entry), NodeAnchor): - raise ValueError(f"Invalid anchor id {entry.ref_id} !") + if not (entry_node := ctx.mem.find_by_id(JacCloudJID[NodeAnchor](entry))): + raise ValueError(f"Invalid anchor id {entry} !") ctx.entry_node = entry_node else: ctx.entry_node = ctx.root diff --git a/jac-cloud/jac_cloud/core/memory.py b/jac-cloud/jac_cloud/core/memory.py index 84cac7c2c..14dbd9ea5 100644 --- a/jac-cloud/jac_cloud/core/memory.py +++ b/jac-cloud/jac_cloud/core/memory.py @@ -1,8 +1,8 @@ """Memory abstraction for jaseci plugin.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from os import getenv -from typing import Callable, Generator, Iterable, TypeVar, cast +from typing import Callable, Generator, Iterable, TypeVar from bson import ObjectId @@ -14,10 +14,9 @@ from pymongo.client_session import ClientSession from .architype import ( - Anchor, - BaseAnchor, BulkWrite, EdgeAnchor, + JacCloudJID, NodeAnchor, ObjectAnchor, Root, @@ -27,78 +26,90 @@ DISABLE_AUTO_CLEANUP = getenv("DISABLE_AUTO_CLEANUP") == "true" SINGLE_QUERY = getenv("SINGLE_QUERY") == "true" -IDS = ObjectId | Iterable[ObjectId] -BA = TypeVar("BA", bound="BaseAnchor") + +_ANCHOR = TypeVar("_ANCHOR", NodeAnchor, EdgeAnchor, WalkerAnchor, ObjectAnchor) @dataclass -class MongoDB(Memory[ObjectId, BaseAnchor | Anchor]): +class MongoDB(Memory): """Shelf Handler.""" + __mem__: dict[ + JacCloudJID, NodeAnchor | EdgeAnchor | WalkerAnchor | ObjectAnchor + ] = field( + default_factory=dict + ) # type: ignore[assignment] __session__: ClientSession | None = None - def populate_data(self, edges: Iterable[EdgeAnchor]) -> None: + def populate_data(self, edges: Iterable[JacCloudJID[EdgeAnchor]]) -> None: """Populate data to avoid multiple query.""" if not SINGLE_QUERY: - nodes: set[NodeAnchor] = set() + nodes: set[JacCloudJID] = set() for edge in self.find(edges): - if edge.source: - nodes.add(edge.source) - if edge.target: - nodes.add(edge.target) + nodes.add(edge.source) + nodes.add(edge.target) self.find(nodes) def find( # type: ignore[override] self, - anchors: BA | Iterable[BA], - filter: Callable[[Anchor], Anchor] | None = None, + ids: JacCloudJID[_ANCHOR] | Iterable[JacCloudJID[_ANCHOR]], + filter: Callable[[_ANCHOR], _ANCHOR] | None = None, session: ClientSession | None = None, - ) -> Generator[BA, None, None]: + ) -> Generator[_ANCHOR, None, None]: """Find anchors from datasource by ids with filter.""" - if not isinstance(anchors, Iterable): - anchors = [anchors] - - collections: dict[type[Collection[BaseAnchor]], list[ObjectId]] = {} - for anchor in anchors: - if anchor.id not in self.__mem__ and anchor not in self.__gc__: - coll = collections.get(anchor.Collection) + if not isinstance(ids, Iterable): + ids = [ids] + + collections: dict[ + type[ + Collection[NodeAnchor] + | Collection[EdgeAnchor] + | Collection[WalkerAnchor] + | Collection[ObjectAnchor] + ], + list[ObjectId], + ] = {} + for jid in ids: + if jid not in self.__mem__ and jid not in self.__gc__: + coll = collections.get(jid.type.Collection) if coll is None: - coll = collections[anchor.Collection] = [] + coll = collections[jid.type.Collection] = [] - coll.append(anchor.id) + coll.append(jid.id) - for cl, ids in collections.items(): + for cl, oids in collections.items(): for anch_db in cl.find( { - "_id": {"$in": ids}, + "_id": {"$in": oids}, }, session=session or self.__session__, ): - self.__mem__[anch_db.id] = anch_db + self.__mem__[anch_db.jid] = anch_db - for anchor in anchors: + for jid in ids: if ( - anchor not in self.__gc__ - and (anch_mem := self.__mem__.get(anchor.id)) - and (not filter or filter(anch_mem)) # type: ignore[arg-type] + jid not in self.__gc__ + and (anch_mem := self.__mem__.get(jid)) + and isinstance(anch_mem, jid.type) + and (not filter or filter(anch_mem)) ): - yield cast(BA, anch_mem) + yield anch_mem def find_one( # type: ignore[override] self, - anchors: BA | Iterable[BA], - filter: Callable[[Anchor], Anchor] | None = None, + ids: JacCloudJID[_ANCHOR] | Iterable[JacCloudJID[_ANCHOR]], + filter: Callable[[_ANCHOR], _ANCHOR] | None = None, session: ClientSession | None = None, - ) -> BA | None: + ) -> _ANCHOR | None: """Find one anchor from memory by ids with filter.""" - return next(self.find(anchors, filter, session), None) + return next(self.find(ids, filter, session), None) - def find_by_id(self, anchor: BA) -> BA | None: + def find_by_id(self, id: JacCloudJID[_ANCHOR]) -> _ANCHOR | None: # type: ignore[override] """Find one by id.""" - data = super().find_by_id(anchor.id) + data = super().find_by_id(id) - if not data and (data := anchor.Collection.find_by_id(anchor.id)): - self.__mem__[data.id] = data + if not data and (data := id.type.Collection.find_by_id(id.id)): + self.__mem__[data.jid] = data return data @@ -115,7 +126,9 @@ def close(self) -> None: super().close() - def sync_mem_to_db(self, bulk_write: BulkWrite, keys: Iterable[ObjectId]) -> None: + def sync_mem_to_db( + self, bulk_write: BulkWrite, keys: Iterable[JacCloudJID] + ) -> None: """Manually sync memory to db.""" for key in keys: if ( diff --git a/jac-cloud/jac_cloud/plugin/jaseci.py b/jac-cloud/jac_cloud/plugin/jaseci.py index 2b58283dc..58ee9dfc8 100644 --- a/jac-cloud/jac_cloud/plugin/jaseci.py +++ b/jac-cloud/jac_cloud/plugin/jaseci.py @@ -1,7 +1,6 @@ """Jac Language Features.""" from collections import OrderedDict -from contextlib import suppress from dataclasses import Field, MISSING, fields, is_dataclass from functools import wraps from os import getenv @@ -45,8 +44,11 @@ EdgeAnchor, EdgeArchitype, GenericEdge, + JID, + JacCloudJID, NodeAnchor, NodeArchitype, + ObjectAnchor, ObjectArchitype, Permission, Root, @@ -195,7 +197,7 @@ def api_entry( except ValidationError as e: return ORJSONResponse({"detail": e.errors()}) - jctx = JaseciContext.create(request, NodeAnchor.ref(node) if node else None) + jctx = JaseciContext.create(request, node) wlk: WalkerAnchor = cls(**body, **pl["query"], **pl["files"]).__jac__ if Jac.check_read_access(jctx.entry_node): @@ -211,7 +213,7 @@ def api_entry( return ORJSONResponse(resp, jctx.status) else: error = { - "error": f"You don't have access on target entry{cast(Anchor, jctx.entry_node).ref_id}!" + "error": f"You don't have access on target entry {jctx.entry_node.jid}!" } jctx.close() @@ -312,14 +314,16 @@ class JacCallableImplementation: """Callable Implementations.""" @staticmethod - def get_object(id: str) -> Architype | None: + def get_object(id: str | JacCloudJID) -> Architype | None: """Get object by id.""" if not FastAPI.is_enabled(): return _JacCallableImplementation.get_object(id=id) - with suppress(ValueError): - if isinstance(architype := BaseAnchor.ref(id).architype, Architype): - return architype + if isinstance(id, str): + id = JacCloudJID(id) + + if anchor := id.anchor: + return anchor.architype return None @@ -330,32 +334,28 @@ class JacAccessValidationPlugin: @staticmethod @hookimpl def allow_root( - architype: Architype, root_id: BaseAnchor, level: AccessLevel | int | str + architype: Architype, root_id: JacCloudJID, level: AccessLevel | int | str ) -> None: """Allow all access from target root graph to current Architype.""" if not FastAPI.is_enabled(): - JacFeatureImpl.allow_root( - architype=architype, root_id=root_id, level=level # type: ignore[arg-type] - ) + JacFeatureImpl.allow_root(architype=architype, root_id=root_id, level=level) return anchor = architype.__jac__ level = AccessLevel.cast(level) access = anchor.access.roots - if ( - isinstance(anchor, BaseAnchor) - and (ref_id := root_id.ref_id) - and level != access.anchors.get(ref_id, AccessLevel.NO_ACCESS) + if isinstance(anchor, BaseAnchor) and level != access.anchors.get( + root_id, AccessLevel.NO_ACCESS ): - access.anchors[ref_id] = level - anchor._set.update({f"access.roots.anchors.{ref_id}": level.name}) - anchor._unset.pop(f"access.roots.anchors.{ref_id}", None) + access.anchors[root_id] = level + anchor._set.update({f"access.roots.anchors.{root_id}": level.name}) + anchor._unset.pop(f"access.roots.anchors.{root_id}", None) @staticmethod @hookimpl def disallow_root( - architype: Architype, root_id: BaseAnchor, level: AccessLevel | int | str + architype: Architype, root_id: JacCloudJID, level: AccessLevel | int | str ) -> None: """Disallow all access from target root graph to current Architype.""" if not FastAPI.is_enabled(): @@ -370,11 +370,10 @@ def disallow_root( access = anchor.access.roots if ( isinstance(anchor, BaseAnchor) - and (ref_id := root_id.ref_id) - and access.anchors.pop(ref_id, None) is not None + and access.anchors.pop(root_id, None) is not None ): - anchor._unset.update({f"access.roots.anchors.{ref_id}": True}) - anchor._set.pop(f"access.roots.anchors.{ref_id}", None) + anchor._unset.update({f"access.roots.anchors.{root_id}": True}) + anchor._set.pop(f"access.roots.anchors.{root_id}", None) @staticmethod @hookimpl @@ -424,7 +423,7 @@ def check_access_level(to: Anchor) -> AccessLevel: # if current root is system_root # if current root id is equal to target anchor's root id # if current root is the target anchor - if jroot == jctx.system_root or jroot.id == to.root or jroot == to: + if jroot == jctx.system_root or jroot.jid == to.root or jroot == to: return AccessLevel.WRITE access_level = AccessLevel.NO_ACCESS @@ -435,19 +434,17 @@ def check_access_level(to: Anchor) -> AccessLevel: # if target anchor's root have set allowed roots # if current root is allowed to the whole graph of target anchor's root - if to.root and isinstance( - to_root := jctx.mem.find_by_id(NodeAnchor.ref(f"n::{to.root}")), Anchor - ): + if to.root and (to_root := to.root.anchor): if to_root.access.all > access_level: access_level = to_root.access.all - level = to_root.access.roots.check(jroot.ref_id) + level = to_root.access.roots.check(jroot.jid) if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: access_level = level # if target anchor have set allowed roots # if current root is allowed to target anchor - level = to_access.roots.check(jroot.ref_id) + level = to_access.roots.check(jroot.jid) if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: access_level = level @@ -501,11 +498,13 @@ def detach(edge: EdgeAnchor) -> None: JacFeatureImpl.detach(edge=edge) return - Jac.remove_edge(node=edge.source, edge=edge) - edge.source.disconnect_edge(edge) + if source := edge.source.anchor: + Jac.remove_edge(node=source, edge=edge) + source.disconnect_edge(edge) - Jac.remove_edge(node=edge.target, edge=edge) - edge.target.disconnect_edge(edge) + if target := edge.target.anchor: + Jac.remove_edge(node=target, edge=edge) + target.disconnect_edge(edge) class JacPlugin(JacAccessValidationPlugin, JacNodePlugin, JacEdgePlugin): @@ -529,26 +528,31 @@ def reset_graph(root: Root | None = None) -> int: ctx = JaseciContext.get() ranchor = root.__jac__ if root else ctx.root - + ranchor_jid = str(ranchor.jid) deleted_count = 0 # noqa: SIM113 for node in NodeAnchor.Collection.find( - {"_id": {"$ne": ranchor.id}, "root": ranchor.id} + {"_id": {"$ne": ranchor.id}, "root": ranchor_jid} ): - ctx.mem.__mem__[node.id] = node + ctx.mem.__mem__[node.jid] = node Jac.destroy(node) deleted_count += 1 - for edge in EdgeAnchor.Collection.find({"root": ranchor.id}): - ctx.mem.__mem__[edge.id] = edge + for edge in EdgeAnchor.Collection.find({"root": ranchor_jid}): + ctx.mem.__mem__[edge.jid] = edge Jac.destroy(edge) deleted_count += 1 - for walker in WalkerAnchor.Collection.find({"root": ranchor.id}): - ctx.mem.__mem__[walker.id] = walker + for walker in WalkerAnchor.Collection.find({"root": ranchor_jid}): + ctx.mem.__mem__[walker.jid] = walker Jac.destroy(walker) deleted_count += 1 + for obj in ObjectAnchor.Collection.find({"root": ranchor_jid}): + ctx.mem.__mem__[obj.jid] = obj + Jac.destroy(obj) + deleted_count += 1 + return deleted_count @staticmethod @@ -721,14 +725,14 @@ def builder(source: NodeAnchor, target: NodeAnchor) -> EdgeArchitype: eanch = edge.__jac__ = EdgeAnchor( architype=edge, name=("" if isinstance(edge, GenericEdge) else edge.__class__.__name__), - source=source, - target=target, + source=source.jid, + target=target.jid, is_undirected=is_undirected, access=Permission(), state=AnchorState(), ) - source.edges.append(eanch) - target.edges.append(eanch) + source.edges.append(eanch.jid) + target.edges.append(eanch.jid) source.connect_edge(eanch) target.connect_edge(eanch) @@ -754,12 +758,12 @@ def get_object_func() -> Callable[[str], Architype | None]: @staticmethod @hookimpl - def object_ref(obj: Architype) -> str: + def object_ref(obj: Architype) -> JID: """Get object reference id.""" if not FastAPI.is_enabled(): return JacFeatureImpl.object_ref(obj=obj) - return str(obj.__jac__.ref_id) + return obj.__jac__.jid @staticmethod @hookimpl @@ -775,8 +779,10 @@ def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: walker = op1.__jac__ if isinstance(op2, NodeArchitype): node = op2.__jac__ - elif isinstance(op2, EdgeArchitype): - node = op2.__jac__.target + elif isinstance(op2, EdgeArchitype) and ( + target := op2.__jac__.target.anchor + ): + node = target else: raise TypeError("Invalid target object") elif isinstance(op2, WalkerArchitype): @@ -784,8 +790,10 @@ def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: walker = op2.__jac__ if isinstance(op1, NodeArchitype): node = op1.__jac__ - elif isinstance(op1, EdgeArchitype): - node = op1.__jac__.target + elif isinstance(op1, EdgeArchitype) and ( + target := op1.__jac__.target.anchor + ): + node = target else: raise TypeError("Invalid target object") else: @@ -874,10 +882,11 @@ def destroy(obj: Architype | Anchor | BaseAnchor) -> None: match anchor: case NodeAnchor(): for edge in anchor.edges: - Jac.destroy(edge) + if eanch := edge.anchor: + Jac.destroy(eanch) case EdgeAnchor(): Jac.detach(anchor) case _: pass - Jac.get_context().mem.remove(anchor.id) + Jac.get_context().mem.remove(anchor.jid) diff --git a/jac/jaclang/runtimelib/architype.py b/jac/jaclang/runtimelib/architype.py index 222403f92..e04b8748a 100644 --- a/jac/jaclang/runtimelib/architype.py +++ b/jac/jaclang/runtimelib/architype.py @@ -20,14 +20,15 @@ IGNORECASE, ) _ANCHOR = TypeVar("_ANCHOR", bound="Anchor") +_C_ANCHOR = TypeVar("_C_ANCHOR", bound="Anchor", covariant=True) @dataclass(kw_only=True) -class JID(Generic[_ANCHOR]): +class JID(Generic[_C_ANCHOR]): """Jaclang ID Interface.""" id: Any - type: Type[_ANCHOR] + type: Type[_C_ANCHOR] name: str @cached_property @@ -36,7 +37,7 @@ def __cached_repr__(self) -> str: return f"{self.type.__name__[:1].lower()}:{self.name}:{self.id}" @cached_property - def anchor(self) -> _ANCHOR | None: + def anchor(self) -> _C_ANCHOR | None: """Get architype.""" from jaclang.plugin.feature import JacFeature @@ -182,7 +183,7 @@ def jid(self: _ANCHOR) -> JID[_ANCHOR]: return jid - def report(self) -> AnchorReport: + def report(self) -> Any: # noqa: ANN401 """Report Anchor.""" return AnchorReport( id=self.jid,