Skip to content

Commit

Permalink
feat: Add Redis support
Browse files Browse the repository at this point in the history
  • Loading branch information
smessie committed Oct 2, 2024
1 parent 60b0484 commit 83b9f9d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 37 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ You will probably want to configure a `urn:solid-server:default:LDESConfig` and
- `value` of the relation
- `bucket` is the target fragment id
- `path` of the relation
- optional `timeStamp` specifies the starting timestamp of this fragment.
- `metaCollection` specifies the metadata collection, this collection contains information about each ingested stream. JSON objects with following fields:
- `id` the id of the ingested stream.
- `type` specifies the metadata type. Often "https://w3id.org/sds#Stream".
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
export * from "./DBConfig";
export * from "./repositories/Repository";
export * from "./repositories/MongoDBRepository";
export * from "./repositories/RedisRepository";

// LDES interface implementations
export * from "./ldes/SDSView";
Expand Down
21 changes: 4 additions & 17 deletions src/ldes/SDSView.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,12 @@ export class SDSView implements View {
const members: string[] = [];
const relations = <RelationParameters[]>[];

const search: any = {};
const id = segs.length > 0 ? segs.join("/") : undefined;
const timestampValue = query["timestamp"];
const id = segs.length > 0 ? segs.join("/") : (await this.repository.findRoots(this.streamId))[0];

if (timestampValue) {
search.timeStamp = new Date(timestampValue).getTime();
}

this.logger.verbose("Finding fragment for " + JSON.stringify(search));
const fragment = await this.repository.findBucket(this.streamId, id, search);
this.logger.verbose(`Finding fragment for stream '${this.streamId}' and id '${id}'.`);
const fragment = await this.repository.findBucket(this.streamId, id);

if (fragment) {
if (timestampValue && fragment.timeStamp) {
this.logger.info("Redirect to proper fragment URL");
// Redirect to the correct resource, we now have the timestamp;
throw new RedirectHttpError(302, "found", `/${fragment.id!}`);
}
fragment.relations = fragment.relations || [];

const rels: RelationParameters[] = fragment!.relations.map(
Expand All @@ -148,9 +137,7 @@ export class SDSView implements View {
relations.push(...rels);
members.push(...(fragment.members || []));
} else {
this.logger.error(
"No such bucket found! " + JSON.stringify(search),
);
this.logger.error(`No such bucket found! (stream: '${this.streamId}', id: '${id}')`);
throw new RedirectHttpError(404, "No fragment found", "");
}

Expand Down
14 changes: 3 additions & 11 deletions src/repositories/MongoDBRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Bucket, BucketSearch, Repository } from "./Repository";
import { Bucket, Repository } from "./Repository";
import { getLoggerFor } from "@solid/community-server";
import { Db, MongoClient } from "mongodb";
import { Member } from "@treecg/types";
Expand Down Expand Up @@ -49,17 +49,10 @@ export class MongoDBRepository implements Repository {
.then((roots) => roots.map((root) => root.id)) ?? [];
}

async findBucket(type: string, id?: string, search?: BucketSearch): Promise<Bucket | null> {
async findBucket(type: string, id: string): Promise<Bucket | null> {
const filter: any = { streamId: type, id: id };
if (search?.leaf) {
filter.leaf = search.leaf;
}
if (search?.timeStamp) {
filter.timeStamp = { $lte: new Date(search.timeStamp) };
}
return this.db?.collection(this.index)
.find(filter)
.sort({ timeStamp: -1 })
.limit(1)
.next()
.then((entry) => {
Expand All @@ -69,9 +62,8 @@ export class MongoDBRepository implements Repository {
return {
id: entry.id,
streamId: entry.streamId,
leaf: entry.leaf,
root: entry.root,
value: entry.value,
timeStamp: entry.timeStamp,
immutable: entry.immutable,
members: entry.members,
relations: entry.relations,
Expand Down
121 changes: 121 additions & 0 deletions src/repositories/RedisRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { Bucket, Repository } from "./Repository";
import { createClient, RedisClientType, RediSearchSchema, SchemaFieldTypes } from "redis";
import { getLoggerFor } from "@solid/community-server";
import { Member } from "@treecg/types";
import { DataFactory, Parser } from "n3";
import namedNode = DataFactory.namedNode;

export class RedisRepository implements Repository {
protected url: string;
protected metadata: string;
protected data: string;
protected index: string;

protected client: RedisClientType | undefined;

protected logger = getLoggerFor(this);

constructor(url: string, metadata: string, data: string, index: string) {
this.url = url;
this.metadata = metadata;
this.data = data;
this.index = index;
}

async open(): Promise<void> {
this.client = createClient({ url: this.url });
await this.client.connect();

this.logger.debug(`Connected to ${this.url}`);

await this.createSearchIndex();
}

async close(): Promise<void> {
await this.client?.disconnect();

this.logger.debug(`Closed connection to ${this.url}`);
}

async findMetadata(type: string, id: string): Promise<string | null> {
return this.client?.get(`${this.metadata}:${encodeURIComponent(type)}:${encodeURIComponent(id)}`) ?? null;
}

async findRoots(streamId: string): Promise<string[]> {
const result = await this.client?.ft.search(`idx:${this.index}`, `(@streamId:{${this.encodeKey(streamId)}}) (@root:{true})`, { RETURN: ["id"] });
return result?.documents.map((doc) => doc.value.id as string) ?? [];
}

async findBucket(type: string, id: string): Promise<Bucket | null> {
const query = `(@streamId:{${this.encodeKey(type)}) (@id:{${this.encodeKey(id)}})`;
const result = (await this.client?.ft.search(`idx:${this.index}`, query))?.documents ?? [];
const doc = result.pop();
if (!doc) {
return null;
}

const members = await this.client?.sMembers(`${this.index}:${encodeURIComponent(type)}:${encodeURIComponent(doc.value.id as string)}:members`) ?? [];
const relations = ((await this.client?.sMembers(`${this.index}:${encodeURIComponent(type)}:${encodeURIComponent(doc.value.id as string)}:relations`)) ?? []).map((relation) => JSON.parse(relation));

return {
id: doc.id,
streamId: type,
root: doc.value.root as unknown as boolean,
value: doc.value.value ? (doc.value.value as string) : undefined,
immutable: doc.value.immutable as unknown as boolean,
members: members,
relations: relations,
};
}

async findMembers(members: string[]): Promise<Member[]> {
return (await this.client?.mGet(members.map((member) => `${this.data}:${encodeURIComponent(member)}`)) ?? [])
.filter((entry) => entry !== null)
.map((entry, i) => {
return {
id: namedNode(members[i]),
quads: new Parser().parse(entry),
};
});
}

async createSearchIndex(): Promise<void> {
try {
const schema: RediSearchSchema = {
"$.streamId": {
type: SchemaFieldTypes.TAG,
AS: "streamId",
},
"$.id": {
type: SchemaFieldTypes.TAG,
AS: "id",
},
"$.root": {
type: SchemaFieldTypes.TAG,
AS: "root",
},
"$.value": {
type: SchemaFieldTypes.TEXT,
AS: "value",
},
"$.immutable": {
type: SchemaFieldTypes.TAG,
AS: "immutable",
},
};
await this.client?.ft.create(`idx:${this.index}`, schema, {
ON: "JSON",
PREFIX: this.index,
});
this.logger.info(`Created index idx:${this.index}`);
} catch (e) {
// Index already exists
this.logger.debug(`Index idx:${this.index} already exists`);
}
}

encodeKey(key: string): string {
// Add \\ in front of ., :, /, -
return key.replace(/([.:\/-])/g, "\\$1");
}
}
18 changes: 10 additions & 8 deletions src/repositories/Repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import { DBConfig } from "../DBConfig";
import { env } from "process";
import { MongoDBRepository } from "./MongoDBRepository";
import { Member } from "@treecg/types";
import { RedisRepository } from "./RedisRepository";

export type Bucket = {
id: string;
streamId: string;
leaf: boolean;
root: boolean;
value?: string;
timeStamp?: number;
immutable?: boolean;
members: string[];
relations: Relation[];
Expand All @@ -22,11 +22,6 @@ export type Relation = {
timestampRelation?: boolean;
};

export type BucketSearch = {
leaf?: boolean;
timeStamp?: number;
};

export interface Repository {
open(): Promise<void>;

Expand All @@ -36,7 +31,7 @@ export interface Repository {

findRoots(streamId: string): Promise<string[]>;

findBucket(type: string, id?: string, search?: BucketSearch): Promise<Bucket | null>;
findBucket(type: string, id: string): Promise<Bucket | null>;

findMembers(members: string[]): Promise<Member[]>;
}
Expand All @@ -52,6 +47,13 @@ export function getRepository(dbConfig: DBConfig): Repository {
dbConfig.data,
dbConfig.index,
);
} else if (url.startsWith("redis://")) {
return new RedisRepository(
url,
dbConfig.meta,
dbConfig.data,
dbConfig.index,
);
} else {
throw new Error("Unknown database type");
}
Expand Down

0 comments on commit 83b9f9d

Please sign in to comment.