Skip to content

Commit

Permalink
adds:the first version of the Notifications Cache Server
Browse files Browse the repository at this point in the history
  • Loading branch information
argahsuknesib committed Mar 6, 2024
1 parent 5a04e8e commit 9c726af
Show file tree
Hide file tree
Showing 12 changed files with 805 additions and 69 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ typings/
.yarn-integrity

# dotenv environment variables file
.env
.env.test

# parcel-bundler cache (https://parceljs.org/)
Expand Down
372 changes: 343 additions & 29 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "dist/index.js",
"scripts": {
"build": "npx tsc",
"start": "node dist/index.js",
"start": "npx tsc && node --max-old-space-size=8192 dist/index.js cache-notifications",
"test": "jest --coverage",
"lint:ts": "eslint . --ext ts --report-unused-disable-directives --max-warnings 0",
"lint:ts:fix": "eslint . --ext ts --report-unused-disable-directives --max-warnings 0 --fix",
Expand All @@ -27,9 +27,13 @@
"typescript": "^4.9.4"
},
"dependencies": {
"@types/n3": "^1.16.4",
"@types/redis": "^4.0.11",
"@typescript-eslint/parser": "^7.1.0",
"axios": "^1.6.7",
"bunyan": "^1.8.15",
"ioredis": "^5.3.2"
"commander": "^12.0.0",
"ioredis": "^5.3.2",
"n3": "^1.17.2"
}
}
10 changes: 6 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { CacheServiceHTTPServer } from "./server/CacheServiceHTTPServer";
import * as bunyan from "bunyan";
import * as fs from 'fs';
const program = require('commander');
import { program } from "commander";
const log_file = fs.createWriteStream('./logs/info.log', { flags: 'a' });

const logger = bunyan.createLogger({
Expand Down Expand Up @@ -30,7 +30,9 @@ program
.command('cache-notifications')
.description('Starts the cache service for notifications from the LDES stream stored in the solid server(s).')
.option('-p, --port <port>', 'The port where the HTTP server will listen.', '8085')
.option('-l --ldes <ldes>', 'The location of the LDES Stream', 'http://localhost:3000/aggregation_pod/aggregation/')
.option('-l --ldes <ldes>', 'The location of the LDES Stream', ['http://localhost:3000/aggregation_pod/aggregation/'])
.action((options: any) => {
new CacheServiceHTTPServer(options.port, options.pod, logger);
});
new CacheServiceHTTPServer(options.port, options.ldes, logger);
});

program.parse(process.argv);
84 changes: 70 additions & 14 deletions src/server/CacheServiceHTTPServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as http from 'http';
import * as url from 'url';
import { CacheService } from '../service/CacheService';
import { SubscribeNotification } from '../service/SubscribeNotification';

