From 79604d2b8234cb9d93640618cbc66afd45d3fb5e Mon Sep 17 00:00:00 2001 From: Shadi Naif Date: Tue, 19 Nov 2024 09:49:19 +0300 Subject: [PATCH] feat: custom roles and view-user mapping --- futurex_openedx_extensions/helpers/admin.py | 172 +++++++++++++++++- futurex_openedx_extensions/helpers/apps.py | 2 + .../helpers/constants.py | 21 ++- .../helpers/custom_roles.py | 75 ++++++++ .../helpers/exceptions.py | 2 + .../migrations/0005_view_user_mapping.py | 53 ++++++ futurex_openedx_extensions/helpers/models.py | 123 ++++++++++++- .../helpers/permissions.py | 4 +- futurex_openedx_extensions/helpers/roles.py | 25 ++- .../common/djangoapps/student/admin.py | 2 + .../common/djangoapps/student/roles.py | 2 + .../edx_platform_mocks/fake_models/classes.py | 18 ++ .../edx_platform_mocks/fake_models/models.py | 11 ++ tests/test_helpers/test_admin.py | 114 +++++++++++- tests/test_helpers/test_custom_roles.py | 58 ++++++ tests/test_helpers/test_models.py | 92 +++++++++- tests/test_helpers/test_permissions.py | 10 +- tests/test_helpers/test_roles.py | 32 ++++ 18 files changed, 801 insertions(+), 15 deletions(-) create mode 100644 futurex_openedx_extensions/helpers/custom_roles.py create mode 100644 futurex_openedx_extensions/helpers/migrations/0005_view_user_mapping.py create mode 100644 test_utils/edx_platform_mocks/common/djangoapps/student/admin.py create mode 100644 test_utils/edx_platform_mocks/common/djangoapps/student/roles.py create mode 100644 test_utils/edx_platform_mocks/fake_models/classes.py create mode 100644 tests/test_helpers/test_custom_roles.py diff --git a/futurex_openedx_extensions/helpers/admin.py b/futurex_openedx_extensions/helpers/admin.py index e08391a9..c558db45 100644 --- a/futurex_openedx_extensions/helpers/admin.py +++ b/futurex_openedx_extensions/helpers/admin.py @@ -1,19 +1,63 @@ """Django admin view for the models.""" from __future__ import annotations -from typing import Any +from typing import Any, List, Tuple import yaml # type: ignore +from common.djangoapps.student.admin import CourseAccessRoleForm +from django import forms from django.contrib import admin +from django.contrib.admin import SimpleListFilter from django.core.cache import cache from django.http import Http404, HttpResponseRedirect from django.urls import path from django.utils import timezone +from django_mysql.models import QuerySet +from openedx.core.lib.api.authentication import BearerAuthentication from rest_framework.response import Response from simple_history.admin import SimpleHistoryAdmin from futurex_openedx_extensions.helpers.constants import CACHE_NAMES -from futurex_openedx_extensions.helpers.models import ClickhouseQuery, DataExportTask, ViewAllowedRoles +from futurex_openedx_extensions.helpers.models import ClickhouseQuery, DataExportTask, ViewAllowedRoles, ViewUserMapping +from futurex_openedx_extensions.helpers.roles import get_fx_view_with_roles + + +class YesNoFilter(SimpleListFilter): + """Filter for the Yes/No fields.""" + title = 'Yes / No' + parameter_name = 'no_yet_set_must_be_replaced' + + def lookups(self, request: Any, model_admin: Any) -> List[Tuple[str, str]]: + """ + Define filter options. + + :param request: The request object + :type request: Request + :param model_admin: The model admin object + :type model_admin: Any + :return: List of filter options + """ + return [ + ('yes', 'Yes'), + ('no', 'No'), + ] + + def queryset(self, request: Any, queryset: QuerySet) -> QuerySet: + """ + Filter the queryset based on the selected option. + + :param request: The request object + :type request: Request + :param queryset: The queryset to filter + :type queryset: QuerySet + :return: The filtered queryset + """ + filter_params = {self.parameter_name: self.value() == 'yes'} + + if self.value() in ('yes', 'no'): + return queryset.filter(**filter_params) + + return queryset class ClickhouseQueryAdmin(SimpleHistoryAdmin): @@ -57,9 +101,132 @@ def load_missing_queries(self, request: Any) -> HttpResponseRedirect: # pylint: list_display = ('id', 'scope', 'version', 'slug', 'description', 'enabled', 'modified_at') +class ViewAllowedRolesModelForm(forms.ModelForm): + """Model form for the ViewAllowedRoles model.""" + class Meta: + """Meta class for the ViewAllowedRoles model form.""" + model = ViewAllowedRoles + fields = '__all__' + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Initialize the form with the dynamic choices.""" + super().__init__(*args, **kwargs) + + self.fields['view_name'] = forms.TypedChoiceField() + self.fields['view_name'].choices = sorted([ + (view_name, view_name) for view_name in get_fx_view_with_roles()['_all_view_names'] + ]) + + self.fields['allowed_role'] = forms.TypedChoiceField() + self.fields['allowed_role'].choices = CourseAccessRoleForm.COURSE_ACCESS_ROLES + + class ViewAllowedRolesHistoryAdmin(SimpleHistoryAdmin): """Admin view for the ViewAllowedRoles model.""" + form = ViewAllowedRolesModelForm + list_display = ('view_name', 'view_description', 'allowed_role') + list_filter = ('view_name', 'allowed_role') + + +class IsUserActiveFilter(YesNoFilter): + """Filter for the is_user_active field.""" + title = 'Is User Active' + parameter_name = 'is_user_active' + + +class IsUserSystemStaffFilter(YesNoFilter): + """Filter for the is_user_system_staff field.""" + title = 'Is User System Staff' + parameter_name = 'is_user_system_staff' + + +class UsableFilter(YesNoFilter): + """Filter for the usable field.""" + title = 'Usable' + parameter_name = 'usable' + + +class HasAccessRomeFilter(YesNoFilter): + """Filter for the usable field.""" + title = 'Has Access Role' + parameter_name = 'has_access_role' + + +class ViewUserMappingModelForm(forms.ModelForm): + """Model form for the ViewUserMapping model.""" + class Meta: + """Meta class for the ViewUserMapping model form.""" + model = ViewUserMapping + fields = '__all__' + + @staticmethod + def get_all_supported_view_names() -> List[Any]: + """Get all the supported view names.""" + result = [] + + for view_name, view_class in get_fx_view_with_roles()['_all_view_names'].items(): + if hasattr( + view_class, 'authentication_classes', + ) and BearerAuthentication in view_class.authentication_classes: + result.append(view_name) + + return sorted(result) + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Initialize the form with the dynamic choices.""" + super().__init__(*args, **kwargs) + + self.fields['view_name'] = forms.TypedChoiceField() + self.fields['view_name'].choices = [(view_name, view_name) for view_name in self.get_all_supported_view_names()] + + +class ViewUserMappingHistoryAdmin(SimpleHistoryAdmin): + """Admin view for the ViewUserMapping model.""" + form = ViewUserMappingModelForm + + list_display = ( + 'user', 'view_name', 'enabled', 'expires_at', 'is_user_active', + 'is_user_system_staff', 'has_access_role', 'usable', + ) + list_filter = ( + 'view_name', 'enabled', 'expires_at', + IsUserActiveFilter, IsUserSystemStaffFilter, HasAccessRomeFilter, UsableFilter, + ) + search_fields = ('user__username', 'user__email') + raw_id_fields = ('user',) + + def is_user_active(self, obj: ViewUserMapping) -> bool: # pylint: disable=no-self-use + """Check if the user is active or not.""" + return obj.get_is_user_active() + + def is_user_system_staff(self, obj: ViewUserMapping) -> bool: # pylint: disable=no-self-use + """Check if the user is system staff or not.""" + return obj.get_is_user_system_staff() + + def has_access_role(self, obj: ViewUserMapping) -> bool: # pylint: disable=no-self-use + """Check if the user has access role.""" + return obj.get_has_access_role() + + def usable(self, obj: ViewUserMapping) -> bool: # pylint: disable=no-self-use + """Check if the mapping link is usable.""" + return obj.get_usable() + + is_user_active.short_description = 'Is User Active' # type: ignore + is_user_active.boolean = True # type: ignore + is_user_active.admin_order_field = 'is_user_active' # type: ignore + + is_user_system_staff.short_description = 'Is User System Staff' # type: ignore + is_user_system_staff.boolean = True # type: ignore + is_user_system_staff.admin_order_field = 'is_user_system_staff' # type: ignore + + has_access_role.short_description = 'Has Access Role' # type: ignore + has_access_role.boolean = True # type: ignore + has_access_role.admin_order_field = 'has_access_role' # type: ignore + + usable.short_description = 'Usable' # type: ignore + usable.boolean = True # type: ignore + usable.admin_order_field = 'usable' # type: ignore class CacheInvalidator(ViewAllowedRoles): @@ -150,6 +317,7 @@ def register_admins() -> None: admin.site.register(CacheInvalidator, CacheInvalidatorAdmin) admin.site.register(ClickhouseQuery, ClickhouseQueryAdmin) admin.site.register(ViewAllowedRoles, ViewAllowedRolesHistoryAdmin) + admin.site.register(ViewUserMapping, ViewUserMappingHistoryAdmin) admin.site.register(DataExportTask, DataExportTaskAdmin) diff --git a/futurex_openedx_extensions/helpers/apps.py b/futurex_openedx_extensions/helpers/apps.py index d87b389a..f19e6fc5 100644 --- a/futurex_openedx_extensions/helpers/apps.py +++ b/futurex_openedx_extensions/helpers/apps.py @@ -29,6 +29,8 @@ class HelpersConfig(AppConfig): def ready(self) -> None: """Connect handlers to send notifications about discussions.""" + from futurex_openedx_extensions.helpers import \ + custom_roles # pylint: disable=unused-import, import-outside-toplevel from futurex_openedx_extensions.helpers import \ monkey_patches # pylint: disable=unused-import, import-outside-toplevel from futurex_openedx_extensions.helpers import signals # pylint: disable=unused-import, import-outside-toplevel diff --git a/futurex_openedx_extensions/helpers/constants.py b/futurex_openedx_extensions/helpers/constants.py index 7ab63d0c..f211b952 100644 --- a/futurex_openedx_extensions/helpers/constants.py +++ b/futurex_openedx_extensions/helpers/constants.py @@ -50,6 +50,13 @@ COURSE_CREATOR_ROLE_TENANT = 'org_course_creator_group' COURSE_CREATOR_ROLE_GLOBAL = 'course_creator_group' COURSE_SUPPORT_ROLE_GLOBAL = 'support' +COURSE_FX_API_ACCESS_ROLE = 'fx_api_access' +COURSE_FX_API_ACCESS_ROLE_GLOBAL = 'fx_api_access_global' + +COURSE_ACCESS_ROLES_USER_VIEW_MAPPING = [ + COURSE_FX_API_ACCESS_ROLE, + COURSE_FX_API_ACCESS_ROLE_GLOBAL +] COURSE_ACCESS_ROLES_COURSE_ONLY = [ 'beta_testers', @@ -68,18 +75,24 @@ 'instructor', COURSE_ACCESS_ROLES_LIBRARY_USER, COURSE_ACCESS_ROLES_STAFF_EDITOR, + COURSE_FX_API_ACCESS_ROLE, ] COURSE_ACCESS_ROLES_GLOBAL = [ COURSE_CREATOR_ROLE_GLOBAL, COURSE_SUPPORT_ROLE_GLOBAL, + COURSE_FX_API_ACCESS_ROLE_GLOBAL, ] -COURSE_ACCESS_ROLES_SUPPORTED_EDIT = \ - COURSE_ACCESS_ROLES_COURSE_ONLY + \ - COURSE_ACCESS_ROLES_TENANT_ONLY + \ +COURSE_ACCESS_ROLES_SUPPORTED_EDIT = list(set( + COURSE_ACCESS_ROLES_COURSE_ONLY + + COURSE_ACCESS_ROLES_TENANT_ONLY + COURSE_ACCESS_ROLES_TENANT_OR_COURSE +) - {COURSE_FX_API_ACCESS_ROLE}) -COURSE_ACCESS_ROLES_SUPPORTED_READ = COURSE_ACCESS_ROLES_SUPPORTED_EDIT + COURSE_ACCESS_ROLES_GLOBAL +COURSE_ACCESS_ROLES_SUPPORTED_READ = list( + set(COURSE_ACCESS_ROLES_SUPPORTED_EDIT + COURSE_ACCESS_ROLES_GLOBAL) | + {COURSE_FX_API_ACCESS_ROLE} +) COURSE_ACCESS_ROLES_ACCEPT_COURSE_ID = COURSE_ACCESS_ROLES_COURSE_ONLY + COURSE_ACCESS_ROLES_TENANT_OR_COURSE diff --git a/futurex_openedx_extensions/helpers/custom_roles.py b/futurex_openedx_extensions/helpers/custom_roles.py new file mode 100644 index 00000000..32bb037a --- /dev/null +++ b/futurex_openedx_extensions/helpers/custom_roles.py @@ -0,0 +1,75 @@ +"""New roles for FutureX Open edX Extensions.""" +from __future__ import annotations + +import logging +from typing import Any + +from common.djangoapps.student.admin import CourseAccessRoleForm +from common.djangoapps.student.roles import REGISTERED_ACCESS_ROLES, CourseRole, OrgRole, RoleBase + +from futurex_openedx_extensions.helpers.exceptions import FXCodedException, FXExceptionCodes + +log = logging.getLogger(__name__) + + +def register_custom_access_role(cls: Any) -> Any: + """ + Decorator that adds the new access role to the list of registered access roles to be accessible in the Django admin. + + Note: roles inheritances is not supported + + :param cls: The class to register + :type cls: Any + """ + def _hacky_update_django_admin_choices(choices: list[tuple[str, str]]) -> None: + """ + Update the choices of the role field in django admin. + + :param choices: The choices to update + :type choices: list[tuple[str, str]] + """ + CourseAccessRoleForm.COURSE_ACCESS_ROLES = choices + CourseAccessRoleForm.declared_fields['role'].choices = choices + + try: + role_name = cls.ROLE + if role_name in REGISTERED_ACCESS_ROLES: + raise FXCodedException( + code=FXExceptionCodes.CUSTOM_ROLE_DUPLICATE_DECLARATION, + message=f'Trying to register a custom role {role_name} that is already registered!' + ) + except AttributeError: + log.exception('Role class %s does not have a ROLE attribute', cls.__name__) + except FXCodedException as exc: + log.exception(str(exc)) + else: + REGISTERED_ACCESS_ROLES[role_name] = cls + _hacky_update_django_admin_choices([(role.ROLE, role.ROLE) for role in REGISTERED_ACCESS_ROLES.values()]) + + return cls + + +@register_custom_access_role +class FXAPIAccessRoleCourse(CourseRole): # pylint: disable=too-few-public-methods + """Course specific access to the FutureX APIs.""" + ROLE = 'fx_api_access' + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(self.ROLE, *args, **kwargs) + + +class FXAPIAccessRoleOrg(OrgRole): # pylint: disable=too-few-public-methods + """Tenant-wide access to the FutureX APIs.""" + ROLE = 'fx_api_access' + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(self.ROLE, *args, **kwargs) + + +@register_custom_access_role +class FXAPIAccessRoleGlobal(RoleBase): # pylint: disable=too-few-public-methods + """Global access to the FutureX APIs.""" + ROLE = 'fx_api_access_global' + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(self.ROLE, *args, **kwargs) diff --git a/futurex_openedx_extensions/helpers/exceptions.py b/futurex_openedx_extensions/helpers/exceptions.py index e59d6461..951ae354 100644 --- a/futurex_openedx_extensions/helpers/exceptions.py +++ b/futurex_openedx_extensions/helpers/exceptions.py @@ -40,6 +40,8 @@ class FXExceptionCodes(Enum): QUERY_SET_BAD_OPERATION = 8001 + CUSTOM_ROLE_DUPLICATE_DECLARATION = 9001 + class FXCodedException(Exception): """Exception with a code.""" diff --git a/futurex_openedx_extensions/helpers/migrations/0005_view_user_mapping.py b/futurex_openedx_extensions/helpers/migrations/0005_view_user_mapping.py new file mode 100644 index 00000000..fb22f7e9 --- /dev/null +++ b/futurex_openedx_extensions/helpers/migrations/0005_view_user_mapping.py @@ -0,0 +1,53 @@ +# Generated by Django 3.2.25 on 2024-11-19 12:09 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import simple_history.models + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('fx_helpers', '0004_dataexporttask'), + ] + + operations = [ + migrations.CreateModel( + name='HistoricalViewUserMapping', + fields=[ + ('id', models.IntegerField(auto_created=True, blank=True, db_index=True, verbose_name='ID')), + ('view_name', models.CharField(max_length=255)), + ('enabled', models.BooleanField(default=True)), + ('expires_at', models.DateTimeField(blank=True, null=True)), + ('history_id', models.AutoField(primary_key=True, serialize=False)), + ('history_date', models.DateTimeField()), + ('history_change_reason', models.CharField(max_length=100, null=True)), + ('history_type', models.CharField(choices=[('+', 'Created'), ('~', 'Changed'), ('-', 'Deleted')], max_length=1)), + ('history_user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)), + ('user', models.ForeignKey(blank=True, db_constraint=False, null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='+', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'historical View-User Mapping', + 'ordering': ('-history_date', '-history_id'), + 'get_latest_by': 'history_date', + }, + bases=(simple_history.models.HistoricalChanges, models.Model), + ), + migrations.CreateModel( + name='ViewUserMapping', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('view_name', models.CharField(max_length=255)), + ('enabled', models.BooleanField(default=True)), + ('expires_at', models.DateTimeField(blank=True, null=True)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'View-User Mapping', + 'verbose_name_plural': 'Views-Users Mapping', + 'unique_together': {('user', 'view_name')}, + }, + ), + ] diff --git a/futurex_openedx_extensions/helpers/models.py b/futurex_openedx_extensions/helpers/models.py index f17856cd..67b8eabe 100644 --- a/futurex_openedx_extensions/helpers/models.py +++ b/futurex_openedx_extensions/helpers/models.py @@ -4,15 +4,19 @@ import re from typing import Any, Dict, List, Tuple +from common.djangoapps.student.models import CourseAccessRole from django.contrib.auth import get_user_model from django.core.exceptions import ValidationError from django.db import models +from django.db.models import BooleanField, Case, Exists, F, OuterRef, Q, Value, When from django.utils import timezone from eox_tenant.models import TenantConfig +from opaque_keys.edx.django.models import CourseKeyField from simple_history.models import HistoricalRecords from futurex_openedx_extensions.helpers import clickhouse_operations as ch -from futurex_openedx_extensions.helpers.converters import DateMethods +from futurex_openedx_extensions.helpers import constants as cs +from futurex_openedx_extensions.helpers.converters import DateMethods, get_allowed_roles from futurex_openedx_extensions.helpers.exceptions import FXCodedException, FXExceptionCodes User = get_user_model() @@ -34,6 +38,123 @@ class Meta: unique_together = ('view_name', 'allowed_role') +class ViewUserMappingManager(models.Manager): # pylint: disable=too-few-public-methods + """Manager for the ViewUserMapping model""" + def get_queryset(self) -> models.QuerySet: + """Get the queryset for the model""" + queryset = super().get_queryset() + allowed_roles = get_allowed_roles(cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING) + + queryset = queryset.annotate( + is_user_active=F('user__is_active'), + ).annotate( + is_user_system_staff=F('user__is_superuser').bitor(F('user__is_staff')), + ).annotate( + has_access_role=Exists( + CourseAccessRole.objects.filter( + user_id=OuterRef('user_id'), + ).filter( + ( + Q(role__in=allowed_roles['global']) & + Q(org='') & + Q(course_id=CourseKeyField.Empty) + ) | + ( + Q(role__in=allowed_roles['tenant_only']) & + ~Q(org='') & + Q(course_id=CourseKeyField.Empty) + ) | + ( + Q(role__in=allowed_roles['course_only']) & + ~Q(org='') & + ~Q(course_id=CourseKeyField.Empty) + ) | + ( + Q(role__in=allowed_roles['tenant_or_course']) & + ~Q(org='') + ) + ), + ), + ).annotate( + usable=Case( + When( + Q(is_user_active=True) & ( + Q(is_user_system_staff=True) | ( + (Q(has_access_role=True)) & + Q(enabled=True) & + (Q(expires_at__isnull=True) | Q(expires_at__gte=timezone.now())) + ) + ), + then=Value(True), + ), + default=Value(False), + output_field=BooleanField(), + ), + ) + + return queryset + + +class ViewUserMapping(models.Model): + """Allowed roles for every supported view""" + user = models.ForeignKey(User, on_delete=models.CASCADE) + view_name = models.CharField(max_length=255) + enabled = models.BooleanField(default=True) + expires_at = models.DateTimeField(null=True, blank=True) + + history = HistoricalRecords() + + objects = ViewUserMappingManager() + + def get_annotated_attribute(self, attribute: str) -> Any: + """ + Get the annotated attribute. + + :param attribute: The attribute to get. + :type attribute: str + :return: The annotated attribute. + :rtype: Any + """ + return getattr(self, attribute, getattr(ViewUserMapping.objects.filter(pk=self.pk).first(), attribute)) + + def get_is_user_active(self) -> bool: + """Return the value of the annotated field is_user_active.""" + return self.get_annotated_attribute('is_user_active') + + def get_is_user_system_staff(self) -> bool: + """Return the value of the annotated field is_user_system_staff.""" + return self.get_annotated_attribute('is_user_system_staff') + + def get_has_access_role(self) -> bool: + """Return the value of the annotated field has_access_role.""" + return self.get_annotated_attribute('has_access_role') + + def get_usable(self) -> bool: + """Return the value of the annotated field usable.""" + return self.get_annotated_attribute('usable') + + @classmethod + def is_usable_access(cls, user: get_user_model, view_name: str) -> bool: + """ + Check if the user has usable access to the view. + + :param user: The user to check. + :type user: User + :param view_name: The name of the view. + :type view_name: str + :return: True if the user has usable access to the view. + :rtype: bool + """ + record = cls.objects.filter(user=user, view_name=view_name).first() + return record is not None and record.get_usable() + + class Meta: + """Metaclass for the model""" + verbose_name = 'View-User Mapping' + verbose_name_plural = 'Views-Users Mapping' + unique_together = ('user', 'view_name') + + class ClickhouseQuery(models.Model): """Model for storing Clickhouse queries""" SCOPE_COURSE = 'course' diff --git a/futurex_openedx_extensions/helpers/permissions.py b/futurex_openedx_extensions/helpers/permissions.py index 1176c6bd..78565728 100644 --- a/futurex_openedx_extensions/helpers/permissions.py +++ b/futurex_openedx_extensions/helpers/permissions.py @@ -98,7 +98,9 @@ def has_permission(self, request: Any, view: Any) -> bool: if not super().has_permission(request, view) or not request.user.is_active: raise NotAuthenticated() - view_allowed_roles: List[str] = view.get_allowed_roles_all_views()[view.fx_view_name] + view_allowed_roles: List[str] = view.get_view_user_roles_mapping( + view_name=view.fx_view_name, user=request.user, + ) tenant_ids_string: str | None = request.GET.get('tenant_ids') if tenant_ids_string: diff --git a/futurex_openedx_extensions/helpers/roles.py b/futurex_openedx_extensions/helpers/roles.py index 08d7c525..6159c977 100644 --- a/futurex_openedx_extensions/helpers/roles.py +++ b/futurex_openedx_extensions/helpers/roles.py @@ -33,7 +33,7 @@ get_orgs_of_courses, verify_course_ids, ) -from futurex_openedx_extensions.helpers.models import ViewAllowedRoles +from futurex_openedx_extensions.helpers.models import ViewAllowedRoles, ViewUserMapping from futurex_openedx_extensions.helpers.querysets import check_staff_exist_queryset from futurex_openedx_extensions.helpers.tenants import ( get_all_tenant_ids, @@ -524,6 +524,29 @@ def get_allowed_roles_all_views() -> Dict[str, List[str]]: return result + def get_view_user_roles_mapping(self, view_name: str, user: get_user_model) -> List[str]: + """ + Get the allowed roles on the view for the user. + + :param view_name: The view name + :type view_name: str + :param user: The user + :type user: get_user_model + :return: The allowed roles on the view for the user + :rtype: list + """ + view_allowed_roles = list( + set(self.get_allowed_roles_all_views().get(view_name, [])) - + set(cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING) + ) + + if is_system_staff_user(user) or ViewUserMapping.is_usable_access(user, view_name): + view_allowed_roles.extend(cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING) + else: + view_allowed_roles = list(set(view_allowed_roles) - set(cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING)) + + return view_allowed_roles + def get_usernames_with_access_roles(orgs: list[str], active_filter: None | bool = None) -> list[str]: """ diff --git a/test_utils/edx_platform_mocks/common/djangoapps/student/admin.py b/test_utils/edx_platform_mocks/common/djangoapps/student/admin.py new file mode 100644 index 00000000..21d00646 --- /dev/null +++ b/test_utils/edx_platform_mocks/common/djangoapps/student/admin.py @@ -0,0 +1,2 @@ +"""edx-platform Mocks""" +from fake_models.models import CourseAccessRoleForm # pylint: disable=unused-import diff --git a/test_utils/edx_platform_mocks/common/djangoapps/student/roles.py b/test_utils/edx_platform_mocks/common/djangoapps/student/roles.py new file mode 100644 index 00000000..b4fe2bed --- /dev/null +++ b/test_utils/edx_platform_mocks/common/djangoapps/student/roles.py @@ -0,0 +1,2 @@ +"""edx-platform Mocks""" +from fake_models.classes import REGISTERED_ACCESS_ROLES, CourseRole, OrgRole, RoleBase # pylint: disable=unused-import diff --git a/test_utils/edx_platform_mocks/fake_models/classes.py b/test_utils/edx_platform_mocks/fake_models/classes.py new file mode 100644 index 00000000..97b5f4c5 --- /dev/null +++ b/test_utils/edx_platform_mocks/fake_models/classes.py @@ -0,0 +1,18 @@ +"""edx-platform classes mocks for testing purposes.""" + + +class RoleBase: # pylint: disable=too-few-public-methods + """Mock""" + def __init__(self, role, *args, **kwargs): + """Mock""" + + +class CourseRole(RoleBase): # pylint: disable=too-few-public-methods + """Mock""" + + +class OrgRole(RoleBase): # pylint: disable=too-few-public-methods + """Mock""" + + +REGISTERED_ACCESS_ROLES = {} diff --git a/test_utils/edx_platform_mocks/fake_models/models.py b/test_utils/edx_platform_mocks/fake_models/models.py index d5be8c82..3c605dca 100644 --- a/test_utils/edx_platform_mocks/fake_models/models.py +++ b/test_utils/edx_platform_mocks/fake_models/models.py @@ -1,6 +1,7 @@ """edx-platform models mocks for testing purposes.""" import re +from django import forms from django.contrib.auth import get_user_model from django.db import models from django.db.models.fields import AutoField @@ -306,3 +307,13 @@ class ExtraInfo(models.Model): class Meta: app_label = 'fake_models' db_table = 'custom_reg_form_extra_info' + + +class CourseAccessRoleForm(forms.ModelForm): + """Mock""" + class Meta: + model = CourseAccessRole + fields = '__all__' + + COURSE_ACCESS_ROLES = [] + role = forms.ChoiceField(choices=COURSE_ACCESS_ROLES) diff --git a/tests/test_helpers/test_admin.py b/tests/test_helpers/test_admin.py index 414e907a..fd4cfa09 100644 --- a/tests/test_helpers/test_admin.py +++ b/tests/test_helpers/test_admin.py @@ -1,11 +1,14 @@ """Tests for the admin helpers.""" -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest +from django import forms from django.contrib.admin.sites import AdminSite from django.core.cache import cache from django.http import Http404, HttpResponseRedirect from django.utils.timezone import now +from django_mysql.models import QuerySet +from openedx.core.lib.api.authentication import BearerAuthentication from rest_framework.test import APIRequestFactory from futurex_openedx_extensions.helpers.admin import ( @@ -13,9 +16,13 @@ CacheInvalidatorAdmin, ClickhouseQueryAdmin, ViewAllowedRolesHistoryAdmin, + ViewAllowedRolesModelForm, + ViewUserMappingHistoryAdmin, + ViewUserMappingModelForm, + YesNoFilter, ) from futurex_openedx_extensions.helpers.constants import CACHE_NAMES -from futurex_openedx_extensions.helpers.models import ClickhouseQuery, ViewAllowedRoles +from futurex_openedx_extensions.helpers.models import ClickhouseQuery, ViewAllowedRoles, ViewUserMapping from tests.fixture_helpers import set_user @@ -54,6 +61,24 @@ def mock_clickhousequery_methods(): yield +@pytest.fixture +def mock_get_fx_view_with_roles(): + """Fixture to mock the get_fx_view_with_roles method.""" + with patch( + 'futurex_openedx_extensions.helpers.admin.get_fx_view_with_roles', + return_value={ + '_all_view_names': { + 'view_name9': Mock(authentication_classes=[BearerAuthentication]), + 'view_name2': Mock(authentication_classes=[BearerAuthentication]), + 'view_no_auth_classes': Mock(), + 'view_no_bearer_class': Mock(authentication_classes=[]), + }, + }, + ) as mocked_result: + del mocked_result.return_value['_all_view_names']['view_no_auth_classes'].authentication_classes + yield + + def test_view_allowed_roles_admin_main_settings(view_allowed_roles_admin): # pylint: disable=redefined-outer-name """Verify the main settings of the ViewAllowedRolesHistoryAdmin.""" assert view_allowed_roles_admin.list_display == ('view_name', 'view_description', 'allowed_role') @@ -179,3 +204,88 @@ def test_clickhouse_query_admin_load_missing_queries( ClickhouseQuery.load_missing_queries.assert_called_once() assert isinstance(response, HttpResponseRedirect) assert response.url == '/admin/fx_helpers/clickhousequery' + + +@patch('futurex_openedx_extensions.helpers.admin.CourseAccessRoleForm') +def test_view_allowed_roles_model_form( + mock_course_access_form, mock_get_fx_view_with_roles, +): # pylint: disable=unused-argument, redefined-outer-name + """Verify the ViewAllowedRolesModelForm model form.""" + mock_course_access_form.COURSE_ACCESS_ROLES = [('role99', 'role99'), ('role44', 'role44')] + + form = ViewAllowedRolesModelForm() + + assert isinstance(form.fields['view_name'], forms.TypedChoiceField) + assert form.fields['view_name'].choices == [ + ('view_name2', 'view_name2'), + ('view_name9', 'view_name9'), + ('view_no_auth_classes', 'view_no_auth_classes'), + ('view_no_bearer_class', 'view_no_bearer_class'), + ] + assert isinstance(form.fields['allowed_role'], forms.TypedChoiceField) + assert form.fields['allowed_role'].choices == mock_course_access_form.COURSE_ACCESS_ROLES + + +@pytest.mark.django_db +def test_yes_no_filter_lookups(): + """Verify the lookups method of the YesNoFilter.""" + filter_instance = YesNoFilter(request=None, params={}, model=None, model_admin=None) + expected_lookups = [('yes', 'Yes'), ('no', 'No')] + assert filter_instance.lookups(None, None) == expected_lookups + + +@pytest.mark.django_db +@pytest.mark.parametrize('no_yet_set_must_be_replaced, filter_called, expected_flag', [ + ('yes', True, True), + ('no', True, False), + ('Yes', False, None), + ('NO', False, None), + ('something', False, None), + (None, False, None), +]) +def test_yes_no_filter_queryset(no_yet_set_must_be_replaced, filter_called, expected_flag): + """Verify the queryset method of the YesNoFilter.""" + filter_instance = YesNoFilter( + request=None, + params={'no_yet_set_must_be_replaced': no_yet_set_must_be_replaced}, + model=None, + model_admin=None, + ) + mock_queryset = Mock(spec=QuerySet) + mock_queryset.filter.return_value = 'filtered_queryset' + result = filter_instance.queryset(None, mock_queryset) + if filter_called: + mock_queryset.filter.assert_called_once_with(no_yet_set_must_be_replaced=expected_flag) + assert result == 'filtered_queryset' + else: + mock_queryset.filter.assert_not_called() + assert result == mock_queryset + + +def test_view_user_mapping_model_form_initialization( + mock_get_fx_view_with_roles, +): # pylint: disable=unused-argument, redefined-outer-name + """Verify the initialization of the ViewUserMappingModelForm.""" + form = ViewUserMappingModelForm() + + assert isinstance(form.fields['view_name'], forms.TypedChoiceField) + assert form.fields['view_name'].choices == [('view_name2', 'view_name2'), ('view_name9', 'view_name9')] + + +@pytest.mark.django_db +@pytest.mark.parametrize('attribute_name', [ + 'is_user_active', 'is_user_system_staff', 'has_access_role', 'usable', +]) +def test_view_user_mapping_model_form_extra_attributes(attribute_name): + """Verify the extra attributes of the ViewUserMappingModelForm.""" + obj = Mock(**{ + 'spec': ViewUserMapping, + f'get_{attribute_name}': Mock(return_value='testing-attribute'), + }) + + admin = ViewUserMappingHistoryAdmin(model=ViewUserMapping, admin_site=Mock()) + attribute_to_test = getattr(admin, attribute_name) + assert attribute_to_test(obj) == 'testing-attribute' + assert attribute_to_test.short_description == attribute_name.replace('_', ' ').title() + assert attribute_to_test.boolean is True + assert attribute_to_test.admin_order_field == attribute_name diff --git a/tests/test_helpers/test_custom_roles.py b/tests/test_helpers/test_custom_roles.py new file mode 100644 index 00000000..89c59f12 --- /dev/null +++ b/tests/test_helpers/test_custom_roles.py @@ -0,0 +1,58 @@ +"""Test for custom roles module.""" +from common.djangoapps.student.admin import CourseAccessRoleForm +from common.djangoapps.student.roles import REGISTERED_ACCESS_ROLES, CourseRole, OrgRole, RoleBase + +from futurex_openedx_extensions.helpers.custom_roles import ( + FXAPIAccessRoleCourse, + FXAPIAccessRoleGlobal, + FXAPIAccessRoleOrg, + register_custom_access_role, +) + + +def test_new_roles(): + """Test that the role names are unique.""" + assert FXAPIAccessRoleOrg().ROLE == 'fx_api_access' + assert FXAPIAccessRoleCourse().ROLE == 'fx_api_access' + assert FXAPIAccessRoleGlobal().ROLE == 'fx_api_access_global' + + assert issubclass(FXAPIAccessRoleOrg, OrgRole) + assert issubclass(FXAPIAccessRoleCourse, CourseRole) + assert issubclass(FXAPIAccessRoleGlobal, RoleBase) + + +def test_register_custom_access_role_no_role(caplog): + """Test that the decorator raises an exception if the role attribute is not present.""" + registered = REGISTERED_ACCESS_ROLES.copy() + + result = register_custom_access_role(object) + assert result == object + assert 'Role class object does not have a ROLE attribute' in caplog.text + assert registered == REGISTERED_ACCESS_ROLES + + +def test_register_custom_access_role_already_registered(caplog): + """Test that the decorator raises an exception if the role is already registered.""" + registered = REGISTERED_ACCESS_ROLES.copy() + + result = register_custom_access_role(FXAPIAccessRoleOrg) + assert result == FXAPIAccessRoleOrg + assert 'Trying to register a custom role fx_api_access that is already registered!' in caplog.text + assert registered == REGISTERED_ACCESS_ROLES + + +def test_register_custom_access_role_register_new(): + """Test that the decorator raises an exception if the role is already registered.""" + class DummyRole: # pylint: disable=too-few-public-methods + """Dummy custom role""" + ROLE = 'fake-role' + + expected_registered = REGISTERED_ACCESS_ROLES.copy() + expected_registered['fake-role'] = DummyRole + expected_choices = [(role.ROLE, role.ROLE) for role in expected_registered.values()] + + result = register_custom_access_role(DummyRole) + assert result == DummyRole + assert expected_registered == REGISTERED_ACCESS_ROLES + assert CourseAccessRoleForm.COURSE_ACCESS_ROLES == expected_choices + assert CourseAccessRoleForm.declared_fields['role'].choices == expected_choices diff --git a/tests/test_helpers/test_models.py b/tests/test_helpers/test_models.py index d2793892..2a2f55fc 100644 --- a/tests/test_helpers/test_models.py +++ b/tests/test_helpers/test_models.py @@ -1,12 +1,15 @@ """Tests for models module.""" +from itertools import product from unittest.mock import Mock, patch import pytest +from common.djangoapps.student.models import CourseAccessRole from django.core.exceptions import ValidationError +from django.utils import timezone from futurex_openedx_extensions.helpers.clickhouse_operations import ClickhouseBaseError from futurex_openedx_extensions.helpers.exceptions import FXCodedException, FXExceptionCodes -from futurex_openedx_extensions.helpers.models import ClickhouseQuery, DataExportTask +from futurex_openedx_extensions.helpers.models import ClickhouseQuery, DataExportTask, ViewUserMapping @pytest.fixture @@ -39,6 +42,16 @@ def get_client_mock(settings): yield mocked +@pytest.fixture +def view_user_mapping(): + """Return a ViewUserMapping instance.""" + return ViewUserMapping( + user_id=1, + view_name='test_view', + enabled=True, + ) + + @pytest.mark.parametrize('sample_data, configured_type, expected_result', [ ('1', 'int', 1), ('1', 'float', 1.0), @@ -521,3 +534,80 @@ def test_data_export_task_set_progress_invalid_value(base_data, invalid_progress DataExportTask.set_progress(task.id, invalid_progress) assert exc_info.value.code == FXExceptionCodes.EXPORT_CSV_TASK_INVALID_PROGRESS_VALUE.value assert str(exc_info.value) == f'Invalid progress value! ({invalid_progress}).' + + +@pytest.mark.django_db +@pytest.mark.parametrize('role_definition, is_staff, is_active, enabled, expires_at', product( + [ # expected_permitted , role, org, course_id + (True, 'global', '', ''), + (True, 'tenant_only', 'org1', ''), + (True, 'course_only', 'org1', 'course-v1:org+1+1'), + (True, 'tenant_or_course', 'org1', ''), + (True, 'tenant_or_course', 'org1', 'course-v1:org+1+1'), + (False, 'global', 'org', ''), + (False, 'global', 'org', 'course-v1:org+1+1'), + (False, 'global', '', 'course-v1:org+1+1'), + (False, 'tenant_only', '', ''), + (False, 'tenant_only', 'org1', 'course-v1:org+1+1'), + (False, 'tenant_only', '', 'course-v1:org+1+1'), + (False, 'course_only', '', 'course-v1:org+1+1'), + (False, 'course_only', 'org1', ''), + (False, 'course_only', '', ''), + (False, 'tenant_or_course', '', ''), + (False, 'tenant_or_course', '', 'course-v1:org+1+1'), + ], + [True, False], + [True, False], + [True, False], + [None, timezone.now() + timezone.timedelta(minutes=1), timezone.now() + timezone.timedelta(minutes=-1)], +)) +def test_view_user_mapping_manager_get_queryset( + role_definition, is_staff, is_active, enabled, expires_at, base_data, view_user_mapping, +): # pylint: disable=unused-argument, too-many-arguments, redefined-outer-name + """ + Verify that ViewAllowedRolesManager.get_queryset returns correct value. This is a critical test that ensures + that get_queryset method is returning the correct queryset based on the role definition and user properties. The + returned annotations will determine if an API mapping is usable or not. This why we test all possible combinations + of the role definition and user properties with no exceptions. + """ + user_id = 1 + expected_permitted = role_definition[0] + role = role_definition[1] + org = role_definition[2] + course_id = role_definition[3] + CourseAccessRole.objects.create( + user_id=user_id, + role=role, + org=org, + course_id=course_id, + ) + view_user_mapping.user.is_staff = is_staff + view_user_mapping.user.is_superuser = is_staff + view_user_mapping.user.is_active = is_active + view_user_mapping.user.save() + view_user_mapping.enabled = enabled + view_user_mapping.expires_at = expires_at + view_user_mapping.save() + + expected_usable = is_active and ( + is_staff or ( + expected_permitted and enabled and ( + expires_at is None or expires_at > timezone.now() + ) + ) + ) + + with patch('futurex_openedx_extensions.helpers.models.get_allowed_roles') as mocked_roles: + mocked_roles.return_value = { + 'global': ['global'], + 'tenant_only': ['tenant_only'], + 'course_only': ['course_only'], + 'tenant_or_course': ['tenant_or_course'], + } + assert view_user_mapping.get_is_user_active() == is_active + assert view_user_mapping.get_is_user_system_staff() == is_staff + assert view_user_mapping.get_has_access_role() == expected_permitted + assert view_user_mapping.get_usable() == expected_usable + assert ViewUserMapping.is_usable_access( + view_user_mapping.user, view_user_mapping.view_name, + ) == expected_usable diff --git a/tests/test_helpers/test_permissions.py b/tests/test_helpers/test_permissions.py index 431e6dc0..172484a6 100644 --- a/tests/test_helpers/test_permissions.py +++ b/tests/test_helpers/test_permissions.py @@ -22,7 +22,7 @@ @pytest.fixture def dummy_view(): """Dummy view fixture""" - class DummyView: # pylint: disable=too-few-public-methods + class DummyView: """Dummy view class""" fx_view_name = 'dummyView' @@ -32,8 +32,12 @@ def __init__(self): } def get_allowed_roles_all_views(self): - """Get allowed roles for all views""" - return self.result_of_method + """Adding method to bypass the type check""" + + def get_view_user_roles_mapping(self, user, view_name): # pylint: disable=unused-argument + """Get view user roles mapping""" + return self.result_of_method[view_name] + return DummyView() diff --git a/tests/test_helpers/test_roles.py b/tests/test_helpers/test_roles.py index 5c16806f..7023fec7 100644 --- a/tests/test_helpers/test_roles.py +++ b/tests/test_helpers/test_roles.py @@ -783,6 +783,38 @@ def test_fx_view_role_mixin_fx_permission_info_available(): assert mixin.fx_permission_info == {'dummy': ['data']} +@pytest.mark.django_db +@pytest.mark.parametrize('is_staff, is_usable, expected_allowed', [ + (False, False, False), + (True, False, True), + (False, True, True), + (True, True, True), +]) +@patch('futurex_openedx_extensions.helpers.roles.ViewUserMapping.is_usable_access') +@patch('futurex_openedx_extensions.helpers.roles.is_system_staff_user') +def test_fx_view_role_mixin_get_view_user_roles_mapping_allowed( + mock_is_staff, mock_is_usable, is_staff, is_usable, expected_allowed, base_data, +): # pylint: disable=unused-argument, too-many-arguments + """Verify that get_view_user_roles_mapping returns the expected result for system staff users.""" + mock_is_staff.return_value = is_staff + mock_is_usable.return_value = is_usable + + mixin = FXViewRoleInfoMixin() + mixin.get_allowed_roles_all_views = Mock( + return_value={'view1': ['staff', cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING[0]]} + ) + user = get_user_model().objects.create(username='test_user') + + assert not DeepDiff( + mixin.get_view_user_roles_mapping('view1', user), + ['staff'] + cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING if expected_allowed else ['staff'], + ) + assert not DeepDiff( + mixin.get_view_user_roles_mapping('view2', user), + cs.COURSE_ACCESS_ROLES_USER_VIEW_MAPPING if expected_allowed else [], + ) + + @pytest.mark.django_db def test_get_usernames_with_access_roles(base_data): # pylint: disable=unused-argument """Verify that get_usernames_with_access_roles returns the expected value."""