Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Login route #68

Merged
merged 21 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions backend/app.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette import status
from starlette.requests import Request
from starlette.responses import JSONResponse

from db.errors.database_errors import ActionAlreadyPerformedError, ItemNotFoundError, NoSuchRelationError
from routes.errors.authentication import InvalidRoleCredentialsError, NoAccessToSubjectError
from routes.errors.authentication import InvalidAuthenticationError, InvalidRoleCredentialsError, NoAccessToSubjectError
from routes.group import group_router
from routes.login import login_router
from routes.project import project_router
from routes.student import student_router
from routes.subject import subject_router
Expand All @@ -16,17 +18,19 @@
app = FastAPI()

# Koppel routes uit andere modules.
app.include_router(login_router, prefix="/api")
app.include_router(student_router, prefix="/api")
app.include_router(teacher_router, prefix="/api")
app.include_router(users_router, prefix="/api")
app.include_router(project_router, prefix="/api")
app.include_router(subject_router, prefix="/api")
app.include_router(group_router, prefix="/api")

DEBUG = False # Should always be false in repo
DEBUG = False # Should always be false in repo

if DEBUG:
from fastapi.middleware.cors import CORSMiddleware

origins = [
"https://localhost",
"https://localhost:8080",
Expand All @@ -41,6 +45,7 @@
allow_headers=["*"],
)


# Koppel de exception handlers
@app.exception_handler(InvalidRoleCredentialsError)
def invalid_admin_credentials_error_handler(request: Request, exc: InvalidRoleCredentialsError) -> JSONResponse:
Expand Down Expand Up @@ -82,5 +87,13 @@ def no_such_relation_error_handler(request: Request, exc: NoSuchRelationError) -
)


@app.exception_handler(InvalidAuthenticationError)
def invalid_authentication_error_handler(request: Request, exc: NoSuchRelationError) -> JSONResponse:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": str(exc)},
)


if __name__ == "__main__":
uvicorn.run("app:app")
80 changes: 80 additions & 0 deletions backend/controllers/auth/authentication_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import string
from typing import TYPE_CHECKING

import httpx
from defusedxml.ElementTree import fromstring
from sqlalchemy.orm import Session

from domain.logic.student import create_student
from domain.logic.teacher import create_teacher
from domain.logic.user import get_user_with_email
from domain.models.UserDataclass import UserDataclass

if TYPE_CHECKING:
from _elementtree import Element

cas_service = os.getenv("CAS_URL", "https://localhost:8080/login")


def authenticate_user(session: Session, ticket: str) -> UserDataclass | None:
"""
This function will authenticate the user.
If the use doesn't yet exist in the database, it will create an entry.
a

:param session: Session with the database
:param ticket: A ticket from login.ugent.be/login?service=https://localhost:8080/login
:return: None if the authentication failed, user: UseDataclass is the authentication was successful
"""
allowed_chars = set(string.ascii_letters + string.digits + "-")
if not all(c in allowed_chars for c in ticket):
return None
user_information = httpx.get(f"https://login.ugent.be/serviceValidate?service={cas_service}&ticket={ticket}")
user_dict: dict | None = parse_cas_xml(user_information.text)
if user_dict is None:
return None

user: UserDataclass | None = get_user_with_email(session, user_dict["email"])
if user is None:
if user_dict["role"] == "student":
user = create_student(session, user_dict["name"], user_dict["email"])
elif user_dict["role"] == "teacher":
user = create_teacher(session, user_dict["name"], user_dict["email"])
return user


def parse_cas_xml(xml: str) -> dict | None:
"""
The authentication with CAS returns a xml-object.
This function will read the necessary attributes and return them in a dictionary.

:param xml: str: response xml from CAS
:return: None if the authentication failed else dict
"""

