Skip to content

Commit

Permalink
Merge pull request #45 from fahim-tazz/add-match-service
Browse files Browse the repository at this point in the history
Add match service
  • Loading branch information
fahim-tazz authored Oct 20, 2024
2 parents b429426 + d95ecd3 commit be925fc
Show file tree
Hide file tree
Showing 7 changed files with 1,400 additions and 60 deletions.
46 changes: 44 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


### API Endpoints
#### Question Service
The question microservice exposes the following API endpoints:
- ```questions/all``` - Returns all the questions available on the database.
- ```questions/byId/[Question ID]``` - Returns a single question by ID number.
Expand All @@ -13,8 +14,49 @@ The question microservice exposes the following API endpoints:
- ```questions/delete/[Question ID]``` - Deletes a question from the database by ID number.
- ```questions/update/[Question ID]``` - Updates a question from the database by ID number.
- ```questions/patch/[Question ID]``` - Patches a question from the database by ID number.
The matching microservice exposes the following API endpoints:
- ```matching/produce``` - Produce a message to Kafka.

#### Matching Service
The matching microservice exposes the following API endpoint (via a WebSocket connection):
- ```/``` - Persistently opens a connection to the service.

Message Protocol (JSON):

**Note**: WebSocket only sends strings, so use ```JSON.stringify()``` to convert your JS request objects into strings (and ```JSON.parse()``` to parse responses) when communicating via WebSocket.<br>

1. To enqueue a user:
```
{
event: "enqueue",
userId: <ID of the user>,
questions: [<Array containing question IDs.>]
}
```
Replies with:

- On successful match within 30s:
```
{
event: "match-success",
userId: <ID of this user>,
peerUserId: <ID of the other user>,
agreedQuestion: <The question that has been agreed upon by both users>
}
```
- On timeout after 30s (no match found):
```
{
event: "match-timeout",
userId: <ID of this user>
}
```
2. To dequeue a user:
- Replies with an ```dequeue-success``` event.
```
{
"event": "dequeue",
"userId": <ID of this user>,
}
```

### Running PeerPrep
In the root directory, run
Expand Down
158 changes: 135 additions & 23 deletions backend/matching-service/controllers/matchingController.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,173 @@
// controllers/matchingController.js
const { Kafka } = require('kafkajs');
const EventEmitter = require('events');
const QUEUE_TIME = 30000;
const BATCH_INTERVAL = 10000;

// Kafka setup
const kafka = new Kafka({
clientId: 'matching-service',
brokers: ['kafka:9092'], // 'kafka' is the service name from docker-compose
});

process.on('SIGTERM', async () => {
console.log('Shutting down...');
await producer.disconnect();
await consumer.disconnect();
process.exit(0);
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'matching-group' });

