Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Commit

Permalink
Implement authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
msathieu committed Mar 13, 2024
1 parent 1ed07ad commit 20e10ca
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 39 deletions.
10 changes: 9 additions & 1 deletion backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from starlette.responses import JSONResponse

from db.errors.database_errors import ActionAlreadyPerformedError, ItemNotFoundError, NoSuchRelationError
from routes.errors.authentication import InvalidRoleCredentialsError, NoAccessToSubjectError
from routes.errors.authentication import InvalidRoleCredentialsError, InvalidTokenError, NoAccessToSubjectError
from routes.group import group_router
from routes.login import login_router
from routes.project import project_router
Expand Down Expand Up @@ -87,5 +87,13 @@ def no_such_relation_error_handler(request: Request, exc: NoSuchRelationError) -
)


@app.exception_handler(InvalidTokenError)
def invalid_token_error_handler(request: Request, exc: NoSuchRelationError) -> JSONResponse:
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": str(exc)},
)


if __name__ == "__main__":
uvicorn.run("app:app")
12 changes: 6 additions & 6 deletions backend/controllers/auth/token_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@

def verify_token(token: str) -> int | None:
with contextlib.suppress(jwt.ExpiredSignatureError, jwt.DecodeError):
payload = jwt.decode(token, jwt_secret)
return payload.get("userid", None)
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
return payload.get("uid", None)


def create_token(user: UserDataclass) -> str:
exprire = datetime.now(UTC) + timedelta(days=1)
expire = datetime.now(UTC) + timedelta(days=1)
to_encode: dict = {
"userid": user.id,
"exp": exprire,
"uid": user.id,
"exp": expire,
}
return jwt.encode(to_encode, jwt_secret)
return jwt.encode(to_encode, jwt_secret, algorithm="HS256")
42 changes: 28 additions & 14 deletions backend/routes/dependencies/role_dependencies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from fastapi import Depends
from fastapi.security import APIKeyHeader
from sqlalchemy.orm import Session

from controllers.auth.token_controller import verify_token
from db.sessions import get_session
from domain.logic.admin import get_admin, is_user_admin
from domain.logic.group import get_group
Expand All @@ -15,33 +17,45 @@
InvalidAdminCredentialsError,
InvalidStudentCredentialsError,
InvalidTeacherCredentialsError,
InvalidTokenError,
NoAccessToSubjectError,
)

auth_scheme = APIKeyHeader(name="cas")

def get_authenticated_user() -> int:
return 1 # Checken of een user bestaat en/of hij de juiste credentials heeft.

def get_authenticated_user(token: str = Depends(auth_scheme)) -> int:
uid = verify_token(token)
if uid is None:
raise InvalidTokenError
return uid

def get_authenticated_admin(session: Session = Depends(get_session)) -> AdminDataclass:
user_id = get_authenticated_user()
if not is_user_admin(session, user_id):

def get_authenticated_admin(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> AdminDataclass:
if not is_user_admin(session, uid):
raise InvalidAdminCredentialsError
return get_admin(session, user_id)
return get_admin(session, uid)


def get_authenticated_teacher(session: Session = Depends(get_session)) -> TeacherDataclass:
user_id = get_authenticated_user()
if not is_user_teacher(session, user_id):
def get_authenticated_teacher(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> TeacherDataclass:
if not is_user_teacher(session, uid):
raise InvalidTeacherCredentialsError
return get_teacher(session, user_id)
return get_teacher(session, uid)


def get_authenticated_student(session: Session = Depends(get_session)) -> StudentDataclass:
user_id = get_authenticated_user()
if not is_user_student(session, user_id):
def get_authenticated_student(
session: Session = Depends(get_session),
uid: int = Depends(get_authenticated_user),
) -> StudentDataclass:
if not is_user_student(session, uid):
raise InvalidStudentCredentialsError
return get_student(session, user_id)
return get_student(session, uid)


def ensure_user_authorized_for_subject(
Expand Down
4 changes: 4 additions & 0 deletions backend/routes/errors/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ class InvalidStudentCredentialsError(InvalidRoleCredentialsError):

class NoAccessToSubjectError(Exception):
ERROR_MESSAGE = "User doesn't have access to subject"


class InvalidTokenError(Exception):
ERROR_MESSAGE = "User is not authenticated"
25 changes: 7 additions & 18 deletions backend/routes/user.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,24 @@
from fastapi import APIRouter, Depends, Request, Response, status
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy.orm import Session

from controllers.auth.token_controller import verify_token
from db.models.models import User
from db.sessions import get_session
from domain.logic.basic_operations import get, get_all
from domain.logic.role_enum import Role
from domain.logic.user import convert_user, modify_user_roles
from domain.logic.user import convert_user, get_user, modify_user_roles
from domain.models.APIUser import APIUser
from domain.models.UserDataclass import UserDataclass
from routes.dependencies.role_dependencies import get_authenticated_admin
from routes.dependencies.role_dependencies import get_authenticated_admin, get_authenticated_user

users_router = APIRouter()


@users_router.get("/user")
def get_current_user(
response: Response,
request: Request,
session: Session = Depends(get_session),
) -> APIUser | None:
token: str | None = request.cookies.get("token")
if token is None:
response.status_code = 401
return None
user_id: int | None = verify_token(token)
if user_id is None:
response.status_code = 401
return None
user: UserDataclass = get(session, User, user_id).to_domain_model()
return convert_user(session, user)
uid: int = Depends(get_authenticated_user),
) -> APIUser:
return convert_user(session, get_user(session, uid))


@users_router.get("/users", dependencies=[Depends(get_authenticated_admin)])
Expand All @@ -39,7 +28,7 @@ def get_users(session: Session = Depends(get_session)) -> list[APIUser]:


@users_router.get("/users/{uid}", dependencies=[Depends(get_authenticated_admin)])
def get_user(uid: int, session: Session = Depends(get_session)) -> APIUser:
def admin_get_user(uid: int, session: Session = Depends(get_session)) -> APIUser:
user: UserDataclass = get(session, User, uid).to_domain_model()
return convert_user(session, user)

Expand Down

0 comments on commit 20e10ca

Please sign in to comment.