Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
[PERMISSION]: Restructure from tuple to dict
Browse files Browse the repository at this point in the history
OLD:
tuple( read_set, connect_set, write_set )

NEW:
dict( ref_id: access_level )
  • Loading branch information
amadolid committed Jul 15, 2024
1 parent e52f01b commit 0e8e210
Showing 1 changed file with 23 additions and 34 deletions.
57 changes: 23 additions & 34 deletions jaclang/core/architype.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
Any,
Callable,
ClassVar,
Generic,
Iterable,
Optional,
TypeVar,
cast,
)
from uuid import UUID, uuid4

from jaclang.compiler.constant import EdgeDir, T
from jaclang.compiler.constant import EdgeDir
from jaclang.core.utils import collect_node_connections
from jaclang.vendor.orjson import dumps

Expand Down Expand Up @@ -53,68 +52,58 @@ class AnchorType(Enum):


@dataclass
class Access(Generic[T]):
class Access:
"""Access Structure."""

whitelist: bool = True # whitelist or blacklist
level: tuple[set[T], set[T], set[T]] = field(
default_factory=lambda: (set(), set(), set())
) # index 0 == read access, 1 == write access
anchors: dict[Anchor, int] = field(default_factory=dict)

def check(
self, id: T
self, anchor: Anchor
) -> tuple[bool, int]: # whitelist or blacklist, has_read_access, level
"""Validate access."""
if self.whitelist:
for i in range(2, -1, -1):
if id in self.level[i]:
return self.whitelist, i
return self.whitelist, -1
return self.whitelist, self.anchors.get(anchor, -1)
else:
access = -1
for i in range(0, 3, 1):
if id in self.level[i]:
break
access = i
return self.whitelist, access
return self.whitelist, self.anchors.get(anchor, 2)

def serialize(self) -> dict[str, object]:
"""Serialize Access."""
return {
"whitelist": self.whitelist,
"level": [list(level) for level in self.level],
"anchors": {key.ref_id: val for key, val in self.anchors.items()},
}

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Access:
"""Deserialize Access."""
level = cast(list[list[T]], data.get("level"))
anchors = cast(dict[str, int], data.get("anchors"))
return Access(
whitelist=bool(data.get("whitelist")),
level=(
(set(level[0]), set(level[1]), set(level[2]))
if level
else (set(), set(), set())
),
anchors={
anchor: val
for key, val in anchors.items()
if (anchor := Anchor.ref(key))
},
)


@dataclass
class Permission(Generic[T]):
class Permission:
"""Anchor Access Handler."""

all: int = -1
roots: Access[T] = field(default_factory=Access[T])
# types: dict[type[Architype], Access[T]] = field(default_factory=dict)
# nodes: Access[T] = field(default_factory=Access[T])
roots: Access = field(default_factory=Access)
# types: dict[type[Architype], Access] = field(default_factory=dict)
# nodes: Access = field(default_factory=Access)

def serialize(self) -> dict[str, object]:
"""Serialize Permission."""
return {
"all": self.all,
"roots": self.roots.serialize(),
# "types": {
# f"{'n' if isinstance(key, NodeArchitype) else 'e'}:{key.__name__}": value.serialize()
# f"{'n' if issubclass(key, NodeArchitype) else 'e'}:{key.__name__}": value.serialize()
# for key, value in self.types.items()
# },
# "nodes": self.nodes.serialize(),
Expand Down Expand Up @@ -147,7 +136,7 @@ class Anchor:
name: str = ""
id: UUID = field(default_factory=uuid4)
root: Optional[UUID] = None
access: Permission[UUID] = field(default_factory=Permission[UUID])
access: Permission = field(default_factory=Permission)
architype: Optional[Architype] = None
connected: bool = False
current_access_level: int = -1
Expand Down Expand Up @@ -248,7 +237,7 @@ def access_level(self, to: Anchor) -> int:
if (to_access := to.access).all > -1:
to.current_access_level = to_access.all

# whitelist, level = to_access.nodes.check(self.id)
# whitelist, level = to_access.nodes.check(self)
# if not whitelist and level < 0:
# to.current_access_level = -1
# return to.current_access_level
Expand All @@ -258,22 +247,22 @@ def access_level(self, to: Anchor) -> int:
# if (architype := self.architype) and (
# access_type := to_access.types.get(architype.__class__)
# ):
# whitelist, level = access_type.check(self.id)
# whitelist, level = access_type.check(self)
# if not whitelist and level < 0:
# to.current_access_level = -1
# return to.current_access_level
# elif whitelist and level > -1 and to.current_access_level == -1:
# to.current_access_level = level

whitelist, level = to_access.roots.check(jroot.id)
whitelist, level = to_access.roots.check(jroot)
if not whitelist and level < 0:
to.current_access_level = -1
return to.current_access_level
elif whitelist and level > -1 and to.current_access_level == -1:
to.current_access_level = level

if to.root and (to_root := jctx.datasource.find_one(to.root)):
whitelist, level = to_root.access.roots.check(jroot.id)
whitelist, level = to_root.access.roots.check(jroot)
if not whitelist and level < 0:
to.current_access_level = -1
return to.current_access_level
Expand Down

0 comments on commit 0e8e210

Please sign in to comment.