Skip to content

Commit

Permalink
busify orderBy & add extensionFunction & move aggregateEval & functio…
Browse files Browse the repository at this point in the history
…n in single bus
  • Loading branch information
jitsedesmet committed Nov 1, 2023
1 parent f8ba965 commit a1734f8
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
import { ActorBindingsAggregatorFactory } from '@comunica/bus-bindings-aggeregator-factory';
import type { IActorTest } from '@comunica/core';
import { RegularOperator } from '@comunica/expression-evaluator';
import type { ITermFunction } from '@comunica/types';
import { AverageAggregator } from './AverageAggregator';

/**
Expand All @@ -22,13 +23,22 @@ export class ActorBindingsAggregatorFactoryAverage extends ActorBindingsAggregat
return {};
}

public async run(action: IActionBindingsAggregatorFactory): Promise<IActorBindingsAggregatorFactoryOutput> {
public async run({ factory, context, expr }: IActionBindingsAggregatorFactory):
Promise<IActorBindingsAggregatorFactoryOutput> {
return {
aggregator: new AverageAggregator(
await action.factory.createEvaluator(action.expr, action.context),
action.expr.distinct,
await action.factory.createTermFunction({ functionName: RegularOperator.ADDITION }),
await action.factory.createTermFunction({ functionName: RegularOperator.DIVISION }),
await factory.createEvaluator(expr, context),
expr.distinct,
<ITermFunction> await factory.createFunction({
functionName: RegularOperator.ADDITION,
context,
definitionType: 'onTerm',
}),
<ITermFunction> await factory.createFunction({
functionName: RegularOperator.DIVISION,
context,
definitionType: 'onTerm',
}),
),
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import type { ExpressionEvaluator } from '@comunica/expression-evaluator';
import { AggregateEvaluator, typedLiteral, TypeURL } from '@comunica/expression-evaluator';
import * as E from '@comunica/expression-evaluator/lib/expressions';
import type { RegularFunction } from '@comunica/expression-evaluator/lib/functions';
import type { IBindingsAggregator, IExpressionEvaluator } from '@comunica/types';
import type { IBindingsAggregator, IExpressionEvaluator, ITermFunction } from '@comunica/types';
import type * as RDF from '@rdfjs/types';

interface IAverageState {
Expand All @@ -16,8 +15,8 @@ export class AverageAggregator extends AggregateEvaluator implements IBindingsAg
public constructor(
evaluator: IExpressionEvaluator,
distinct: boolean,
private readonly additionFunction: RegularFunction,
private readonly divisionFunction: RegularFunction,
private readonly additionFunction: ITermFunction,
private readonly divisionFunction: ITermFunction,
throwError?: boolean,
) {
super(evaluator, distinct, throwError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from '@comunica/bus-bindings-aggeregator-factory';
import type { IActorTest } from '@comunica/core';
import { RegularOperator } from '@comunica/expression-evaluator';
import type { ITermFunction } from '@comunica/types';
import { SumAggregator } from './SumAggregator';

/**
Expand All @@ -25,12 +26,17 @@ export class ActorBindingsAggregatorFactorySum extends ActorBindingsAggregatorFa
return {};
}

public async run(action: IActionBindingsAggregatorFactory): Promise<IActorBindingsAggregatorFactoryOutput> {
public async run({ factory, expr, context }: IActionBindingsAggregatorFactory):
Promise<IActorBindingsAggregatorFactoryOutput> {
return {
aggregator: new SumAggregator(
await action.factory.createEvaluator(action.expr, action.context),
action.expr.distinct,
await action.factory.createTermFunction({ functionName: RegularOperator.ADDITION }),
await factory.createEvaluator(expr, context),
expr.distinct,
<ITermFunction> await factory.createFunction({
functionName: RegularOperator.ADDITION,
context,
definitionType: 'onTerm',
}),
),
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import type { ExpressionEvaluator } from '@comunica/expression-evaluator';
import { AggregateEvaluator, typedLiteral, TypeURL } from '@comunica/expression-evaluator';
import type * as E from '@comunica/expression-evaluator/lib/expressions';
import type { RegularFunction } from '@comunica/expression-evaluator/lib/functions';
import type { IExpressionEvaluator } from '@comunica/types';
import type { IExpressionEvaluator, ITermFunction } from '@comunica/types';
import type * as RDF from '@rdfjs/types';

type SumState = E.NumericLiteral;
Expand All @@ -12,7 +11,7 @@ export class SumAggregator extends AggregateEvaluator {

public constructor(evaluator: IExpressionEvaluator,
distinct: boolean,
private readonly additionFunction: RegularFunction,
private readonly additionFunction: ITermFunction,
throwError?: boolean) {
super(evaluator, distinct, throwError);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { ExpressionEvaluator } from '@comunica/expression-evaluator/lib/evaluators/ExpressionEvaluator';
import type * as E from '@comunica/expression-evaluator/lib/expressions';
import { TermTransformer } from '@comunica/expression-evaluator/lib/transformers/TermTransformer';

Check failure on line 3 in packages/bus-bindings-aggregator-factory/lib/AggregateEvaluator.ts

View workflow job for this annotation

GitHub Actions / lint

'@comunica/expression-evaluator' should be listed in the project's dependencies. Run 'npm i -S @comunica/expression-evaluator' to add it
import { TypeAlias } from '@comunica/expression-evaluator/lib/util/Consts';

Check failure on line 4 in packages/bus-bindings-aggregator-factory/lib/AggregateEvaluator.ts

View workflow job for this annotation

GitHub Actions / lint

'@comunica/expression-evaluator' should be listed in the project's dependencies. Run 'npm i -S @comunica/expression-evaluator' to add it
import { EmptyAggregateError } from '@comunica/expression-evaluator/lib/util/Errors';

Check failure on line 5 in packages/bus-bindings-aggregator-factory/lib/AggregateEvaluator.ts

View workflow job for this annotation

GitHub Actions / lint

'@comunica/expression-evaluator' should be listed in the project's dependencies. Run 'npm i -S @comunica/expression-evaluator' to add it
import { isSubTypeOf } from '@comunica/expression-evaluator/lib/util/TypeHandling';

Check failure on line 6 in packages/bus-bindings-aggregator-factory/lib/AggregateEvaluator.ts

View workflow job for this annotation

GitHub Actions / lint

'@comunica/expression-evaluator' should be listed in the project's dependencies. Run 'npm i -S @comunica/expression-evaluator' to add it
import type { IExpressionEvaluator } from '@comunica/types';
import type * as RDF from '@rdfjs/types';
import * as RdfString from 'rdf-string';

Check failure on line 9 in packages/bus-bindings-aggregator-factory/lib/AggregateEvaluator.ts

View workflow job for this annotation

GitHub Actions / lint

'rdf-string' should be listed in the project's dependencies. Run 'npm i -S rdf-string' to add it
import type * as E from '../expressions';
import { TermTransformer } from '../transformers/TermTransformer';
import { TypeAlias } from '../util/Consts';
import { EmptyAggregateError } from '../util/Errors';
import { isSubTypeOf } from '../util/TypeHandling';
import type { ExpressionEvaluator } from './ExpressionEvaluator';

/**
* This is the base class for all aggregators.
Expand Down
2 changes: 1 addition & 1 deletion packages/context-entries/lib/Keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export const KeysInitQuery = {
* The dictionary-based extensionFunctions context entry may be used instead, but not simultaneously.
*/
extensionFunctionCreator: new ActionContextKey<
(functionNamedNode: RDF.NamedNode) => ((args: RDF.Term[]) => Promise<RDF.Term>) | undefined
(functionNamedNode: RDF.NamedNode) => Promise<((args: RDF.Term[]) => Promise<RDF.Term>) | undefined>
// eslint-disable-next-line @typescript-eslint/no-extra-parens
>('@comunica/actor-init-query:extensionFunctionCreator'),
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { MediatorQueryOperation } from '@comunica/bus-query-operation';
import { ActorQueryOperation, materializeOperation } from '@comunica/bus-query-operation';
import type { FunctionBusType, IActionContext, TermFunctionBusType } from '@comunica/types';
import type { FunctionBusType, IActionContext } from '@comunica/types';
import type * as RDF from '@rdfjs/types';
import { LRUCache } from 'lru-cache';
import type { Algebra as Alg } from 'sparqlalgebrajs';
import * as E from '../expressions';
import { expressionToVar } from '../functions/Helpers';
import type { FunctionArgumentsCache } from '../functions/OverloadTree';
Expand All @@ -14,10 +13,10 @@ import * as Err from '../util/Errors';
import type { SuperTypeCallback, TypeCache, ISuperTypeProvider } from '../util/TypeHandling';