namespace = "{http://www.yale.edu/tp/cas}"
root: Element | None = fromstring(xml).find(f"{namespace}authenticationSuccess")
if root is None:
return None
user_information: Element | None = root.find(f"{namespace}attributes")
if user_information is None:
return None
givenname: Element | None = user_information.find(f"{namespace}givenname")
surname: Element | None = user_information.find(f"{namespace}surname")
email: Element | None = user_information.find(f"{namespace}mail")
role: list | None = user_information.findall(f"{namespace}objectClass")
if role is not None and givenname is not None and surname is not None and email is not None:
role_str = ""
for r in role:
if r.text == "ugentStudent" and role_str == "":
role_str = "student"
elif r.text == "ugentEmployee":
role_str = "teacher"

return {
"email": email.text.lower(),
"name": f"{givenname.text} {surname.text}",
"role": role_str,
}
return None
25 changes: 25 additions & 0 deletions backend/controllers/auth/token_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import contextlib
import os
from datetime import UTC, datetime, timedelta

import jwt

from domain.models.UserDataclass import UserDataclass

# Zeker aanpassen in production
jwt_secret = os.getenv("JWT_SECRET", "secret")


def verify_token(token: str) -> int | None:
with contextlib.suppress(jwt.ExpiredSignatureError, jwt.DecodeError):
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
return payload.get("uid", None)


def create_token(user: UserDataclass) -> str:
expire = datetime.now(UTC) + timedelta(days=1)
to_encode: dict = {
"uid": user.id,
"exp": expire,
}
return jwt.encode(to_encode, jwt_secret, algorithm="HS256")
15 changes: 15 additions & 0 deletions backend/domain/logic/user.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sqlalchemy import select
from sqlalchemy.orm import Session

from db.models.models import Admin, Student, Teacher, User
Expand Down Expand Up @@ -29,6 +30,20 @@ def get_user(session: Session, user_id: int) -> UserDataclass:
return get(session, User, user_id).to_domain_model()


def get_user_with_email(session: Session, email: str) -> UserDataclass | None:
stmt = select(User).where(User.email == email)
result = session.execute(stmt)
users = [r.to_domain_model() for r in result.scalars()]

if len(users) > 1:
raise NotImplementedError

if len(users) == 1:
return users[0]

return None


def get_all_users(session: Session) -> list[UserDataclass]:
return [user.to_domain_model() for user in get_all(session, User)]

Expand Down
4 changes: 4 additions & 0 deletions backend/domain/models/APIUser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ class APIUser(BaseModel):
name: str
email: EmailStr
roles: list[Role]


class LoginResponse(BaseModel):
token: str
4 changes: 4 additions & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ SQLAlchemy==2.0.27
starlette==0.36.3
typing_extensions==4.10.0
uvicorn==0.27.1
httpx==0.27.0
defusedxml~=0.7.1
cryptography~=42.0.5
PyJWT~=2.8.0
42 changes: 28 additions & 14 deletions backend/routes/dependencies/role_dependencies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from fastapi import Depends
from fastapi.security import APIKeyHeader
from sqlalchemy.orm import Session

from controllers.auth.token_controller import verify_token
from db.sessions import get_session
from domain.logic.admin import get_admin, is_user_admin
from domain.logic.group import get_group
Expand All @@ -13,35 +15,47 @@
from domain.models.TeacherDataclass import TeacherDataclass
from routes.errors.authentication import (
InvalidAdminCredentialsError,
InvalidAuthenticationError,
InvalidStudentCredentialsError,
InvalidTeacherCredentialsError,
NoAccessToSubjectError,
)

auth_scheme = APIKeyHeader(name="cas")

def get_authenticated_user() -> int:
return 1 # Checken of een user bestaat en/of hij de juiste credentials heeft.

def get_authenticated_user(token: str = Depends(auth_scheme)) -> int:
uid = verify_token(token)
if uid is None:
raise InvalidAuthenticationError
return uid

def get_authenticated_admin(session: Session = Depends(get_session)) -> AdminDataclass:
user_id = get_authenticated_user()
if not is_user_admin(session, user_id):

