Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Web Socket Functionality for the Office Hours Queue and Ticket System #634

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion backend/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,16 @@
import jwt
import requests
from datetime import datetime, timedelta
from fastapi import APIRouter, Header, HTTPException, Request, Response, Depends
from fastapi import (
APIRouter,
Header,
HTTPException,
WebSocketException,
Request,
Response,
Depends,
WebSocket,
)
from fastapi.exceptions import HTTPException
from fastapi.security import HTTPBearer
from fastapi.security.http import HTTPAuthorizationCredentials
Expand Down Expand Up @@ -101,6 +110,21 @@ def registered_user(
raise HTTPException(status_code=401, detail="Unauthorized")


def registered_user_from_websocket(token: str, user_service: UserService) -> User:
"""
Since web sockets cannot use the HTTPBearer dependency, we need to use the websocket to
extract the bearer information.
"""
try:
auth_info = jwt.decode(token, _JWT_SECRET, algorithms=[_JST_ALGORITHM])
user = user_service.get(auth_info["pid"])
if user:
return user
except:
...
raise WebSocketException(status_code=401, detail="Unauthorized")


def authenticated_pid(
token: HTTPAuthorizationCredentials | None = Depends(HTTPBearer()),
) -> tuple[int, str]:
Expand Down
243 changes: 243 additions & 0 deletions backend/api/office_hours/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
"""
Implements the office hours queue using websocket functionality.
"""

from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from enum import Enum
from pydantic import BaseModel
from ..authentication import registered_user_from_websocket
from ...services.office_hours.office_hours import OfficeHoursService
from ...services.office_hours.ticket import OfficeHourTicketService
from ...services.user import UserService
from ...models.user import User
from ...models.office_hours.ticket import NewOfficeHoursTicket

__authors__ = ["Ajay Gandecha"]
__copyright__ = "Copyright 2024"
__license__ = "MIT"

api = APIRouter(prefix="/ws/office-hours")


class UserWebSocketConnection:
"""Stores dat about the user and the web socket they are connected to."""

user: User
socket: WebSocket

def __init__(self, user: User, socket: WebSocket):
self.user = user
self.socket = socket


# Define the websocket connection manager for the queue feature.
class QueueConnectionManager:
"""
Manages all active connections to the office hours web socket and
coordinates broadcasting updates to office hours queues to active
member connections.
"""

def __init__(self):
# This internal field stores all of the active connections,
# indexed by office hours ID (enabling easy and efficient
# lookups and updates based on the office hours event).
self._active_student_connections: dict[int, list[UserWebSocketConnection]] = {}
self._active_staff_connections: dict[int, list[UserWebSocketConnection]] = {}

async def queue_connect(
self, office_hours_id: int, subject: User, websocket: WebSocket
):
"""
Creates a new connection to the web socket by a staff member given
an office hours ID taken as the websocket's query parameter.
"""
# Wait for the new connection to be accepted
await websocket.accept()
# Create the new connection object
connection = UserWebSocketConnection(user=subject, socket=websocket)
# Register the new connection in the map of active connections.
self._active_staff_connections[office_hours_id] = (
self._active_staff_connections.get(office_hours_id, []) + [connection]
)

async def get_help_connect(
self, office_hours_id: int, subject: User, websocket: WebSocket
):
"""
Creates a new connection to the web socket by a student given
an office hours ID taken as the websocket's query parameter.
"""
# Wait for the new connection to be accepted
await websocket.accept()
# Create the new connection object
connection = UserWebSocketConnection(user=subject, socket=websocket)
# Register the new connection in the map of active connections.
self._active_student_connections[office_hours_id] = (
self._active_student_connections.get(office_hours_id, []) + [connection]
)

def queue_disconnect(self, office_hours_id: int, websocket: WebSocket):
"""
Removes a staff's web socket connection from the list of active connections.
"""
# Remove the web socket based on the providd office hours ID
self._active_staff_connections[office_hours_id] = [
connection
for connection in self._active_staff_connections.get(office_hours_id, [])
if connection.socket != websocket
]

def get_help_disconnect(self, office_hours_id: int, websocket: WebSocket):
"""
Removes a student's web socket connection from the list of active connections.
"""
# Remove the web socket based on the providd office hours ID
self._active_student_connections[office_hours_id] = [
connection
for connection in self._active_student_connections.get(office_hours_id, [])
if connection.socket != websocket
]

async def broadcast_queue_changes(
self, office_hours_id: int, oh_event_svc: OfficeHoursService
):
"""
Broadcasts changes in the queue for a given office hours ID to all active
connections for that office hours event.
"""
# Send new queue to all staff
for connection in self._active_staff_connections.get(office_hours_id, []):
queue = oh_event_svc.get_office_hour_queue(connection.user, office_hours_id)
await connection.socket.send_json(queue.model_dump_json())

# Send new queue data to all students
for connection in self._active_student_connections.get(office_hours_id, []):
overview = oh_event_svc.get_office_hour_get_help_overview(
connection.user, office_hours_id
)
await connection.socket.send_json(overview.model_dump_json())


# Create the queue connection manager object
manager = QueueConnectionManager()

# Define the queue websockets


class QueueWebSocketAction(Enum):
"""Define the specific actions that staff can take."""

CALL = "CALL"
CLOSE = "CLOSE"
CANCEL = "CANCEL"


class QueueWebSocketData(BaseModel):
"""Model to represent the data sent to the queue websocket."""

action: QueueWebSocketAction
id: int


@api.websocket("/{office_hours_id}/queue")
async def queue_websocket(
websocket: WebSocket,
office_hours_id: int,
oh_ticket_svc: OfficeHourTicketService = Depends(),
oh_event_svc: OfficeHoursService = Depends(),
user_svc: UserService = Depends(),
):
# Try to load the current user.
token = websocket.query_params.get("token")
subject = registered_user_from_websocket(token, user_svc)
# Connect the new websocket connection to the manager.
await manager.queue_connect(office_hours_id, subject, websocket)
# Send the initial queue data.
queue = oh_event_svc.get_office_hour_queue(subject, office_hours_id)
await websocket.send_json(queue.model_dump_json())
try:
# Await receiving new data.
while True:
# When new data is sent to the websocket, read it as JSON
# and cast to the correct data model with Pydantic.
json_data = await websocket.receive_json()
data = QueueWebSocketData.model_validate(json_data)
# Depending on the type of request, call the respective
# manager actions.
if data.action == QueueWebSocketAction.CALL:
# Talls a ticket
oh_ticket_svc.call_ticket(subject, data.id)
# Broadcast the changes using the mamanger.
await manager.broadcast_queue_changes(office_hours_id, oh_event_svc)
elif data.action == QueueWebSocketAction.CLOSE:
# Close a ticket
oh_ticket_svc.close_ticket(subject, data.id)
# Broadcast the changes using the mamanger.
await manager.broadcast_queue_changes(office_hours_id, oh_event_svc)
elif data.action == QueueWebSocketAction.CANCEL:
# Close a ticket
oh_ticket_svc.cancel_ticket(subject, data.id)
# Broadcast the changes using the mamanger.
await manager.broadcast_queue_changes(office_hours_id, oh_event_svc)
except WebSocketDisconnect:
# When the websocket disconnects, remove the connection
# using the manager.
manager.get_help_disconnect(office_hours_id, websocket)


class GetHelpWebSocketAction(Enum):
"""Define the specific actions that students can take."""

CREATE = "CREATE"
CANCEL = "CANCEL"


class GetHelpWebSocketData(BaseModel):
"""Model to represent the data sent to the get help websocket."""

action: GetHelpWebSocketAction
id: int | None
new_ticket: NewOfficeHoursTicket | None


@api.websocket("/{office_hours_id}/get-help")
async def get_help_websocket(
websocket: WebSocket,
office_hours_id: int,
oh_ticket_svc: OfficeHourTicketService = Depends(),
oh_event_svc: OfficeHoursService = Depends(),
user_svc: UserService = Depends(),
):
# Try to load the current user.
token = websocket.query_params.get("token")
subject = registered_user_from_websocket(token, user_svc)
# Connect the new websocket connection to the manager.
await manager.get_help_connect(office_hours_id, subject, websocket)
# Send the initial data.
overview = oh_event_svc.get_office_hour_get_help_overview(subject, office_hours_id)
await websocket.send_json(overview.model_dump_json())

try:
# Await receiving new data.
while True:
# When new data is sent to the websocket, read it as JSON
# and cast to the correct data model with Pydantic.
json_data = await websocket.receive_json()
data = GetHelpWebSocketData.model_validate(json_data)
# Depending on the type of request, call the respective
# manager actions.
if data.action == GetHelpWebSocketAction.CREATE and data.new_ticket:
# Create a new ticket
oh_ticket_svc.create_ticket(subject, data.new_ticket)
# Broadcast the changes using the mamanger.
await manager.broadcast_queue_changes(office_hours_id, oh_event_svc)
elif data.action == GetHelpWebSocketAction.CANCEL:
# Close a ticket
oh_ticket_svc.cancel_ticket(subject, data.id)
# Broadcast the changes using the mamanger.
await manager.broadcast_queue_changes(office_hours_id, oh_event_svc)
except WebSocketDisconnect:
# When the websocket disconnects, remove the connection
# using the manager.
manager.get_help_disconnect(office_hours_id, websocket)
8 changes: 8 additions & 0 deletions backend/api/static_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@
import os

from fastapi.staticfiles import StaticFiles
from fastapi.websockets import WebSocket


class StaticFileMiddleware(StaticFiles):
def __init__(self, directory: os.PathLike, index: str = "index.html") -> None:
self.index = index
super().__init__(directory=directory, packages=None, html=True, check_dir=True)

async def __call__(self, scope, receive, send): # type: ignore
if scope["type"] != "websocket":
websocket = WebSocket(scope, receive=receive, send=send)
await websocket.accept()
else:
return await super().__call__(scope, receive, send)

def lookup_path(self, path: str) -> tuple[str, os.stat_result | None]:
"""Returns the index file when no match is found.

Expand Down
2 changes: 2 additions & 0 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .api.office_hours import (
office_hours as office_hours_event,
ticket as office_hours_ticket,
queue as office_hours_queue,
)
from .api.admin import users as admin_users
from .api.admin import roles as admin_roles
Expand Down Expand Up @@ -96,6 +97,7 @@
health,
office_hours_event,
office_hours_ticket,
office_hours_queue,
hiring,
admin_facts,
article,
Expand Down
Loading