export type AsyncExtensionFunction = (args: RDF.Term[]) => Promise<RDF.Term>;
export type AsyncExtensionFunctionCreator = (functionNamedNode: RDF.NamedNode) => AsyncExtensionFunction | undefined;
export type AsyncExtensionFunctionCreator = (functionNamedNode: RDF.NamedNode) =>
Promise<AsyncExtensionFunction | undefined>;

export interface IAsyncEvaluatorContext {
extensionFunctionCreator?: AsyncExtensionFunctionCreator;
now?: Date;
baseIRI?: string;
typeCache?: TypeCache;
Expand All @@ -26,7 +25,6 @@ export interface IAsyncEvaluatorContext {
defaultTimeZone?: ITimeZoneRepresentation;
actionContext: IActionContext;
mediatorQueryOperation: MediatorQueryOperation;
mediatorTermFunction: TermFunctionBusType;
mediatorFunction: FunctionBusType;
}

Expand All @@ -35,7 +33,7 @@ export interface IAsyncEvaluatorContext {
* It also holds all context items needed for evaluating functions.
*/
export class ContextualizedEvaluator {
protected readonly transformer: AlgebraTransformer;
public readonly transformer: AlgebraTransformer;

private readonly subEvaluators: Record<string,
(expr: E.Expression, mapping: RDF.Bindings) => Promise<E.Term> | E.Term> =
Expand All @@ -47,7 +45,6 @@ export class ContextualizedEvaluator {
[E.ExpressionType.Named]: this.evalFunction.bind(this),
[E.ExpressionType.Existence]: this.evalExistence.bind(this),
[E.ExpressionType.Aggregate]: this.evalAggregate.bind(this),
[E.ExpressionType.AsyncExtension]: this.evalFunction.bind(this),
};

// Context items
Expand All @@ -59,7 +56,6 @@ export class ContextualizedEvaluator {
public readonly defaultTimeZone: ITimeZoneRepresentation;
public readonly actionContext: IActionContext;
public readonly mediatorQueryOperation: MediatorQueryOperation;
public readonly mediatorTermFunction: TermFunctionBusType;
public readonly mediatorFunction: FunctionBusType;

public constructor(context: IAsyncEvaluatorContext) {
Expand All @@ -70,24 +66,17 @@ export class ContextualizedEvaluator {
cache: context.typeCache || new LRUCache({ max: 1_000 }),
discoverer: context.getSuperType || (() => 'term'),
};
// eslint-disable-next-line unicorn/no-useless-undefined
this.extensionFunctionCreator = context.extensionFunctionCreator || (() => undefined);
this.defaultTimeZone = context.defaultTimeZone || extractTimeZone(this.now);
this.actionContext = context.actionContext;
this.mediatorQueryOperation = context.mediatorQueryOperation;
this.mediatorTermFunction = context.mediatorTermFunction;
this.mediatorFunction = context.mediatorFunction;

this.transformer = new AlgebraTransformer(
this.superTypeProvider,
this.mediatorFunction,
args => this.mediatorFunction({ ...args, context: this.actionContext }),
);
}

public translate(algExpr: Alg.Expression): Promise<E.Expression> {
return this.transformer.transformAlgebra(algExpr);
}

public async evaluateAsInternal(expr: E.Expression, mapping: RDF.Bindings): Promise<E.Term> {
const evaluator = this.subEvaluators[expr.expressionType];
if (!evaluator) {
Expand All @@ -108,7 +97,7 @@ export class ContextualizedEvaluator {
return this.transformer.transformRDFTermUnsafe(term);
}

private async evalFunction(expr: E.Operator | E.SpecialOperator | E.Named | E.AsyncExtension, mapping: RDF.Bindings):
private async evalFunction(expr: E.Operator | E.SpecialOperator | E.Named, mapping: RDF.Bindings):
Promise<E.Term> {
return expr.apply({
args: expr.args,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,69 @@
import type { MediatorBindingsAggregatorFactory } from '@comunica/bus-bindings-aggeregator-factory';
import type { MediatorQueryOperation } from '@comunica/bus-query-operation';
import { KeysInitQuery } from '@comunica/context-entries';
import type {
IActionContext,
import type { IActionContext,
IBindingsAggregator,
IExpressionEvaluator,
IExpressionEvaluatorFactory,
FunctionBusType, TermFunctionBusType, IOrderByEvaluator,
} from '@comunica/types';
FunctionBusType, IOrderByEvaluator,
FunctionExpression, ITermFunction, OrderByBus,
IEvalContext } from '@comunica/types';
import type * as RDF from '@rdfjs/types';
import { DataFactory } from 'rdf-data-factory';
import type { Algebra as Alg } from 'sparqlalgebrajs';
import type * as E from '../expressions';
import type { TermSparqlFunction, SparqlFunction } from '../functions';
import { namedFunctions, regularFunctions, specialFunctions } from '../functions';
import { FunctionDefinition, namedFunctions, regularFunctions, specialFunctions } from '../functions';
import type * as C from '../util/Consts';
import { RegularOperator } from '../util/Consts';
import type { IAsyncEvaluatorContext } from './ContextualizedEvaluator';
import type { IAsyncEvaluatorContext, AsyncExtensionFunction } from './ContextualizedEvaluator';
import { ContextualizedEvaluator } from './ContextualizedEvaluator';
import { ExpressionEvaluator } from './ExpressionEvaluator';
import { OrderByEvaluator } from './OrderByEvaluator';

export class ExpressionEvaluatorFactory implements IExpressionEvaluatorFactory {
public readonly mediatorBindingsAggregatorFactory: MediatorBindingsAggregatorFactory;
public readonly mediatorQueryOperation: MediatorQueryOperation;
public readonly functionsBus: FunctionBusType = async({ functionName }) => {
const res: SparqlFunction | undefined = {
public readonly functionsBus: FunctionBusType = async({ functionName, context }) => {
const res: FunctionExpression | undefined = {
...regularFunctions,
...specialFunctions,
...namedFunctions,
}[<C.NamedOperator | C.Operator> functionName];
if (res) {
return res;
}
throw new Error('nah!');
};

public readonly termFunctionsBus: TermFunctionBusType = async({ functionName }) => {
const res: TermSparqlFunction<any> | undefined = {
...regularFunctions,
...namedFunctions,
}[<C.NamedOperator | C.RegularOperator> functionName];
if (res) {
return res;
const extensionFinder: ((functionNamedNode: RDF.NamedNode) =>
Promise<((args: RDF.Term[]) => Promise<RDF.Term>) | undefined>) | undefined =
context.get(KeysInitQuery.extensionFunctionCreator);
if (extensionFinder) {
const definition = await extensionFinder(new DataFactory<RDF.Quad>().namedNode(functionName));
if (definition) {
return new NamedExtension(definition);
}
}
const extensionMap: Record<string, (args: RDF.Term[]) => Promise<RDF.Term>> | undefined =
context.get(KeysInitQuery.extensionFunctions);
if (extensionMap) {
const definition = extensionMap[functionName];
if (definition) {
return new NamedExtension(definition);
}
}
throw new Error('nah!');
};

public readonly orderByBus: OrderByBus = async({ context }) =>
new OrderByEvaluator(new ContextualizedEvaluator({
now: context.get(KeysInitQuery.queryTimestamp),
baseIRI: context.get(KeysInitQuery.baseIRI),
functionArgumentsCache: context.get(KeysInitQuery.functionArgumentsCache),
actionContext: context,
mediatorQueryOperation: this.mediatorQueryOperation,
mediatorFunction: this.functionsBus,
}),
<ITermFunction> await this.functionsBus({ functionName: RegularOperator.EQUAL, context, definitionType: 'onTerm' }),
<ITermFunction> await this.functionsBus({ functionName: RegularOperator.LT, context, definitionType: 'onTerm' }));

public constructor(args: IExpressionEvaluatorFactoryArgs) {
this.mediatorBindingsAggregatorFactory = args.mediatorBindingsAggregatorFactory;
this.mediatorQueryOperation = args.mediatorQueryOperation;
Expand All @@ -62,12 +81,11 @@ export class ExpressionEvaluatorFactory implements IExpressionEvaluatorFactory {
actionContext: context,
mediatorQueryOperation: this.mediatorQueryOperation,
mediatorFunction: this.functionsBus,
mediatorTermFunction: this.termFunctionsBus,
...legacyContext,
});
return new ExpressionEvaluator(defContextEval, await defContextEval.translate(algExpr));
return new ExpressionEvaluator(defContextEval, await defContextEval.transformer.transformAlgebra(algExpr));
}
return new ExpressionEvaluator(contextEval, await contextEval.translate(algExpr));
return new ExpressionEvaluator(contextEval, await contextEval.transformer.transformAlgebra(algExpr));
}

public async createAggregator(algExpr: Alg.AggregateExpression, context: IActionContext):
Expand All @@ -79,33 +97,36 @@ export class ExpressionEvaluatorFactory implements IExpressionEvaluatorFactory {
})).aggregator;
}

public createTermFunction(arg: { functionName: string; arguments?: E.TermExpression[] }):
Promise<TermSparqlFunction<any>> {
return this.termFunctionsBus(arg);
}

public createFunction(arg: { functionName: string; arguments: Alg.Expression[] }): Promise<SparqlFunction> {
return this.functionsBus(arg);
}
public createFunction = this.functionsBus;

public async createOrderByEvaluator(context: IActionContext, legacyContext: Partial<IAsyncEvaluatorContext> = {}):
public async createOrderByEvaluator(context: IActionContext):
Promise<IOrderByEvaluator> {
return new OrderByEvaluator({
now: context.get(KeysInitQuery.queryTimestamp),
baseIRI: context.get(KeysInitQuery.baseIRI),
functionArgumentsCache: context.get(KeysInitQuery.functionArgumentsCache),
actionContext: context,
mediatorQueryOperation: this.mediatorQueryOperation,
mediatorFunction: this.functionsBus,
mediatorTermFunction: this.termFunctionsBus,
...legacyContext,
},
await this.createTermFunction({ functionName: RegularOperator.EQUAL }),
await this.createTermFunction({ functionName: RegularOperator.LT }));
return this.orderByBus({ context });
}
}

interface IExpressionEvaluatorFactoryArgs {
mediatorBindingsAggregatorFactory: MediatorBindingsAggregatorFactory;
mediatorQueryOperation: MediatorQueryOperation;
}

// TODO: this thing will be it's own actor but it's just a little special.
// It will also be the only consumer of the context items:
// KeysInitQuery.extensionFunctions and KeysInitQuery.extensionFunctionCreator
class NamedExtension extends FunctionDefinition {
// TODO: the context should be checked in the test part of the actor.
// The fact that this can be done is async now is a nice feature!
// It means that named function definitions could be queried over the web!
// TODO: when all is done, this should be injected in some way!
protected arity = Number.POSITIVE_INFINITY;
public constructor(private readonly functionDefinition: AsyncExtensionFunction) {
super();
}

public apply = async({ args, exprEval, mapping }: IEvalContext): Promise<E.TermExpression> => {
const evaluatedArgs: E.Term[] = await Promise.all(args.map(arg => exprEval.evaluateAsInternal(arg, mapping)));
return exprEval.transformer.transformRDFTermUnsafe(
await this.functionDefinition(evaluatedArgs.map(term => term.toRDF())),
);
};
}
Loading

0 comments on commit a1734f8

Please sign in to comment.