From 97921314f57fa8155c33de190f9e2766ae208dba Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Thu, 8 Feb 2024 15:40:28 +0100 Subject: [PATCH] changes to service interface and addition of example services --- lib/index.ts | 2 + .../ServiceRegistryHardcodedTestOnly.ts | 5 +- lib/service/ServiceAggregation.ts | 70 +++++++++++++++++++ lib/service/ServiceEmpty.ts | 51 ++++++++++++++ 4 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 lib/service/ServiceAggregation.ts create mode 100644 lib/service/ServiceEmpty.ts diff --git a/lib/index.ts b/lib/index.ts index fde7969..64d9aa6 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -12,6 +12,8 @@ export * from './fetch/NativeFetch'; export * from './init/AppRunner'; export * from './service/IService'; +export * from './service/ServiceEmpty'; +export * from './service/ServiceAggregation'; export * from './service-registry/ServiceRegistry'; export * from './service-registry/ServiceRegistryHardcodedTestOnly'; diff --git a/lib/service-registry/ServiceRegistryHardcodedTestOnly.ts b/lib/service-registry/ServiceRegistryHardcodedTestOnly.ts index 40d48a8..7839b81 100644 --- a/lib/service-registry/ServiceRegistryHardcodedTestOnly.ts +++ b/lib/service-registry/ServiceRegistryHardcodedTestOnly.ts @@ -14,7 +14,8 @@ export class ServiceRegistryHardcodedTestOnly implements ServiceRegistry { public async initializeServices(): Promise { await Promise.all( this.services.map( - async(aggregatorService): Promise => aggregatorService.initialize(), + async(aggregatorService): Promise => new Promise((resolve): void => + aggregatorService.subscribeInitialized(resolve)), ), ); } @@ -46,7 +47,7 @@ export class ServiceRegistryHardcodedTestOnly implements ServiceRegistry { public get descriptions(): string[] { const result = []; for (const service of this.services) { - result.push(service.description); + result.push(service.description.toString()); } return result; } diff --git a/lib/service/ServiceAggregation.ts b/lib/service/ServiceAggregation.ts new file mode 100644 index 0000000..ae48ad6 --- /dev/null +++ b/lib/service/ServiceAggregation.ts @@ -0,0 +1,70 @@ +import { StreamParser, StreamWriter } from 'n3'; +import { v4 } from 'uuid'; +import type { IFetch } from '../fetch/IFetch'; +import type { IPod } from '../pod/IPod'; +import { AsyncConstructor } from '../core/AsyncConstructor'; +import type { IService, Operation, OperationResult, OperationTestResult, ServiceDescription } from './IService'; + +export class ServiceAggregation extends AsyncConstructor implements IService { + public fetch: IFetch; + public pod: IPod; + private podLocation: string | undefined; + + public constructor(args: ServiceAggregationArgs) { + super(args); + this.fetch = args.fetch; + this.pod = args.pod; + } + + protected async initialize(args: ServiceAggregationArgs): Promise { + this.podLocation = await args.pod.newServiceLocation(this.description); + } + + public async test(operation: Operation): Promise { + if (operation.operation !== 'Aggregation') { + throw new Error('Not an aggregation operation'); + } + return { + aggregatorService: this, + operation, + runnable: true, + }; + } + + public async run(operation: Operation): Promise { + const resultLocation = `${this.podLocation}/${v4()}.ttl`; + + const streamWriter = new StreamWriter(); + for (const source of operation.sources) { + const streamParser = new StreamParser(); + // eslint-disable-next-line ts/no-unsafe-argument + (await this.fetch.fetch(source)).body?.pipeThrough(streamParser as any); + streamParser.pipe(streamWriter); + } + + await this.fetch.fetch(resultLocation, { + method: 'PUT', + body: streamWriter, + headers: { + 'Content-Type': 'text/turtle', // eslint-disable-line ts/naming-convention + }, + }); + + return { + aggregatorService: this, + operation, + resultLocation, + }; + } + + public get description(): ServiceDescription { + return { + toString: (): string => 'Aggregation', + }; + } +} + +export type ServiceAggregationArgs = { + fetch: IFetch; + pod: IPod; +}; diff --git a/lib/service/ServiceEmpty.ts b/lib/service/ServiceEmpty.ts new file mode 100644 index 0000000..82e2baf --- /dev/null +++ b/lib/service/ServiceEmpty.ts @@ -0,0 +1,51 @@ +import { AsyncConstructor } from '../core/AsyncConstructor'; +import type { IPod } from '../pod/IPod'; +import type { IFetch } from '../fetch/IFetch'; +import type { IService, Operation, OperationResult, OperationTestResult, ServiceDescription } from './IService'; +import type { ServiceAggregationArgs } from './ServiceAggregation'; + +export class ServiceEmpty extends AsyncConstructor implements IService { + private podLocation: string | undefined; + public fetch: IFetch; + public pod: IPod; + + public constructor(args: ServiceAggregationArgs) { + super(args); + this.fetch = args.fetch; + this.pod = args.pod; + } + + protected async initialize(args: ServiceAggregationArgs): Promise { + this.podLocation = await args.pod.newServiceLocation(this.description); + } + + public async test(operation: Operation): Promise { + if (operation.operation !== 'Empty') { + throw new Error('Not a Empty operation'); + } + return { + aggregatorService: this, + operation, + runnable: true, + }; + } + + public async run(operation: Operation): Promise { + return { + aggregatorService: this, + operation, + resultLocation: '', + }; + } + + public get description(): ServiceDescription { + return { + toString: (): string => 'Empty', + }; + } +} + +export type ServiceEmptyArgs = { + fetch: IFetch; + pod: IPod; +};