// Produce a message to Kafka (used in the POST /produce route)
const produceMessage = async (req, res) => {
const { message } = req.body;
(async () => {
try {
await producer.connect();
await producer.send({
topic: 'test-topic',
messages: [{ value: message }],
await consumer.connect();
} catch(error) {
console.error(error)
}
})();

const eventEmitter = new EventEmitter();
let dequeued = new Map();

const matchmakeUser = async (userId, questions) => {
return new Promise((resolve, reject) => {
produceMessage({
userId: userId,
questions: questions
}, false);
eventEmitter.once(`success-${userId}`, (peerUserId, question) => {
const res = {
event: "match-success",
userId: userId,
peerUserId: peerUserId,
agreedQuestion: question
}
resolve(JSON.stringify(res));
// resolve(`User ${userId} matched with User ${peerUserId}.`)
});
eventEmitter.once(`dequeue-${userId}`, () => {
dequeued.set(userId, true);
const res = {
event: "dequeued-success",
userId: userId
}
resolve(JSON.stringify(res));
// resolve(`User ${userId} dequeued from matchmaking.`)
})
setTimeout(() => {
const rejectionMsg = {
event: "match-timeout",
userId: userId
};
reject(JSON.stringify(rejectionMsg));
// reject(`No matches for ${userId}.`)
}, QUEUE_TIME);
})
}

const dequeueUser = async (userId) => {
eventEmitter.emit(`dequeue-${userId}`);
}

// Produce a message to Kafka (used in the POST /produce route)
const produceMessage = async (request, isRequeue = false) => {
const msg = {
userId: request.userId,
questions: request.questions,
enqueueTime: isRequeue ? request.enqueueTime : Date.now()
}
const stringifiedMsg = JSON.stringify(msg)
const message = {
topic: 'test-topic',
messages: [
{value: stringifiedMsg}
],
}
try {
await producer.send(message).then(() => {
console.log(`Enqueued message: ${stringifiedMsg}`)
});
await producer.disconnect();
res.status(200).send(`Message produced: ${message}`);
} catch (error) {
console.error('Error producing message:', error);
res.status(500).send('Failed to produce message');
}
};

// Produce a startup message when the service starts
const produceStartupMessage = async () => {
try {
await producer.connect();
const message = 'Hello from producer';
await producer.send({
topic: 'test-topic',
messages: [{ value: message }],
});
console.log(`Produced startup message: ${message}`);
await producer.disconnect();
} catch (error) {
console.error('Error producing startup message:', error);
}
};

let batch = [];

const batchProcess = () => {
if (batch.length == 0) {
console.log("No messages to process in this batch cycle.");
return;
}
batch.sort((A, B) => A.questions.length - B.questions.length);
console.log(`sorted batch is`, batch);
let questionDict = new Map();
let unmatchedUsers = new Map();
batch.forEach((user) => {
if (!dequeued.has(user.userId)) {
unmatchedUsers.set(user.userId, user);
}
});
for (const user of batch) {
if (Date.now() - user.enqueueTime >= QUEUE_TIME) {
// User has timed out.
// TODO: send timeout event emitter.
unmatchedUsers.delete(user.userId);
continue;
}
if (!unmatchedUsers.has(user.userId)) {
// User has already been matched/dequeued.
continue;
}

user.questions.forEach((question) => {
const peerUserId = questionDict.get(question);
// Note: UserId cannot be 0,
// since 0 is falsy and would break this if-conditional.
if (peerUserId && unmatchedUsers.has(peerUserId)) {
// Found match!!
eventEmitter.emit(`success-${user.userId}`, peerUserId, question)
eventEmitter.emit(`success-${peerUserId}`, user.userId, question)
unmatchedUsers.delete(user.userId);
unmatchedUsers.delete(peerUserId);
} else {
// Else, keep looking
questionDict.set(question, user.userId)
}
})
}
for (const [key, user] of unmatchedUsers) {
produceMessage(user, true)
console.log(`User ${key} returned to queue.`)
}
batch = [];
dequeued.clear();
};

// Start consuming messages from Kafka
const runConsumer = async () => {
try {
await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

setInterval(batchProcess, BATCH_INTERVAL);

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
});
const parsedJsonMsg = JSON.parse(message.value);
batch.push(parsedJsonMsg);
},
});
} catch (error) {
Expand All @@ -64,7 +176,7 @@ const runConsumer = async () => {
};

module.exports = {
produceMessage,
produceStartupMessage,
runConsumer,
matchmakeUser,
dequeueUser
};
48 changes: 29 additions & 19 deletions backend/matching-service/index.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
// index.js
const express = require('express');
const kafkaRoutes = require('./routes/matchingRoutes');
const { produceStartupMessage, runConsumer } = require('./controllers/matchingController');
const WebSocket = require("ws");

// Create an Express app
const app = express();
const port = process.env.PORT || 3002;
const wss = new WebSocket.Server({port: 3002});
const { matchmakeUser, runConsumer, dequeueUser} = require("./controllers/matchingController");

// Middleware to parse JSON bodies
app.use(express.json());
console.log("Started Websocket server!!!");

// Use the Kafka routes
app.use('/matching', kafkaRoutes);
runConsumer().catch(console.error);

// Start the Express server and Kafka consumer
app.listen(port, async () => {
console.log(`Matching service running on port ${port}`);
wss.on("connection", (ws) => {
console.log("New Client Connected");
ws.send("Welcome to websocket server");

// Produce a message on startup
await produceStartupMessage();
ws.on('message', async (msg) => {
msg = JSON.parse(msg)
if (msg.event == "enqueue") {
let res;
try {
res = await matchmakeUser(msg.userId, msg.questions)
} catch (failure) {
res = failure
}
ws.send(res)
ws.close()
} else if (msg.event == "dequeue") {
dequeueUser(msg.userId);
ws.close();
console.log("User has been dequeued")
}
});

// Start consuming messages
runConsumer().catch(console.error);
});
ws.on("close", () => {
console.log("Client has disconnected")
});
})
Loading

0 comments on commit be925fc

Please sign in to comment.