diff --git a/backend/api/authentication.py b/backend/api/authentication.py index 8f568069a..b058728e7 100644 --- a/backend/api/authentication.py +++ b/backend/api/authentication.py @@ -61,7 +61,16 @@ import jwt import requests from datetime import datetime, timedelta -from fastapi import APIRouter, Header, HTTPException, Request, Response, Depends +from fastapi import ( + APIRouter, + Header, + HTTPException, + WebSocketException, + Request, + Response, + Depends, + WebSocket, +) from fastapi.exceptions import HTTPException from fastapi.security import HTTPBearer from fastapi.security.http import HTTPAuthorizationCredentials @@ -101,6 +110,21 @@ def registered_user( raise HTTPException(status_code=401, detail="Unauthorized") +def registered_user_from_websocket(token: str, user_service: UserService) -> User: + """ + Since web sockets cannot use the HTTPBearer dependency, we need to use the websocket to + extract the bearer information. + """ + try: + auth_info = jwt.decode(token, _JWT_SECRET, algorithms=[_JST_ALGORITHM]) + user = user_service.get(auth_info["pid"]) + if user: + return user + except: + ... + raise WebSocketException(status_code=401, detail="Unauthorized") + + def authenticated_pid( token: HTTPAuthorizationCredentials | None = Depends(HTTPBearer()), ) -> tuple[int, str]: diff --git a/backend/api/office_hours/queue.py b/backend/api/office_hours/queue.py new file mode 100644 index 000000000..d6367c67c --- /dev/null +++ b/backend/api/office_hours/queue.py @@ -0,0 +1,243 @@ +""" +Implements the office hours queue using websocket functionality. +""" + +from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect +from enum import Enum +from pydantic import BaseModel +from ..authentication import registered_user_from_websocket +from ...services.office_hours.office_hours import OfficeHoursService +from ...services.office_hours.ticket import OfficeHourTicketService +from ...services.user import UserService +from ...models.user import User +from ...models.office_hours.ticket import NewOfficeHoursTicket + +__authors__ = ["Ajay Gandecha"] +__copyright__ = "Copyright 2024" +__license__ = "MIT" + +api = APIRouter(prefix="/ws/office-hours") + + +class UserWebSocketConnection: + """Stores dat about the user and the web socket they are connected to.""" + + user: User + socket: WebSocket + + def __init__(self, user: User, socket: WebSocket): + self.user = user + self.socket = socket + + +# Define the websocket connection manager for the queue feature. +class QueueConnectionManager: + """ + Manages all active connections to the office hours web socket and + coordinates broadcasting updates to office hours queues to active + member connections. + """ + + def __init__(self): + # This internal field stores all of the active connections, + # indexed by office hours ID (enabling easy and efficient + # lookups and updates based on the office hours event). + self._active_student_connections: dict[int, list[UserWebSocketConnection]] = {} + self._active_staff_connections: dict[int, list[UserWebSocketConnection]] = {} + + async def queue_connect( + self, office_hours_id: int, subject: User, websocket: WebSocket + ): + """ + Creates a new connection to the web socket by a staff member given + an office hours ID taken as the websocket's query parameter. + """ + # Wait for the new connection to be accepted + await websocket.accept() + # Create the new connection object + connection = UserWebSocketConnection(user=subject, socket=websocket) + # Register the new connection in the map of active connections. + self._active_staff_connections[office_hours_id] = ( + self._active_staff_connections.get(office_hours_id, []) + [connection] + ) + + async def get_help_connect( + self, office_hours_id: int, subject: User, websocket: WebSocket + ): + """ + Creates a new connection to the web socket by a student given + an office hours ID taken as the websocket's query parameter. + """ + # Wait for the new connection to be accepted + await websocket.accept() + # Create the new connection object + connection = UserWebSocketConnection(user=subject, socket=websocket) + # Register the new connection in the map of active connections. + self._active_student_connections[office_hours_id] = ( + self._active_student_connections.get(office_hours_id, []) + [connection] + ) + + def queue_disconnect(self, office_hours_id: int, websocket: WebSocket): + """ + Removes a staff's web socket connection from the list of active connections. + """ + # Remove the web socket based on the providd office hours ID + self._active_staff_connections[office_hours_id] = [ + connection + for connection in self._active_staff_connections.get(office_hours_id, []) + if connection.socket != websocket + ] + + def get_help_disconnect(self, office_hours_id: int, websocket: WebSocket): + """ + Removes a student's web socket connection from the list of active connections. + """ + # Remove the web socket based on the providd office hours ID + self._active_student_connections[office_hours_id] = [ + connection + for connection in self._active_student_connections.get(office_hours_id, []) + if connection.socket != websocket + ] + + async def broadcast_queue_changes( + self, office_hours_id: int, oh_event_svc: OfficeHoursService + ): + """ + Broadcasts changes in the queue for a given office hours ID to all active + connections for that office hours event. + """ + # Send new queue to all staff + for connection in self._active_staff_connections.get(office_hours_id, []): + queue = oh_event_svc.get_office_hour_queue(connection.user, office_hours_id) + await connection.socket.send_json(queue.model_dump_json()) + + # Send new queue data to all students + for connection in self._active_student_connections.get(office_hours_id, []): + overview = oh_event_svc.get_office_hour_get_help_overview( + connection.user, office_hours_id + ) + await connection.socket.send_json(overview.model_dump_json()) + + +# Create the queue connection manager object +manager = QueueConnectionManager() + +# Define the queue websockets + + +class QueueWebSocketAction(Enum): + """Define the specific actions that staff can take.""" + + CALL = "CALL" + CLOSE = "CLOSE" + CANCEL = "CANCEL" + + +class QueueWebSocketData(BaseModel): + """Model to represent the data sent to the queue websocket.""" + + action: QueueWebSocketAction + id: int + + +@api.websocket("/{office_hours_id}/queue") +async def queue_websocket( + websocket: WebSocket, + office_hours_id: int, + oh_ticket_svc: OfficeHourTicketService = Depends(), + oh_event_svc: OfficeHoursService = Depends(), + user_svc: UserService = Depends(), +): + # Try to load the current user. + token = websocket.query_params.get("token") + subject = registered_user_from_websocket(token, user_svc) + # Connect the new websocket connection to the manager. + await manager.queue_connect(office_hours_id, subject, websocket) + # Send the initial queue data. + queue = oh_event_svc.get_office_hour_queue(subject, office_hours_id) + await websocket.send_json(queue.model_dump_json()) + try: + # Await receiving new data. + while True: + # When new data is sent to the websocket, read it as JSON + # and cast to the correct data model with Pydantic. + json_data = await websocket.receive_json() + data = QueueWebSocketData.model_validate(json_data) + # Depending on the type of request, call the respective + # manager actions. + if data.action == QueueWebSocketAction.CALL: + # Talls a ticket + oh_ticket_svc.call_ticket(subject, data.id) + # Broadcast the changes using the mamanger. + await manager.broadcast_queue_changes(office_hours_id, oh_event_svc) + elif data.action == QueueWebSocketAction.CLOSE: + # Close a ticket + oh_ticket_svc.close_ticket(subject, data.id) + # Broadcast the changes using the mamanger. + await manager.broadcast_queue_changes(office_hours_id, oh_event_svc) + elif data.action == QueueWebSocketAction.CANCEL: + # Close a ticket + oh_ticket_svc.cancel_ticket(subject, data.id) + # Broadcast the changes using the mamanger. + await manager.broadcast_queue_changes(office_hours_id, oh_event_svc) + except WebSocketDisconnect: + # When the websocket disconnects, remove the connection + # using the manager. + manager.get_help_disconnect(office_hours_id, websocket) + + +class GetHelpWebSocketAction(Enum): + """Define the specific actions that students can take.""" + + CREATE = "CREATE" + CANCEL = "CANCEL" + + +class GetHelpWebSocketData(BaseModel): + """Model to represent the data sent to the get help websocket.""" + + action: GetHelpWebSocketAction + id: int | None + new_ticket: NewOfficeHoursTicket | None + + +@api.websocket("/{office_hours_id}/get-help") +async def get_help_websocket( + websocket: WebSocket, + office_hours_id: int, + oh_ticket_svc: OfficeHourTicketService = Depends(), + oh_event_svc: OfficeHoursService = Depends(), + user_svc: UserService = Depends(), +): + # Try to load the current user. + token = websocket.query_params.get("token") + subject = registered_user_from_websocket(token, user_svc) + # Connect the new websocket connection to the manager. + await manager.get_help_connect(office_hours_id, subject, websocket) + # Send the initial data. + overview = oh_event_svc.get_office_hour_get_help_overview(subject, office_hours_id) + await websocket.send_json(overview.model_dump_json()) + + try: + # Await receiving new data. + while True: + # When new data is sent to the websocket, read it as JSON + # and cast to the correct data model with Pydantic. + json_data = await websocket.receive_json() + data = GetHelpWebSocketData.model_validate(json_data) + # Depending on the type of request, call the respective + # manager actions. + if data.action == GetHelpWebSocketAction.CREATE and data.new_ticket: + # Create a new ticket + oh_ticket_svc.create_ticket(subject, data.new_ticket) + # Broadcast the changes using the mamanger. + await manager.broadcast_queue_changes(office_hours_id, oh_event_svc) + elif data.action == GetHelpWebSocketAction.CANCEL: + # Close a ticket + oh_ticket_svc.cancel_ticket(subject, data.id) + # Broadcast the changes using the mamanger. + await manager.broadcast_queue_changes(office_hours_id, oh_event_svc) + except WebSocketDisconnect: + # When the websocket disconnects, remove the connection + # using the manager. + manager.get_help_disconnect(office_hours_id, websocket) diff --git a/backend/api/static_files.py b/backend/api/static_files.py index f98500667..08c689477 100644 --- a/backend/api/static_files.py +++ b/backend/api/static_files.py @@ -12,6 +12,7 @@ import os from fastapi.staticfiles import StaticFiles +from fastapi.websockets import WebSocket class StaticFileMiddleware(StaticFiles): @@ -19,6 +20,13 @@ def __init__(self, directory: os.PathLike, index: str = "index.html") -> None: self.index = index super().__init__(directory=directory, packages=None, html=True, check_dir=True) + async def __call__(self, scope, receive, send): # type: ignore + if scope["type"] != "websocket": + websocket = WebSocket(scope, receive=receive, send=send) + await websocket.accept() + else: + return await super().__call__(scope, receive, send) + def lookup_path(self, path: str) -> tuple[str, os.stat_result | None]: """Returns the index file when no match is found. diff --git a/backend/main.py b/backend/main.py index 2b2beace8..b454c18be 100644 --- a/backend/main.py +++ b/backend/main.py @@ -25,6 +25,7 @@ from .api.office_hours import ( office_hours as office_hours_event, ticket as office_hours_ticket, + queue as office_hours_queue, ) from .api.admin import users as admin_users from .api.admin import roles as admin_roles @@ -96,6 +97,7 @@ health, office_hours_event, office_hours_ticket, + office_hours_queue, hiring, admin_facts, article, diff --git a/frontend/src/app/my-courses/course/office-hours/office-hours-get-help/office-hours-get-help.component.ts b/frontend/src/app/my-courses/course/office-hours/office-hours-get-help/office-hours-get-help.component.ts index b5cefb2ed..fd3545c53 100644 --- a/frontend/src/app/my-courses/course/office-hours/office-hours-get-help/office-hours-get-help.component.ts +++ b/frontend/src/app/my-courses/course/office-hours/office-hours-get-help/office-hours-get-help.component.ts @@ -17,13 +17,19 @@ import { officeHourPageGuard } from '../office-hours.guard'; import { ActivatedRoute } from '@angular/router'; import { MyCoursesService } from 'src/app/my-courses/my-courses.service'; import { + GetHelpWebSocketAction, + GetHelpWebSocketData, OfficeHourGetHelpOverview, + OfficeHourGetHelpOverviewJson, OfficeHourTicketOverview, + parseOfficeHourGetHelpOverviewJson, TicketDraft } from 'src/app/my-courses/my-courses.model'; import { Subscription, timer } from 'rxjs'; +import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; import { FormBuilder, FormControl, Validators } from '@angular/forms'; import { MatSnackBar } from '@angular/material/snack-bar'; +import { LocationStrategy } from '@angular/common'; @Component({ selector: 'app-office-hours-get-help', @@ -60,6 +66,9 @@ export class OfficeHoursGetHelpComponent implements OnInit, OnDestroy { link: new FormControl('', [Validators.required]) }); + /** Connection to the office hours get help websocket */ + webSocketSubject$: WebSocketSubject; + constructor( private route: ActivatedRoute, protected formBuilder: FormBuilder, @@ -68,28 +77,46 @@ export class OfficeHoursGetHelpComponent implements OnInit, OnDestroy { ) { // Load information from the parent route this.ohEventId = this.route.snapshot.params['event_id']; + // Load the web socket connection + const url = `ws://localhost:1561/ws/office-hours/${this.ohEventId}/get-help?token=${localStorage.getItem('bearerToken')}`; + console.log(url); + this.webSocketSubject$ = webSocket({ + url: url + }); } - /** Create a timer subscription to poll office hour data at an interval at view initalization */ ngOnInit(): void { - this.timer = timer(0, 10000).subscribe(() => { - this.pollData(); + this.webSocketSubject$.subscribe((value) => { + const json: OfficeHourGetHelpOverviewJson = JSON.parse(value); + const overview = parseOfficeHourGetHelpOverviewJson(json); + this.data.set(overview); }); } - /** Remove the timer subscription when the view is destroyed so polling does not persist on other pages */ ngOnDestroy(): void { - this.timer.unsubscribe(); + this.webSocketSubject$.complete(); } + /** Create a timer subscription to poll office hour data at an interval at view initalization */ + // ngOnInit(): void { + // this.timer = timer(0, 10000).subscribe(() => { + // this.pollData(); + // }); + // } + + /** Remove the timer subscription when the view is destroyed so polling does not persist on other pages */ + // ngOnDestroy(): void { + // this.timer.unsubscribe(); + // } + /** Loads office hours data */ - pollData(): void { - this.myCoursesService - .getOfficeHoursHelpOverview(this.ohEventId) - .subscribe((getHelpData) => { - this.data.set(getHelpData); - }); - } + // pollData(): void { + // this.myCoursesService + // .getOfficeHoursHelpOverview(this.ohEventId) + // .subscribe((getHelpData) => { + // this.data.set(getHelpData); + // }); + // } isFormValid(): boolean { let contentFieldsValid = @@ -109,13 +136,19 @@ export class OfficeHoursGetHelpComponent implements OnInit, OnDestroy { /** Cancels a ticket and reloads the queue data */ cancelTicket(ticket: OfficeHourTicketOverview): void { - this.myCoursesService.cancelTicket(ticket.id).subscribe({ - next: (_) => { - this.pollData(); - this.snackBar.open('Ticket cancelled', '', { duration: 5000 }); - }, - error: (err) => this.snackBar.open(err, '', { duration: 2000 }) - }); + let action: GetHelpWebSocketData = { + action: GetHelpWebSocketAction.CANCEL, + id: ticket.id, + new_ticket: null + }; + this.webSocketSubject$.next(action); + // this.myCoursesService.cancelTicket(ticket.id).subscribe({ + // next: (_) => { + // this.pollData(); + // this.snackBar.open('Ticket cancelled', '', { duration: 5000 }); + // }, + // error: (err) => this.snackBar.open(err, '', { duration: 2000 }) + // }); } submitTicketForm() { @@ -157,15 +190,24 @@ export class OfficeHoursGetHelpComponent implements OnInit, OnDestroy { type: form_type }; - this.myCoursesService.createTicket(ticketDraft).subscribe({ - next: (_) => { - this.pollData(); - }, - error: (_) => { - this.snackBar.open(`Could not create a ticket at this time.`, '', { - duration: 2000 - }); - } - }); + // Create the web socket object + const action: GetHelpWebSocketData = { + action: GetHelpWebSocketAction.CREATE, + id: null, + new_ticket: ticketDraft + }; + + this.webSocketSubject$.next(action); + + // this.myCoursesService.createTicket(ticketDraft).subscribe({ + // next: (_) => { + // this.pollData(); + // }, + // error: (_) => { + // this.snackBar.open(`Could not create a ticket at this time.`, '', { + // duration: 2000 + // }); + // } + // }); } } diff --git a/frontend/src/app/my-courses/course/office-hours/office-hours-queue/office-hours-queue.component.ts b/frontend/src/app/my-courses/course/office-hours/office-hours-queue/office-hours-queue.component.ts index ae488adb1..408b8240e 100644 --- a/frontend/src/app/my-courses/course/office-hours/office-hours-queue/office-hours-queue.component.ts +++ b/frontend/src/app/my-courses/course/office-hours/office-hours-queue/office-hours-queue.component.ts @@ -18,10 +18,15 @@ import { ActivatedRoute } from '@angular/router'; import { Subscription, timer } from 'rxjs'; import { OfficeHourQueueOverview, - OfficeHourTicketOverview + OfficeHourQueueOverviewJson, + OfficeHourTicketOverview, + parseOfficeHourQueueOverview, + QueueWebSocketAction, + QueueWebSocketData } from 'src/app/my-courses/my-courses.model'; import { MyCoursesService } from 'src/app/my-courses/my-courses.service'; import { officeHourPageGuard } from '../office-hours.guard'; +import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; @Component({ selector: 'app-office-hours-queue', @@ -47,6 +52,9 @@ export class OfficeHoursQueueComponent implements OnInit, OnDestroy { /** Stores subscription to the timer observable that refreshes data every 10s */ timer!: Subscription; + /** Connection to the office hours get help websocket */ + webSocketSubject$: WebSocketSubject; + constructor( private route: ActivatedRoute, private snackBar: MatSnackBar, @@ -54,50 +62,87 @@ export class OfficeHoursQueueComponent implements OnInit, OnDestroy { ) { // Load information from the parent route this.ohEventId = this.route.snapshot.params['event_id']; + // Load the web socket + this.webSocketSubject$ = webSocket({ + url: `ws://localhost:1561/ws/office-hours/${this.ohEventId}/queue?token=${localStorage.getItem('bearerToken')}` + }); } - /** Create a timer subscription to poll office hour queue data at an interval at view initalization */ ngOnInit(): void { - this.timer = timer(0, 10000).subscribe(() => { - this.pollQueue(); + this.webSocketSubject$.subscribe((value) => { + const json: OfficeHourQueueOverviewJson = JSON.parse(value); + const overview = parseOfficeHourQueueOverview(json); + this.queue.set(overview); }); } - /** Remove the timer subscription when the view is destroyed so polling does not persist on other pages */ ngOnDestroy(): void { - this.timer.unsubscribe(); + this.webSocketSubject$.complete(); } + /** Create a timer subscription to poll office hour queue data at an interval at view initalization */ + // ngOnInit(): void { + // this.timer = timer(0, 10000).subscribe(() => { + // this.pollQueue(); + // }); + // } + + /** Remove the timer subscription when the view is destroyed so polling does not persist on other pages */ + // ngOnDestroy(): void { + // this.timer.unsubscribe(); + // } + /** Loads office hours queue data */ - pollQueue(): void { - this.myCoursesService - .getOfficeHoursQueue(this.ohEventId) - .subscribe((queue) => { - this.queue.set(queue); - }); - } + // pollQueue(): void { + // this.myCoursesService + // .getOfficeHoursQueue(this.ohEventId) + // .subscribe((queue) => { + // this.queue.set(queue); + // }); + // } /** Calls a ticket and reloads the queue data */ callTicket(ticket: OfficeHourTicketOverview): void { - this.myCoursesService.callTicket(ticket.id).subscribe({ - next: (_) => this.pollQueue(), - error: (err) => this.snackBar.open(err, '', { duration: 2000 }) - }); + // Create the web socket object + const action: QueueWebSocketData = { + action: QueueWebSocketAction.CALL, + id: ticket.id + }; + this.webSocketSubject$.next(action); + + // this.myCoursesService.callTicket(ticket.id).subscribe({ + // next: (_) => this.pollQueue(), + // error: (err) => this.snackBar.open(err, '', { duration: 2000 }) + // }); } /** Cancels a ticket and reloads the queue data */ cancelTicket(ticket: OfficeHourTicketOverview): void { - this.myCoursesService.cancelTicket(ticket.id).subscribe({ - next: (_) => this.pollQueue(), - error: (err) => this.snackBar.open(err, '', { duration: 2000 }) - }); + // Create the web socket object + const action: QueueWebSocketData = { + action: QueueWebSocketAction.CANCEL, + id: ticket.id + }; + this.webSocketSubject$.next(action); + + // this.myCoursesService.cancelTicket(ticket.id).subscribe({ + // next: (_) => this.pollQueue(), + // error: (err) => this.snackBar.open(err, '', { duration: 2000 }) + // }); } /** Closes a ticket and reloads the queue data */ closeTicket(ticket: OfficeHourTicketOverview): void { - this.myCoursesService.closeTicket(ticket.id).subscribe({ - next: (_) => this.pollQueue(), - error: (err) => this.snackBar.open(err, '', { duration: 2000 }) - }); + // Create the web socket object + const action: QueueWebSocketData = { + action: QueueWebSocketAction.CLOSE, + id: ticket.id + }; + this.webSocketSubject$.next(action); + + // this.myCoursesService.closeTicket(ticket.id).subscribe({ + // next: (_) => this.pollQueue(), + // error: (err) => this.snackBar.open(err, '', { duration: 2000 }) + // }); } } diff --git a/frontend/src/app/my-courses/my-courses.model.ts b/frontend/src/app/my-courses/my-courses.model.ts index 19863616a..031f28b98 100644 --- a/frontend/src/app/my-courses/my-courses.model.ts +++ b/frontend/src/app/my-courses/my-courses.model.ts @@ -273,6 +273,28 @@ export interface OfficeHours { room_id: string; } +export enum QueueWebSocketAction { + CALL = 'CALL', + CLOSE = 'CLOSE', + CANCEL = 'CANCEL' +} + +export interface QueueWebSocketData { + action: QueueWebSocketAction; + id: number; +} + +export enum GetHelpWebSocketAction { + CREATE = 'CREATE', + CANCEL = 'CANCEL' +} + +export interface GetHelpWebSocketData { + action: GetHelpWebSocketAction; + id: number | null; + new_ticket: TicketDraft | null; +} + /** * Function that converts an TermOverviewJson response model to a * TermOverview model.