diff --git a/backend/app.py b/backend/app.py index bbe337c3..af04255e 100644 --- a/backend/app.py +++ b/backend/app.py @@ -1,12 +1,14 @@ import uvicorn from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware from starlette import status from starlette.requests import Request from starlette.responses import JSONResponse from db.errors.database_errors import ActionAlreadyPerformedError, ItemNotFoundError, NoSuchRelationError -from routes.errors.authentication import InvalidRoleCredentialsError, NoAccessToSubjectError +from routes.errors.authentication import InvalidAuthenticationError, InvalidRoleCredentialsError, NoAccessToSubjectError from routes.group import group_router +from routes.login import login_router from routes.project import project_router from routes.student import student_router from routes.subject import subject_router @@ -16,6 +18,7 @@ app = FastAPI() # Koppel routes uit andere modules. +app.include_router(login_router, prefix="/api") app.include_router(student_router, prefix="/api") app.include_router(teacher_router, prefix="/api") app.include_router(users_router, prefix="/api") @@ -23,10 +26,11 @@ app.include_router(subject_router, prefix="/api") app.include_router(group_router, prefix="/api") -DEBUG = False # Should always be false in repo +DEBUG = False # Should always be false in repo if DEBUG: from fastapi.middleware.cors import CORSMiddleware + origins = [ "https://localhost", "https://localhost:8080", @@ -41,6 +45,7 @@ allow_headers=["*"], ) + # Koppel de exception handlers @app.exception_handler(InvalidRoleCredentialsError) def invalid_admin_credentials_error_handler(request: Request, exc: InvalidRoleCredentialsError) -> JSONResponse: @@ -82,5 +87,13 @@ def no_such_relation_error_handler(request: Request, exc: NoSuchRelationError) - ) +@app.exception_handler(InvalidAuthenticationError) +def invalid_authentication_error_handler(request: Request, exc: NoSuchRelationError) -> JSONResponse: + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"detail": str(exc)}, + ) + + if __name__ == "__main__": uvicorn.run("app:app") diff --git a/backend/controllers/auth/authentication_controller.py b/backend/controllers/auth/authentication_controller.py new file mode 100644 index 00000000..a8bb2748 --- /dev/null +++ b/backend/controllers/auth/authentication_controller.py @@ -0,0 +1,80 @@ +import os +import string +from typing import TYPE_CHECKING + +import httpx +from defusedxml.ElementTree import fromstring +from sqlalchemy.orm import Session + +from domain.logic.student import create_student +from domain.logic.teacher import create_teacher +from domain.logic.user import get_user_with_email +from domain.models.UserDataclass import UserDataclass + +if TYPE_CHECKING: + from _elementtree import Element + +cas_service = os.getenv("CAS_URL", "https://localhost:8080/login") + + +def authenticate_user(session: Session, ticket: str) -> UserDataclass | None: + """ + This function will authenticate the user. + If the use doesn't yet exist in the database, it will create an entry. + a + + :param session: Session with the database + :param ticket: A ticket from login.ugent.be/login?service=https://localhost:8080/login + :return: None if the authentication failed, user: UseDataclass is the authentication was successful + """ + allowed_chars = set(string.ascii_letters + string.digits + "-") + if not all(c in allowed_chars for c in ticket): + return None + user_information = httpx.get(f"https://login.ugent.be/serviceValidate?service={cas_service}&ticket={ticket}") + user_dict: dict | None = parse_cas_xml(user_information.text) + if user_dict is None: + return None + + user: UserDataclass | None = get_user_with_email(session, user_dict["email"]) + if user is None: + if user_dict["role"] == "student": + user = create_student(session, user_dict["name"], user_dict["email"]) + elif user_dict["role"] == "teacher": + user = create_teacher(session, user_dict["name"], user_dict["email"]) + return user + + +def parse_cas_xml(xml: str) -> dict | None: + """ + The authentication with CAS returns a xml-object. + This function will read the necessary attributes and return them in a dictionary. + + :param xml: str: response xml from CAS + :return: None if the authentication failed else dict + """ + + namespace = "{http://www.yale.edu/tp/cas}" + root: Element | None = fromstring(xml).find(f"{namespace}authenticationSuccess") + if root is None: + return None + user_information: Element | None = root.find(f"{namespace}attributes") + if user_information is None: + return None + givenname: Element | None = user_information.find(f"{namespace}givenname") + surname: Element | None = user_information.find(f"{namespace}surname") + email: Element | None = user_information.find(f"{namespace}mail") + role: list | None = user_information.findall(f"{namespace}objectClass") + if role is not None and givenname is not None and surname is not None and email is not None: + role_str = "" + for r in role: + if r.text == "ugentStudent" and role_str == "": + role_str = "student" + elif r.text == "ugentEmployee": + role_str = "teacher" + + return { + "email": email.text.lower(), + "name": f"{givenname.text} {surname.text}", + "role": role_str, + } + return None diff --git a/backend/controllers/auth/token_controller.py b/backend/controllers/auth/token_controller.py new file mode 100644 index 00000000..ceefcb37 --- /dev/null +++ b/backend/controllers/auth/token_controller.py @@ -0,0 +1,25 @@ +import contextlib +import os +from datetime import UTC, datetime, timedelta + +import jwt + +from domain.models.UserDataclass import UserDataclass + +# Zeker aanpassen in production +jwt_secret = os.getenv("JWT_SECRET", "secret") + + +def verify_token(token: str) -> int | None: + with contextlib.suppress(jwt.ExpiredSignatureError, jwt.DecodeError): + payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) + return payload.get("uid", None) + + +def create_token(user: UserDataclass) -> str: + expire = datetime.now(UTC) + timedelta(days=1) + to_encode: dict = { + "uid": user.id, + "exp": expire, + } + return jwt.encode(to_encode, jwt_secret, algorithm="HS256") diff --git a/backend/domain/logic/user.py b/backend/domain/logic/user.py index b3f25f9b..1125191a 100644 --- a/backend/domain/logic/user.py +++ b/backend/domain/logic/user.py @@ -1,3 +1,4 @@ +from sqlalchemy import select from sqlalchemy.orm import Session from db.models.models import Admin, Student, Teacher, User @@ -29,6 +30,20 @@ def get_user(session: Session, user_id: int) -> UserDataclass: return get(session, User, user_id).to_domain_model() +def get_user_with_email(session: Session, email: str) -> UserDataclass | None: + stmt = select(User).where(User.email == email) + result = session.execute(stmt) + users = [r.to_domain_model() for r in result.scalars()] + + if len(users) > 1: + raise NotImplementedError + + if len(users) == 1: + return users[0] + + return None + + def get_all_users(session: Session) -> list[UserDataclass]: return [user.to_domain_model() for user in get_all(session, User)] diff --git a/backend/domain/models/APIUser.py b/backend/domain/models/APIUser.py index c37fb477..c4ec1a67 100644 --- a/backend/domain/models/APIUser.py +++ b/backend/domain/models/APIUser.py @@ -8,3 +8,7 @@ class APIUser(BaseModel): name: str email: EmailStr roles: list[Role] + + +class LoginResponse(BaseModel): + token: str diff --git a/backend/requirements.txt b/backend/requirements.txt index 2e62ebad..7041d175 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -21,3 +21,7 @@ SQLAlchemy==2.0.27 starlette==0.36.3 typing_extensions==4.10.0 uvicorn==0.27.1 +httpx==0.27.0 +defusedxml~=0.7.1 +cryptography~=42.0.5 +PyJWT~=2.8.0 \ No newline at end of file diff --git a/backend/routes/dependencies/role_dependencies.py b/backend/routes/dependencies/role_dependencies.py index 9de95992..559c08ba 100644 --- a/backend/routes/dependencies/role_dependencies.py +++ b/backend/routes/dependencies/role_dependencies.py @@ -1,6 +1,8 @@ from fastapi import Depends +from fastapi.security import APIKeyHeader from sqlalchemy.orm import Session +from controllers.auth.token_controller import verify_token from db.sessions import get_session from domain.logic.admin import get_admin, is_user_admin from domain.logic.group import get_group @@ -13,35 +15,47 @@ from domain.models.TeacherDataclass import TeacherDataclass from routes.errors.authentication import ( InvalidAdminCredentialsError, + InvalidAuthenticationError, InvalidStudentCredentialsError, InvalidTeacherCredentialsError, NoAccessToSubjectError, ) +auth_scheme = APIKeyHeader(name="cas") -def get_authenticated_user() -> int: - return 1 # Checken of een user bestaat en/of hij de juiste credentials heeft. +def get_authenticated_user(token: str = Depends(auth_scheme)) -> int: + uid = verify_token(token) + if uid is None: + raise InvalidAuthenticationError + return uid -def get_authenticated_admin(session: Session = Depends(get_session)) -> AdminDataclass: - user_id = get_authenticated_user() - if not is_user_admin(session, user_id): + +def get_authenticated_admin( + session: Session = Depends(get_session), + uid: int = Depends(get_authenticated_user), +) -> AdminDataclass: + if not is_user_admin(session, uid): raise InvalidAdminCredentialsError - return get_admin(session, user_id) + return get_admin(session, uid) -def get_authenticated_teacher(session: Session = Depends(get_session)) -> TeacherDataclass: - user_id = get_authenticated_user() - if not is_user_teacher(session, user_id): +def get_authenticated_teacher( + session: Session = Depends(get_session), + uid: int = Depends(get_authenticated_user), +) -> TeacherDataclass: + if not is_user_teacher(session, uid): raise InvalidTeacherCredentialsError - return get_teacher(session, user_id) + return get_teacher(session, uid) -def get_authenticated_student(session: Session = Depends(get_session)) -> StudentDataclass: - user_id = get_authenticated_user() - if not is_user_student(session, user_id): +def get_authenticated_student( + session: Session = Depends(get_session), + uid: int = Depends(get_authenticated_user), +) -> StudentDataclass: + if not is_user_student(session, uid): raise InvalidStudentCredentialsError - return get_student(session, user_id) + return get_student(session, uid) def ensure_user_authorized_for_subject( diff --git a/backend/routes/errors/authentication.py b/backend/routes/errors/authentication.py index 4c51dd14..8c9b507d 100644 --- a/backend/routes/errors/authentication.py +++ b/backend/routes/errors/authentication.py @@ -16,3 +16,7 @@ class InvalidStudentCredentialsError(InvalidRoleCredentialsError): class NoAccessToSubjectError(Exception): ERROR_MESSAGE = "User doesn't have access to subject" + + +class InvalidAuthenticationError(Exception): + ERROR_MESSAGE = "User is not authenticated" diff --git a/backend/routes/login.py b/backend/routes/login.py index e69de29b..d7d43773 100644 --- a/backend/routes/login.py +++ b/backend/routes/login.py @@ -0,0 +1,33 @@ +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session + +from controllers.auth.authentication_controller import authenticate_user +from controllers.auth.token_controller import create_token +from db.sessions import get_session +from domain.models.APIUser import LoginResponse +from domain.models.UserDataclass import UserDataclass +from routes.errors.authentication import InvalidAuthenticationError + +# test url: https://login.ugent.be/login?service=https://localhost:8080/api/login +login_router = APIRouter() + + +@login_router.get("/login") +def login( + ticket: str, + session: Session = Depends(get_session), +) -> LoginResponse: + """ + This function starts a session for the user. + For authentication, it uses the given ticket and the UGent CAS server (https://login.ugent.be). + + :param session: + :param ticket: str: A UGent CAS ticket that will be used for authentication. + :return: + - Valid Ticket: Response: with a JWT token; + - Invalid Ticket: Response: with status_code 401 (unauthenticated) and an error message + """ + user: UserDataclass | None = authenticate_user(session, ticket) + if not user: + raise InvalidAuthenticationError + return LoginResponse(token=create_token(user)) diff --git a/backend/routes/user.py b/backend/routes/user.py index ceb208e7..e3544546 100644 --- a/backend/routes/user.py +++ b/backend/routes/user.py @@ -5,7 +5,7 @@ from db.sessions import get_session from domain.logic.basic_operations import get, get_all from domain.logic.role_enum import Role -from domain.logic.user import convert_user, modify_user_roles +from domain.logic.user import convert_user, get_user, modify_user_roles from domain.models.APIUser import APIUser from domain.models.UserDataclass import UserDataclass from routes.dependencies.role_dependencies import get_authenticated_admin, get_authenticated_user @@ -16,10 +16,9 @@ @users_router.get("/user") def get_current_user( session: Session = Depends(get_session), - user_id: int = Depends(get_authenticated_user), + uid: int = Depends(get_authenticated_user), ) -> APIUser: - user: UserDataclass = get(session, User, user_id).to_domain_model() - return convert_user(session, user) + return convert_user(session, get_user(session, uid)) @users_router.get("/users", dependencies=[Depends(get_authenticated_admin)]) @@ -29,7 +28,7 @@ def get_users(session: Session = Depends(get_session)) -> list[APIUser]: @users_router.get("/users/{uid}", dependencies=[Depends(get_authenticated_admin)]) -def get_user(uid: int, session: Session = Depends(get_session)) -> APIUser: +def admin_get_user(uid: int, session: Session = Depends(get_session)) -> APIUser: user: UserDataclass = get(session, User, uid).to_domain_model() return convert_user(session, user) diff --git a/backend/tests/authentication_test.py b/backend/tests/authentication_test.py new file mode 100644 index 00000000..fdc5f66d --- /dev/null +++ b/backend/tests/authentication_test.py @@ -0,0 +1,14 @@ +import unittest + +from fastapi.testclient import TestClient + +from app import app + + +class AuthenticationTest(unittest.TestCase): + def setUp(self) -> None: + self.app = TestClient(app) + + +if __name__ == "__main__": + unittest.main()