This repository has been archived by the owner on Sep 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #68 from SELab-2/login_route
Login route
- Loading branch information
Showing
11 changed files
with
226 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import os | ||
import string | ||
from typing import TYPE_CHECKING | ||
|
||
import httpx | ||
from defusedxml.ElementTree import fromstring | ||
from sqlalchemy.orm import Session | ||
|
||
from domain.logic.student import create_student | ||
from domain.logic.teacher import create_teacher | ||
from domain.logic.user import get_user_with_email | ||
from domain.models.UserDataclass import UserDataclass | ||
|
||
if TYPE_CHECKING: | ||
from _elementtree import Element | ||
|
||
cas_service = os.getenv("CAS_URL", "https://localhost:8080/login") | ||
|
||
|
||
def authenticate_user(session: Session, ticket: str) -> UserDataclass | None: | ||
""" | ||
This function will authenticate the user. | ||
If the use doesn't yet exist in the database, it will create an entry. | ||
a | ||
:param session: Session with the database | ||
:param ticket: A ticket from login.ugent.be/login?service=https://localhost:8080/login | ||
:return: None if the authentication failed, user: UseDataclass is the authentication was successful | ||
""" | ||
allowed_chars = set(string.ascii_letters + string.digits + "-") | ||
if not all(c in allowed_chars for c in ticket): | ||
return None | ||
user_information = httpx.get(f"https://login.ugent.be/serviceValidate?service={cas_service}&ticket={ticket}") | ||
user_dict: dict | None = parse_cas_xml(user_information.text) | ||
if user_dict is None: | ||
return None | ||
|
||
user: UserDataclass | None = get_user_with_email(session, user_dict["email"]) | ||
if user is None: | ||
if user_dict["role"] == "student": | ||
user = create_student(session, user_dict["name"], user_dict["email"]) | ||
elif user_dict["role"] == "teacher": | ||
user = create_teacher(session, user_dict["name"], user_dict["email"]) | ||
return user | ||
|
||
|
||
def parse_cas_xml(xml: str) -> dict | None: | ||
""" | ||
The authentication with CAS returns a xml-object. | ||
This function will read the necessary attributes and return them in a dictionary. | ||
:param xml: str: response xml from CAS | ||
:return: None if the authentication failed else dict | ||
""" | ||
|
||
namespace = "{http://www.yale.edu/tp/cas}" | ||
root: Element | None = fromstring(xml).find(f"{namespace}authenticationSuccess") | ||
if root is None: | ||
return None | ||
user_information: Element | None = root.find(f"{namespace}attributes") | ||
if user_information is None: | ||
return None | ||
givenname: Element | None = user_information.find(f"{namespace}givenname") | ||
surname: Element | None = user_information.find(f"{namespace}surname") | ||
email: Element | None = user_information.find(f"{namespace}mail") | ||
role: list | None = user_information.findall(f"{namespace}objectClass") | ||
if role is not None and givenname is not None and surname is not None and email is not None: | ||
role_str = "" | ||
for r in role: | ||
if r.text == "ugentStudent" and role_str == "": | ||
role_str = "student" | ||
elif r.text == "ugentEmployee": | ||
role_str = "teacher" | ||
|
||
return { | ||
"email": email.text.lower(), | ||
"name": f"{givenname.text} {surname.text}", | ||
"role": role_str, | ||
} | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import contextlib | ||
import os | ||
from datetime import UTC, datetime, timedelta | ||
|
||
import jwt | ||
|
||
from domain.models.UserDataclass import UserDataclass | ||
|
||
# Zeker aanpassen in production | ||
jwt_secret = os.getenv("JWT_SECRET", "secret") | ||
|
||
|
||
def verify_token(token: str) -> int | None: | ||
with contextlib.suppress(jwt.ExpiredSignatureError, jwt.DecodeError): | ||
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) | ||
return payload.get("uid", None) | ||
|
||
|
||
def create_token(user: UserDataclass) -> str: | ||
expire = datetime.now(UTC) + timedelta(days=1) | ||
to_encode: dict = { | ||
"uid": user.id, | ||
"exp": expire, | ||
} | ||
return jwt.encode(to_encode, jwt_secret, algorithm="HS256") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,7 @@ class APIUser(BaseModel): | |
name: str | ||
email: EmailStr | ||
roles: list[Role] | ||
|
||
|
||
class LoginResponse(BaseModel): | ||
token: str |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from fastapi import APIRouter, Depends | ||
from sqlalchemy.orm import Session | ||
|
||
from controllers.auth.authentication_controller import authenticate_user | ||
from controllers.auth.token_controller import create_token | ||
from db.sessions import get_session | ||
from domain.models.APIUser import LoginResponse | ||
from domain.models.UserDataclass import UserDataclass | ||
from routes.errors.authentication import InvalidAuthenticationError | ||
|
||
# test url: https://login.ugent.be/login?service=https://localhost:8080/api/login | ||
login_router = APIRouter() | ||
|
||
|
||
@login_router.get("/login") | ||
def login( | ||
ticket: str, | ||
session: Session = Depends(get_session), | ||
) -> LoginResponse: | ||
""" | ||
This function starts a session for the user. | ||
For authentication, it uses the given ticket and the UGent CAS server (https://login.ugent.be). | ||
:param session: | ||
:param ticket: str: A UGent CAS ticket that will be used for authentication. | ||
:return: | ||
- Valid Ticket: Response: with a JWT token; | ||
- Invalid Ticket: Response: with status_code 401 (unauthenticated) and an error message | ||
""" | ||
user: UserDataclass | None = authenticate_user(session, ticket) | ||
if not user: | ||
raise InvalidAuthenticationError | ||
return LoginResponse(token=create_token(user)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import unittest | ||
|
||
from fastapi.testclient import TestClient | ||
|
||
from app import app | ||
|
||
|
||
class AuthenticationTest(unittest.TestCase): | ||
def setUp(self) -> None: | ||
self.app = TestClient(app) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |