Skip to content

Commit

Permalink
Fix SA2.0 usage in managers.libraries (2); refactor
Browse files Browse the repository at this point in the history
Factor out SA statement building logic into 2 separate helpers (I think
the filtering criteria for admin and nonadmin users is sufficiently
different, so 2 methods make for cleaner code here).
  • Loading branch information
jdavcs committed Oct 16, 2023
1 parent 3ca32d9 commit 7ec1143
Showing 1 changed file with 57 additions and 40 deletions.
97 changes: 57 additions & 40 deletions lib/galaxy/managers/libraries.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,61 +145,50 @@ def list(self, trans, deleted: Optional[bool] = False) -> Tuple[Query, Dict[str,
:param deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted``
:type deleted: boolean (optional)
:returns: query that will emit all accessible libraries
:rtype: sqlalchemy query
:returns: iterable that will emit all accessible libraries
:rtype: sqlalchemy ScalarResult
:returns: dict of 3 sets with available actions for user's accessible
libraries and a set of ids of all public libraries. These are
used for limiting the number of queries when dictifying the
libraries later on.
:rtype: dict
"""
is_admin = trans.user_is_admin
query = trans.sa_session.query(trans.app.model.Library)
library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action
restricted_library_ids = {id for id in get_library_ids(trans.sa_session, library_access_action)}
prefetched_ids = {"restricted_library_ids": restricted_library_ids}

if is_admin:
if deleted is None:
# Flag is not specified, do not filter on it.
pass
elif deleted:
query = query.filter(trans.app.model.Library.table.c.deleted == true())
else:
query = query.filter(trans.app.model.Library.table.c.deleted == false())
libraries = get_libraries_for_admins(trans.sa_session, deleted=deleted)
else:
# Nonadmins can't see deleted libraries
if deleted:
raise exceptions.AdminRequiredException()
else:
query = query.filter(trans.app.model.Library.table.c.deleted == false())
current_user_role_ids = [role.id for role in trans.get_current_user_roles()]
library_add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD.action
library_modify_action = trans.app.security_agent.permitted_actions.LIBRARY_MODIFY.action
library_manage_action = trans.app.security_agent.permitted_actions.LIBRARY_MANAGE.action
accessible_restricted_library_ids = set()
allowed_library_add_ids = set()
allowed_library_modify_ids = set()
allowed_library_manage_ids = set()
for action in get_library_permissions_by_role(trans.sa_session, current_user_role_ids):
if action.action == library_access_action:
accessible_restricted_library_ids.add(action.library_id)
if action.action == library_add_action:
allowed_library_add_ids.add(action.library_id)
if action.action == library_modify_action:
allowed_library_modify_ids.add(action.library_id)
if action.action == library_manage_action:
allowed_library_manage_ids.add(action.library_id)
query = query.filter(
or_(
not_(trans.model.Library.table.c.id.in_(restricted_library_ids)),
trans.model.Library.table.c.id.in_(accessible_restricted_library_ids),
)
)
prefetched_ids["allowed_library_add_ids"] = allowed_library_add_ids
prefetched_ids["allowed_library_modify_ids"] = allowed_library_modify_ids
prefetched_ids["allowed_library_manage_ids"] = allowed_library_manage_ids
query = query.order_by(asc(func.lower(Library.name)))
return query, prefetched_ids
current_user_role_ids = [role.id for role in trans.get_current_user_roles()]
library_add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD.action
library_modify_action = trans.app.security_agent.permitted_actions.LIBRARY_MODIFY.action
library_manage_action = trans.app.security_agent.permitted_actions.LIBRARY_MANAGE.action
accessible_restricted_library_ids = set()
allowed_library_add_ids = set()
allowed_library_modify_ids = set()
allowed_library_manage_ids = set()
for action in get_library_permissions_by_role(trans.sa_session, current_user_role_ids):
if action.action == library_access_action:
accessible_restricted_library_ids.add(action.library_id)
if action.action == library_add_action:
allowed_library_add_ids.add(action.library_id)
if action.action == library_modify_action:
allowed_library_modify_ids.add(action.library_id)
if action.action == library_manage_action:
allowed_library_manage_ids.add(action.library_id)
prefetched_ids["allowed_library_add_ids"] = allowed_library_add_ids
prefetched_ids["allowed_library_modify_ids"] = allowed_library_modify_ids
prefetched_ids["allowed_library_manage_ids"] = allowed_library_manage_ids
libraries = get_libraries_for_nonadmins(
trans.sa_session, restricted_library_ids, accessible_restricted_library_ids
)

return libraries, prefetched_ids

def secure(self, trans, library: Library, check_accessible: bool = True) -> Library:
"""
Expand Down Expand Up @@ -391,3 +380,31 @@ def get_library_ids(session, library_access_action):
def get_library_permissions_by_role(session, role_ids):
stmt = select(LibraryPermissions).where(LibraryPermissions.role_id.in_(role_ids))
return session.scalars(stmt)


def get_libraries_for_admins(session, deleted):
stmt = select(Library)
if deleted is None:
# Flag is not specified, do not filter on it.
pass
elif deleted:
stmt = stmt.where(Library.deleted == true())
else:
stmt = stmt.where(Library.deleted == false())
stmt = stmt.order_by(asc(func.lower(Library.name)))
return session.scalars(stmt)


def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids):
stmt = (
select(Library)
.where(Library.deleted == false())
.where(
or_(
not_(Library.id.in_(restricted_library_ids)),
Library.id.in_(accessible_restricted_library_ids),
)
)
)
stmt = stmt.order_by(asc(func.lower(Library.name)))
return session.scalars(stmt)

0 comments on commit 7ec1143

Please sign in to comment.