This repository has been archived by the owner on Sep 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
174 additions
and
4,909 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
[session] | ||
service=https://localhost:8080/api/login | ||
cookie_domain=localhost | ||
max_cookie_age=86400 | ||
service=https://localhost:8080/login | ||
access_token_expire_minutes=10 | ||
secret_key=f19a1fb01efac6d7d254065ce1949f0d3b584c867b5625306f0481f64f14471c | ||
algorithm=HS256 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,61 @@ | ||
import logging | ||
|
||
import httpx | ||
from defusedxml.ElementTree import fromstring | ||
from sqlalchemy.orm import Session | ||
|
||
from controllers.properties.Properties import Properties | ||
from db.interface.UserDAO import UserDAO | ||
from domain.logic.student import create_student | ||
from domain.logic.teacher import create_teacher | ||
from domain.logic.user import get_user_with_email | ||
from domain.models.UserDataclass import UserDataclass | ||
|
||
props: Properties = Properties() | ||
|
||
|
||
# TODO: Should return a user object instead of a dict | ||
def authenticate_user(ticket: str) -> dict | None: | ||
def authenticate_user(session: Session, ticket: str) -> UserDataclass | None: | ||
service = props.get("session", "service") | ||
user_information = httpx.get(f"https://login.ugent.be/serviceValidate?service={service}&ticket={ticket}") | ||
user: UserDataclass | None = parse_cas_xml(user_information.text) | ||
user_dict: dict | None = parse_cas_xml(user_information.text) | ||
|
||
if user_dict is None: | ||
return None | ||
|
||
user: UserDataclass | None = get_user_with_email(session, user_dict["email"]) | ||
if user is None: | ||
if user_dict["role"] == "student": | ||
user = create_student(session, user_dict["name"], user_dict["email"]) | ||
elif user_dict["role"] == "teacher": | ||
user = create_teacher(session, user_dict["name"], user_dict["email"]) | ||
return user | ||
|
||
|
||
def parse_cas_xml(xml: str) -> UserDataclass | None: | ||
def parse_cas_xml(xml: str) -> dict | None: | ||
namespace = "{http://www.yale.edu/tp/cas}" | ||
user = {} | ||
|
||
root = fromstring(xml) | ||
if root.find(f"{namespace}authenticationSuccess"): | ||
attributes_xml = (root | ||
.find(f"{namespace}authenticationSuccess") | ||
.find(f"{namespace}attributes")) | ||
|
||
givenname = attributes_xml.find(f"{namespace}givenname").text | ||
surname = attributes_xml.find(f"{namespace}surname").text | ||
mail = attributes_xml.find(f"{namespace}mail").text | ||
|
||
user["name"] = f"{givenname} {surname}" | ||
user["mail"] = mail.lower() | ||
user_dataclass = UserDAO.createUser(user["name"], user["mail"]) | ||
return user | ||
.find(f"{namespace}attributes") | ||
) | ||
|
||
givenname: str = attributes_xml.find(f"{namespace}givenname").text | ||
surname: str = attributes_xml.find(f"{namespace}surname").text | ||
email: str = attributes_xml.find(f"{namespace}mail").text | ||
role: str = attributes_xml.findall(f"{namespace}objectClass") | ||
|
||
# TODO: Checking if there are other roles that need to be added | ||
role_str: str = "" | ||
for r in role: | ||
if r.text == "ugentStudent" and role_str == "": | ||
role_str = "student" | ||
elif r.text == "ugentEmployee": | ||
role_str = "teacher" | ||
|
||
return { | ||
"email": email.lower(), | ||
"name": f"{givenname} {surname}", | ||
"role": role_str, | ||
} | ||
return None |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,28 @@ | ||
import contextlib | ||
from datetime import UTC, datetime, timedelta | ||
|
||
import jwt | ||
|
||
from controllers.properties.Properties import Properties | ||
from domain.models import UserDataclass | ||
from domain.models.UserDataclass import UserDataclass | ||
|
||
props: Properties = Properties() | ||
|
||
|
||
def get_token(user: UserDataclass) -> str: | ||
def verify_token(token: str) -> int | None: | ||
secret = props.get("session", "secret_key") | ||
algorithm = props.get("session", "algorithm") | ||
with contextlib.suppress(jwt.ExpiredSignatureError, jwt.DecodeError): | ||
payload = jwt.decode(token, secret, algorithms=[algorithm]) | ||
return payload.get("userid", None) | ||
|
||
|
||
pass | ||
def create_token(user: UserDataclass) -> str: | ||
exprire = datetime.now(UTC) + timedelta(minutes=int(props.get("session", "access_token_expire_minutes"))) | ||
to_encode: dict = { | ||
"userid": user.id, | ||
"exp": exprire, | ||
} | ||
algorithm: str = props.get("session", "algorithm") | ||
secret: str = props.get("session", "secret_key") | ||
return jwt.encode(to_encode, secret, algorithm=algorithm) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from fastapi import APIRouter, Depends, Response | ||
from sqlalchemy.orm import Session | ||
|
||
from controllers.auth.authentication_controller import authenticate_user | ||
from controllers.auth.token_controller import create_token | ||
from controllers.properties.Properties import Properties | ||
from db.sessions import get_session | ||
from domain.models.UserDataclass import UserDataclass | ||
|
||
# test url: https://login.ugent.be/login?service=https://localhost:8080/api/login | ||
login_router = APIRouter() | ||
props: Properties = Properties() | ||
|
||
|
||
@login_router.get("/login") | ||
def login( | ||
ticket: str, | ||
session: Session = Depends(get_session), | ||
) -> Response: | ||
""" | ||
This function starts a session for the user. | ||
For authentication, it uses the given ticket and the UGent CAS server (https://login.ugent.be). | ||
:param session: | ||
:param ticket: str: A UGent CAS ticket that will be used for authentication. | ||
:return: | ||
- Valid Ticket: Response: with a JWT token; | ||
- Invalid Ticket: Response: with status_code 401 (unauthenticated) and an error message | ||
""" | ||
user: UserDataclass | None = authenticate_user(session, ticket) | ||
if user: | ||
return Response(content=create_token(user)) | ||
return Response(status_code=401, content="Invalid Ticket!") | ||
|
||
|
||
# TODO proper handle logout | ||
@login_router.get("/logout") | ||
def logout() -> Response: | ||
""" | ||
This function will log a user out, by removing the session from storage | ||
:return: A confirmation that the logout was successful, and it tells the browser to remove the cookie. | ||
""" | ||
response: Response = Response(content="You've been successfully logged out") | ||
response.set_cookie("token", "") | ||
return response |
Oops, something went wrong.