Skip to content

Commit

Permalink
changes to service interface and addition of example services
Browse files Browse the repository at this point in the history
  • Loading branch information
maartyman committed Feb 8, 2024
1 parent 9b4dd64 commit 9792131
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 2 deletions.
2 changes: 2 additions & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export * from './fetch/NativeFetch';
export * from './init/AppRunner';

export * from './service/IService';
export * from './service/ServiceEmpty';
export * from './service/ServiceAggregation';

export * from './service-registry/ServiceRegistry';
export * from './service-registry/ServiceRegistryHardcodedTestOnly';
5 changes: 3 additions & 2 deletions lib/service-registry/ServiceRegistryHardcodedTestOnly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export class ServiceRegistryHardcodedTestOnly implements ServiceRegistry {
public async initializeServices(): Promise<void> {
await Promise.all(
this.services.map(
async(aggregatorService): Promise<void> => aggregatorService.initialize(),
async(aggregatorService): Promise<void> => new Promise<void>((resolve): void =>
aggregatorService.subscribeInitialized(resolve)),
),
);
}
Expand Down Expand Up @@ -46,7 +47,7 @@ export class ServiceRegistryHardcodedTestOnly implements ServiceRegistry {
public get descriptions(): string[] {
const result = [];
for (const service of this.services) {
result.push(service.description);
result.push(service.description.toString());
}
return result;
}
Expand Down
70 changes: 70 additions & 0 deletions lib/service/ServiceAggregation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { StreamParser, StreamWriter } from 'n3';
import { v4 } from 'uuid';
import type { IFetch } from '../fetch/IFetch';
import type { IPod } from '../pod/IPod';
import { AsyncConstructor } from '../core/AsyncConstructor';
import type { IService, Operation, OperationResult, OperationTestResult, ServiceDescription } from './IService';

export class ServiceAggregation extends AsyncConstructor implements IService {
public fetch: IFetch;
public pod: IPod;
private podLocation: string | undefined;

public constructor(args: ServiceAggregationArgs) {
super(args);
this.fetch = args.fetch;
this.pod = args.pod;
}

protected async initialize(args: ServiceAggregationArgs): Promise<void> {
this.podLocation = await args.pod.newServiceLocation(this.description);
}

public async test(operation: Operation): Promise<OperationTestResult> {
if (operation.operation !== 'Aggregation') {
throw new Error('Not an aggregation operation');
}
return {
aggregatorService: this,
operation,
runnable: true,
};
}

public async run(operation: Operation): Promise<OperationResult> {
const resultLocation = `${this.podLocation}/${v4()}.ttl`;

const streamWriter = new StreamWriter();
for (const source of operation.sources) {
const streamParser = new StreamParser();
// eslint-disable-next-line ts/no-unsafe-argument
(await this.fetch.fetch(source)).body?.pipeThrough(streamParser as any);
streamParser.pipe(streamWriter);
}

await this.fetch.fetch(resultLocation, {
method: 'PUT',
body: streamWriter,
headers: {
'Content-Type': 'text/turtle', // eslint-disable-line ts/naming-convention
},
});

return {
aggregatorService: this,
operation,
resultLocation,
};
}

public get description(): ServiceDescription {
return {
toString: (): string => 'Aggregation',
};
}
}

export type ServiceAggregationArgs = {
fetch: IFetch;
pod: IPod;
};
51 changes: 51 additions & 0 deletions lib/service/ServiceEmpty.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { AsyncConstructor } from '../core/AsyncConstructor';
import type { IPod } from '../pod/IPod';
import type { IFetch } from '../fetch/IFetch';
import type { IService, Operation, OperationResult, OperationTestResult, ServiceDescription } from './IService';
import type { ServiceAggregationArgs } from './ServiceAggregation';

export class ServiceEmpty extends AsyncConstructor implements IService {
private podLocation: string | undefined;
public fetch: IFetch;
public pod: IPod;

public constructor(args: ServiceAggregationArgs) {
super(args);
this.fetch = args.fetch;
this.pod = args.pod;
}

protected async initialize(args: ServiceAggregationArgs): Promise<void> {
this.podLocation = await args.pod.newServiceLocation(this.description);
}

public async test(operation: Operation): Promise<OperationTestResult> {
if (operation.operation !== 'Empty') {
throw new Error('Not a Empty operation');
}
return {
aggregatorService: this,
operation,
runnable: true,
};
}

public async run(operation: Operation): Promise<OperationResult> {
return {
aggregatorService: this,
operation,
resultLocation: '',
};
}

public get description(): ServiceDescription {
return {
toString: (): string => 'Empty',
};
}
}

export type ServiceEmptyArgs = {
fetch: IFetch;
pod: IPod;
};

0 comments on commit 9792131

Please sign in to comment.