/**
* A class for the HTTP server that interacts with the cache service to handle requests.
Expand All @@ -9,31 +11,46 @@ import { CacheService } from '../service/CacheService';
export class CacheServiceHTTPServer {
private readonly cacheService: CacheService;
private readonly server: http.Server;
private pod_url: string;
public logger: any;
private subscription_notification: SubscribeNotification;
private pod_url_array: string[];

/**
* Creates an instance of CacheServiceHTTPServer.
* @param {number} port - The port where the HTTP server will listen.
* @param {string} pod_url - The location of the Solid Pod from which the notifications are retrieved.
* @param {string[]} pod_url - The location of the Solid Pod from which the notifications are retrieved.
* @param {*} logger - The logger object.
* @memberof CacheServiceHTTPServer
*/
constructor(port: number, pod_url: string, logger: any) {
constructor(port: number, pod_url: string[], logger: any) {
this.logger = logger;
this.cacheService = new CacheService();
this.pod_url = pod_url;
this.pod_url_array = pod_url;
this.subscription_notification = new SubscribeNotification(this.pod_url_array);
this.server = http.createServer(this.request_handler.bind(this));
this.setupServer(port);
this.setupServerAndSubscribe(port);
}
/**
* Sets up the HTTP server where it listens on the specified port as well as connects to the cache service.
* @private
* @param {number} port - The port where the HTTP server will listen.
* @memberof CacheServiceHTTPServer
*/
private async setupServer(port: number) {
await this.cacheService.connect();
this.server.listen(port, () => {
console.log(`Server listening on port ${port}`);
});
private async setupServerAndSubscribe(port: number) {
if (await this.cacheService.get_status() === "connecting") {
const subscription_successful = await this.subscription_notification.subscribe();
if(subscription_successful) {
console.log(`Subscription was successful`);

}
this.server.listen(port, () => {
this.logger.info(`Server listening on port ${port}`);
});
}
else {
this.logger.error("Cache service is not connecting");
await this.cacheService.connect();
}
}
/**
* Handles the requests to the HTTP server.
Expand All @@ -43,7 +60,7 @@ export class CacheServiceHTTPServer {
* @returns {Promise<void>} - A promise which responses nothing.
* @memberof CacheServiceHTTPServer
*/
private async request_handler(request: http.IncomingMessage, response: http.ServerResponse) {
public async request_handler(request: http.IncomingMessage, response: http.ServerResponse) {
if (request.method === 'POST') {
await this.handleNotificationPostRequest(request, response);
}
Expand All @@ -67,7 +84,35 @@ export class CacheServiceHTTPServer {
* @memberof CacheServiceHTTPServer
*/
private async handleNotificationPostRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {

let body = '';
request.on('data', (chunk) => {
body += chunk.toString();
});
request.on('end', async () => {
try {
const notification = JSON.parse(body);
const published = (new Date(notification.published).getTime()).toString();
const resource_location = notification.object;
try {
const resource_fetch_response = await fetch(resource_location, {
method: 'GET',
headers: {
'Accept': 'text/turtle'
}
});
this.logger.info("Resource fetched successfully");
const response_text = await resource_fetch_response.text();
this.cacheService.set(published, response_text);
} catch (error) {
this.logger.error("Error fetching the resource: " + error);
}
response.writeHead(200, 'OK');
response.end('OK');
} catch (error) {
response.writeHead(400, 'Bad Request');
response.end('Bad Request');
}
});
}
/**
* Handles the GET requests to the HTTP server, which are requests from the clients to retrieve the notifications.
Expand All @@ -78,7 +123,13 @@ export class CacheServiceHTTPServer {
* @memberof CacheServiceHTTPServer
*/
private async handleClientGetRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {

this.logger.info(`GET request received for ${request.url}`)
console.log(`GET request received for ${request.url}`);
const parsed_url = url.parse(request.url!, true);
const query_parameters = parsed_url.query;
const event_time = query_parameters.event_time as string | undefined || 'Anonymous';
response.writeHead(200, 'OK', { 'Content-Type': 'text/turtle' });
response.end(await this.cacheService.get(event_time));
}
/**
* Handles the DELETE requests to the HTTP server.
Expand All @@ -89,7 +140,12 @@ export class CacheServiceHTTPServer {
* @memberof CacheServiceHTTPServer
*/
private async handleNotificationDeleteRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {

const parsed_url = url.parse(request.url!, true);
const query_parameters = parsed_url.query;
const event_time = query_parameters.event_time as string | undefined || 'Anonymous';
await this.cacheService.delete(event_time);
response.writeHead(200, 'OK');
response.end('OK');
}


Expand Down
5 changes: 5 additions & 0 deletions src/service/CacheService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ describe('CacheService', () => {
const is_disconnected = await cacheService.disconnect();
expect(is_disconnected).toBe(true);
});

it('should_describe_the_cache', async() => {
const status = await cacheService.get_status();
expect(status).toBe('wait');
})
});
9 changes: 9 additions & 0 deletions src/service/CacheService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from "ioredis";
import { RedisStatus } from "../utils/Types";
/**
* A service for interacting with the Redis cache.
* @class CacheService
Expand Down Expand Up @@ -79,5 +80,13 @@ export class CacheService {
async delete(key: string) {
await this.client.del(key);
}
/**
* Get the status of the Redis cache.
* @returns {Promise<RedisStatus>} - The current status of the Redis Cache, returned as a promise.
* @memberof CacheService
*/
async get_status(): Promise<RedisStatus> {
return await this.client.status;
}
}

91 changes: 91 additions & 0 deletions src/service/SubscribeNotification.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { SubscribeNotification } from './SubscribeNotification';
import { extract_ldp_inbox, extract_subscription_server } from '../utils/Util';

jest.mock('../utils/Util', () => ({
extract_ldp_inbox: jest.fn(),
extract_subscription_server: jest.fn(),
}));

describe('SubscribeNotification', () => {
afterEach(() => {
jest.clearAllMocks();
});

it('should subscribe successfully', async () => {
const streams = ['stream1', 'stream2'];
const subscribeNotification = new SubscribeNotification(streams);

(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
(extract_subscription_server as jest.Mock).mockResolvedValueOnce({
location: 'http://subscription-server1',
});

const fetchMock = jest.fn().mockResolvedValueOnce({ status: 200 });
global.fetch = fetchMock;

const result = await subscribeNotification.subscribe();

expect(result).toBe(true);
expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
expect(fetchMock).toHaveBeenCalledWith('http://subscription-server1', {
method: 'POST',
headers: {
'Content-Type': 'application/ld+json',
},
body: JSON.stringify({
'@context': ['https://www.w3.org/ns/solid/notification/v1'],
type: 'http://www.w3.org/ns/solid/notifications#WebhookChannel2023',
topic: 'inbox1',
sendTo: 'http://localhost:8085/',
}),
});
});

it('should throw an error if subscription server is undefined', async () => {
const streams = ['stream1'];
const subscribeNotification = new SubscribeNotification(streams);

(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
(extract_subscription_server as jest.Mock).mockResolvedValueOnce(undefined);

await expect(subscribeNotification.subscribe()).rejects.toThrow(
'Subscription server is undefined.'
);

expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
});

it('should throw an error if subscription to the notification server fails', async () => {
const streams = ['stream1'];
const subscribeNotification = new SubscribeNotification(streams);

(extract_ldp_inbox as jest.Mock).mockResolvedValueOnce('inbox1');
(extract_subscription_server as jest.Mock).mockResolvedValueOnce({
location: 'http://subscription-server1',
});

const fetchMock = jest.fn().mockResolvedValueOnce({ status: 500 });
global.fetch = fetchMock;

await expect(subscribeNotification.subscribe()).rejects.toThrow(
'The subscription to the notification server failed.'
);

expect(extract_ldp_inbox).toHaveBeenCalledWith('stream1');
expect(extract_subscription_server).toHaveBeenCalledWith('inbox1');
expect(fetchMock).toHaveBeenCalledWith('http://subscription-server1', {
method: 'POST',
headers: {
'Content-Type': 'application/ld+json',
},
body: JSON.stringify({
'@context': ['https://www.w3.org/ns/solid/notification/v1'],
type: 'http://www.w3.org/ns/solid/notifications#WebhookChannel2023',
topic: 'inbox1',
sendTo: 'http://localhost:8085/',
}),
});
});
});
65 changes: 46 additions & 19 deletions src/service/SubscribeNotification.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,51 @@
export class SubscribeNotification {
public solid_pod_urls: string[];
import { extract_ldp_inbox, extract_subscription_server } from "../utils/Util";

constructor(solid_pod_urls: string[]) {
this.solid_pod_urls = solid_pod_urls;
/**
* This class is used to subscribe to the notification server for real-time notifications.
* @class SubscribeNotification
*/
export class SubscribeNotification {
private ldes_streams: string[];
/**
* Creates an instance of SubscribeNotification.
* @param {string[]} streams - An array of LDES streams to subscribe to, for real-time notifications.
* @memberof SubscribeNotification
*/
constructor(streams: string[]) {
this.ldes_streams = streams;
}

public async subscribe() {
console.log(`Subscribing to notifications for ${this.solid_pod_urls}...`);

/**
* Subscribes to the notification server for each LDES stream in the constructor, using the inbox and subscription server.
* @returns {(Promise<boolean | undefined>)} - Returns a promise with a boolean or undefined. If the subscription is successful, it returns true. If the subscription fails, it throws an error.
* @memberof SubscribeNotification
*/
public async subscribe(): Promise<boolean | undefined> {
for (const stream of this.ldes_streams) {
const inbox = await extract_ldp_inbox(stream) as string;
const subscription_server = await extract_subscription_server(inbox);
if (subscription_server === undefined) {
throw new Error("Subscription server is undefined.");
} else {
const response = await fetch(subscription_server.location, {
method: 'POST',
headers: {
'Content-Type': 'application/ld+json'
},
body: JSON.stringify({
"@context": ["https://www.w3.org/ns/solid/notification/v1"],
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
"topic": inbox,
"sendTo": "http://localhost:8085/"
})
});
if (response.status === 200) {
return true;
}
else {
throw new Error("The subscription to the notification server failed.");
}
}
}
}

}


/**
* sending the server `http://localhost:3000/.notifications/WebhookChannel2023/`
{
"@context": [ "https://www.w3.org/ns/solid/notification/v1" ],
"type": "http://www.w3.org/ns/solid/notifications#WebhookChannel2023",
"topic": "http://localhost:3000/aggregation_pod/",
"sendTo": "http://localhost:3001/"
}
*/
7 changes: 7 additions & 0 deletions src/utils/Types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export type SubscriptionServerNotification = {
location: string,
channelType: string,
channelLocation: string
}

export type RedisStatus = "wait" | "reconnecting" | "connecting" | "connect" | "ready" | "close" | "end";
Loading

0 comments on commit 9c726af

Please sign in to comment.