Skip to content

Commit

Permalink
Merge pull request #98 from CS3219-AY2425S1/kervyn/abort-match
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnetteeee authored Nov 13, 2024
2 parents 079d24a + 5c14e7d commit a40b8b0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 31 deletions.
31 changes: 27 additions & 4 deletions message-queue/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const addDataToCollabExchange = (data: CollabExchangeData, key: string) => {

let waitingUsers: { [key: string]: (data: any) => void } = {}

let isStreamClosed = false;

const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: any[] }> => {
let changeStream: ChangeStream = null
try {
Expand Down Expand Up @@ -108,16 +110,18 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
const timer = setInterval(async () => {
console.log("Step 4: Timeout reached, removing user from queue")
await db.collection("usersQueue").deleteOne({ user_id: userData.user_id })
resolve({ matchedUsers: [] })
delete waitingUsers[userData.user_id]

// Close change stream on timeout
if (changeStream) {
if (changeStream && !isStreamClosed) {
try {
console.log("Closing change stream on timeout")
changeStream.close()
isStreamClosed = true;
} catch (e) {
console.error(e)
} finally {
resolve({ matchedUsers: [] })
return { matchedUsers: [] }
}
}
Expand All @@ -127,8 +131,8 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
changeStream = db.collection("usersQueue").watch()
changeStream.on("change", async (change) => {
console.log("Step 5: Change detected", change.operationType)

if (change.operationType === "insert") {
isStreamClosed = false;
if (change.operationType === "insert" && !isStreamClosed) {
const newUser = change.fullDocument

if (
Expand Down Expand Up @@ -158,6 +162,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream after match")
changeStream.close()
isStreamClosed = true;
}
}
} else {
Expand All @@ -166,6 +171,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream after delete event")
changeStream.close()
isStreamClosed = true;
}
}
})
Expand All @@ -177,6 +183,7 @@ const matchUsers = async (userData: any, key: string): Promise<{ matchedUsers: a
if (changeStream) {
console.log("Closing change stream on error")
changeStream.close()
isStreamClosed = true;
}

return { matchedUsers: [] }
Expand Down Expand Up @@ -243,6 +250,22 @@ app.post("/match", async (req: Request, res: Response) => {
}
})

app.post("/match/cancel", async (req: Request, res: Response) => {
const user_id = req.body.user_id;
if (user_id) {
delete waitingUsers[user_id]
await db.collection("usersQueue").deleteOne({ user_id: user_id })

res.status(200).json({
message: "User successfully removed from waiting queue."
})
} else {
res.status(400).json({
message: "Invalid user_id provided. Please try again."
})
}
})

const port = process.env.PORT || 3002;
app.listen(port, () => {
console.log(`Matching service running on port ${port}.`);
Expand Down
7 changes: 7 additions & 0 deletions peer-prep-fe/src/app/models/match.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ export interface MatchRequest {
key: string; // easy_queue, medium_queue, hard_queue
}

export interface MatchRequestCancel {
user_id: string
}

export interface MatchResponse {
matchedUsers: UserData[]; // 0 or 2 - is an array of UserDatas
sessionId: string;
timeout: boolean;
}

export interface MatchRequestCancelResponse {
message: string
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ export class MatchModalComponent implements OnInit, OnDestroy {
}

// TODO: HANDLE CANCEL MATCH - SYNC W MATCHING SVC BE PEEPS @KERVYN @IVAN
cancelMatch() {
async cancelMatch() {
this.isVisible = false;
await this.matchService.cancelMatchRequest(this.userData.user_id);
if (this.countdownSubscription) {
this.countdownSubscription.unsubscribe();
}
Expand Down
63 changes: 37 additions & 26 deletions peer-prep-fe/src/services/match.service.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
import { CategoryService } from '../services/category.service';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Injectable} from '@angular/core';
import {MatchRequest, MatchResponse} from '../app/models/match.model';
import { UserData } from '../types/userdata';
import { baseUrlProduction } from '../../constants';
import { HttpClient, HttpHeaders } from "@angular/common/http"
import { Injectable } from "@angular/core"
import { lastValueFrom, Observable } from "rxjs"

import { lastValueFrom, Observable } from 'rxjs';
import { baseUrlProduction } from "../../constants"
import { MatchRequest, MatchRequestCancel, MatchRequestCancelResponse, MatchResponse } from "../app/models/match.model"
import { CategoryService } from "../services/category.service"
import { UserData } from "../types/userdata"

@Injectable({
providedIn: 'root'
providedIn: "root"
})

export class MatchService {
private apiUrl = this.isProduction() ? `${baseUrlProduction}/match` : 'http://localhost:3002/match';
private apiUrl = this.isProduction() ? `${baseUrlProduction}/match` : "http://localhost:3002/match"

constructor(private http: HttpClient) {}

isProduction(): boolean {
return window.location.hostname !== "localhost"
}

constructor(private http: HttpClient) {}
// send user data and difficulty to rabbitMQ (match request)
// this is a post request, which returns a response
async sendMatchRequest(userData: UserData, queueName: string): Promise<MatchResponse> {
const matchRequest: MatchRequest = {
userData,
key: queueName
}
// return lastValueFrom(this.http.post<MatchResponse>(`${this.apiUrl}`, matchRequest));
const response = await this.http.post<MatchResponse>(`${this.apiUrl}`, matchRequest).toPromise()
if (!response) {
throw new Error("Match response is undefined")
}
return response
}

isProduction (): boolean {
return window.location.hostname !== 'localhost';
async cancelMatchRequest(user_id: string) {
const cancelMatchRequest: MatchRequestCancel = {
user_id
}

// send user data and difficulty to rabbitMQ (match request)
// this is a post request, which returns a response
async sendMatchRequest(userData: UserData, queueName: string): Promise<MatchResponse> {
const matchRequest: MatchRequest = {
userData,
key: queueName
}
// return lastValueFrom(this.http.post<MatchResponse>(`${this.apiUrl}`, matchRequest));
const response = await this.http.post<MatchResponse>(`${this.apiUrl}`, matchRequest).toPromise();
if (!response) {
throw new Error('Match response is undefined');
}
return response;
const response = await this.http.post<MatchRequestCancelResponse>(`${this.apiUrl}/cancel`, cancelMatchRequest).toPromise()
if (!response) {
throw new Error("Given user_id is invalid, request cancelled unsuccessfully")
}
return response;
}
}

0 comments on commit a40b8b0

Please sign in to comment.