def get_authenticated_admin(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> AdminDataclass:
if not is_user_admin(session, uid):
raise InvalidAdminCredentialsError
return get_admin(session, user_id)
return get_admin(session, uid)


def get_authenticated_teacher(session: Session = Depends(get_session)) -> TeacherDataclass:
user_id = get_authenticated_user()
if not is_user_teacher(session, user_id):
def get_authenticated_teacher(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> TeacherDataclass:
if not is_user_teacher(session, uid):
raise InvalidTeacherCredentialsError
return get_teacher(session, user_id)
return get_teacher(session, uid)


def get_authenticated_student(session: Session = Depends(get_session)) -> StudentDataclass:
user_id = get_authenticated_user()
if not is_user_student(session, user_id):
def get_authenticated_student(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> StudentDataclass:
if not is_user_student(session, uid):
raise InvalidStudentCredentialsError
return get_student(session, user_id)
return get_student(session, uid)


def ensure_user_authorized_for_subject(
Expand Down
4 changes: 4 additions & 0 deletions backend/routes/errors/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ class InvalidStudentCredentialsError(InvalidRoleCredentialsError):

class NoAccessToSubjectError(Exception):
ERROR_MESSAGE = "User doesn't have access to subject"


class InvalidAuthenticationError(Exception):
ERROR_MESSAGE = "User is not authenticated"
33 changes: 33 additions & 0 deletions backend/routes/login.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from controllers.auth.authentication_controller import authenticate_user
from controllers.auth.token_controller import create_token
from db.sessions import get_session
from domain.models.APIUser import LoginResponse
from domain.models.UserDataclass import UserDataclass
from routes.errors.authentication import InvalidAuthenticationError

# test url: https://login.ugent.be/login?service=https://localhost:8080/api/login
login_router = APIRouter()


@login_router.get("/login")
def login(
ticket: str,
session: Session = Depends(get_session),
) -> LoginResponse:
"""
This function starts a session for the user.
For authentication, it uses the given ticket and the UGent CAS server (https://login.ugent.be).

:param session:
:param ticket: str: A UGent CAS ticket that will be used for authentication.
:return:
- Valid Ticket: Response: with a JWT token;
- Invalid Ticket: Response: with status_code 401 (unauthenticated) and an error message
"""
user: UserDataclass | None = authenticate_user(session, ticket)
if not user:
raise InvalidAuthenticationError
return LoginResponse(token=create_token(user))
9 changes: 4 additions & 5 deletions backend/routes/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from db.sessions import get_session
from domain.logic.basic_operations import get, get_all
from domain.logic.role_enum import Role
from domain.logic.user import convert_user, modify_user_roles
from domain.logic.user import convert_user, get_user, modify_user_roles
from domain.models.APIUser import APIUser
from domain.models.UserDataclass import UserDataclass
from routes.dependencies.role_dependencies import get_authenticated_admin, get_authenticated_user
Expand All @@ -16,10 +16,9 @@
@users_router.get("/user")
def get_current_user(
session: Session = Depends(get_session),
user_id: int = Depends(get_authenticated_user),
uid: int = Depends(get_authenticated_user),
) -> APIUser:
user: UserDataclass = get(session, User, user_id).to_domain_model()
return convert_user(session, user)
return convert_user(session, get_user(session, uid))


@users_router.get("/users", dependencies=[Depends(get_authenticated_admin)])
Expand All @@ -29,7 +28,7 @@ def get_users(session: Session = Depends(get_session)) -> list[APIUser]:


@users_router.get("/users/{uid}", dependencies=[Depends(get_authenticated_admin)])
def get_user(uid: int, session: Session = Depends(get_session)) -> APIUser:
def admin_get_user(uid: int, session: Session = Depends(get_session)) -> APIUser:
user: UserDataclass = get(session, User, uid).to_domain_model()
return convert_user(session, user)

Expand Down
14 changes: 14 additions & 0 deletions backend/tests/authentication_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import unittest

from fastapi.testclient import TestClient

from app import app


class AuthenticationTest(unittest.TestCase):
def setUp(self) -> None:
self.app = TestClient(app)


if __name__ == "__main__":
unittest.main()