Skip to content

Commit

Permalink
Test submitting a batch of calls
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Jun 4, 2024
1 parent ad8ddb4 commit b907024
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 64 deletions.
43 changes: 43 additions & 0 deletions dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,46 @@ func TestDispatchCalls(t *testing.T) {
},
})
}

func TestDispatchCallsBatch(t *testing.T) {
var recorder dispatchtest.CallRecorder

server := dispatchtest.NewDispatchServer(&recorder)

d := &dispatch.Dispatch{
EndpointUrl: "http://example.com",
Client: dispatch.Client{ApiKey: "foobar", ApiUrl: server.URL},
}

fn1 := dispatch.NewPrimitiveFunction("function1", func(ctx context.Context, req *sdkv1.RunRequest) *sdkv1.RunResponse {
panic("not implemented")
})
fn2 := dispatch.NewFunction("function2", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
panic("not implemented")
})

d.Register(fn1)
d.Register(fn2)

call1, err := fn1.BuildCall(wrapperspb.Int32(11), dispatch.WithExpiration(10*time.Second))
if err != nil {
t.Fatal(err)
}
call2, err := fn2.BuildCall(wrapperspb.String("foo"), dispatch.WithVersion("xyzzy"))
if err != nil {
t.Fatal(err)
}

batch := d.Batch()
batch.Add(call1, call2)
if _, err := batch.Dispatch(context.Background()); err != nil {
t.Fatal(err)
}

dispatchtest.AssertDispatchRequests(t, recorder.Requests, []dispatchtest.DispatchRequest{
{
ApiKey: "foobar",
Calls: []dispatch.Call{call1, call2},
},
})
}
12 changes: 6 additions & 6 deletions dispatchlambda/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func TestHandlerEmptyPayload(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, nil
})
h := dispatchlambda.Handler(fn)
Expand All @@ -26,7 +26,7 @@ func TestHandlerEmptyPayload(t *testing.T) {
}

func TestHandlerShortPayload(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, nil
})
h := dispatchlambda.Handler(fn)
Expand All @@ -35,7 +35,7 @@ func TestHandlerShortPayload(t *testing.T) {
}

func TestHandlerNonBase64Payload(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, nil
})
h := dispatchlambda.Handler(fn)
Expand All @@ -44,7 +44,7 @@ func TestHandlerNonBase64Payload(t *testing.T) {
}

func TestHandlerInvokePayloadNotProtobufMessage(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, nil
})
h := dispatchlambda.Handler(fn)
Expand All @@ -56,7 +56,7 @@ func TestHandlerInvokePayloadNotProtobufMessage(t *testing.T) {
}

func TestHandlerInvokeError(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, errors.New("invoke error")
})
h := dispatchlambda.Handler(fn)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestHandlerInvokeError(t *testing.T) {
}

func TestHandlerInvokeFunction(t *testing.T) {
fn := dispatch.NewGenericFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("handler", func(ctx context.Context, input *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return wrapperspb.String("output"), nil
})
h := dispatchlambda.Handler(fn)
Expand Down
110 changes: 55 additions & 55 deletions function.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,64 +27,13 @@ type Function interface {
register(endpoint string, client *Client)
}

// NewPrimitiveFunction creates a PrimitiveFunction.
func NewPrimitiveFunction(name string, fn func(context.Context, *sdkv1.RunRequest) *sdkv1.RunResponse) *PrimitiveFunction {
return &PrimitiveFunction{name: name, fn: fn}
}

// PrimitiveFunction is a function that's close to the underlying
// Dispatch protocol, accepting a RunRequest and returning a RunResponse.
type PrimitiveFunction struct {
name string
fn func(context.Context, *sdkv1.RunRequest) *sdkv1.RunResponse

endpoint string
client *Client
}

// Name is the name of the function.
func (f *PrimitiveFunction) Name() string {
return f.name
}

// Run runs the function.
func (f *PrimitiveFunction) Run(ctx context.Context, req *sdkv1.RunRequest) *sdkv1.RunResponse {
return f.fn(ctx, req)
}

// BuildCall constructs a call for the function.
func (f *PrimitiveFunction) BuildCall(input proto.Message, opts ...CallOption) (Call, error) {
if f.endpoint == "" {
return Call{}, fmt.Errorf("cannot build function call: endpoint has not been registered")
}
return NewCall(f.endpoint, f.name, input, opts...)
}

// Dispatch dispatches a call to the function.
func (f *PrimitiveFunction) Dispatch(ctx context.Context, input proto.Message, opts ...CallOption) (ID, error) {
if f.client == nil {
return "", fmt.Errorf("cannot dispatch function call: client has not been registered")
}
call, err := f.BuildCall(input, opts...)
if err != nil {
return "", err
}
return f.client.Dispatch(ctx, call)
}

// register registers an endpoint and client.
func (f *PrimitiveFunction) register(endpoint string, client *Client) {
f.endpoint = endpoint
f.client = client
}

// NewGenericFunction creates a GenericFunction.
func NewGenericFunction[Input, Output proto.Message](name string, fn func(context.Context, Input) (Output, error)) *GenericFunction[Input, Output] {
// NewFunction creates a Dispatch function.
func NewFunction[Input, Output proto.Message](name string, fn func(context.Context, Input) (Output, error)) *GenericFunction[Input, Output] {
return &GenericFunction[Input, Output]{name: name, fn: fn}
}

// GenericFunction is a higher level Dispatch function that accepts
// arbitrary input and returns arbitrary output.
// GenericFunction is a Dispatch function that accepts arbitrary input
// and returns arbitrary output.
type GenericFunction[Input, Output proto.Message] struct {
name string
fn func(ctx context.Context, input Input) (Output, error)
Expand Down Expand Up @@ -222,3 +171,54 @@ func (f *GenericFunction[Input, Output]) entrypoint(input Input) func() any {
}
}
}

// NewPrimitiveFunction creates a PrimitiveFunction.
func NewPrimitiveFunction(name string, fn func(context.Context, *sdkv1.RunRequest) *sdkv1.RunResponse) *PrimitiveFunction {
return &PrimitiveFunction{name: name, fn: fn}
}

// PrimitiveFunction is a function that's close to the underlying
// Dispatch protocol, accepting a RunRequest and returning a RunResponse.
type PrimitiveFunction struct {
name string
fn func(context.Context, *sdkv1.RunRequest) *sdkv1.RunResponse

endpoint string
client *Client
}

// Name is the name of the function.
func (f *PrimitiveFunction) Name() string {
return f.name
}

// Run runs the function.
func (f *PrimitiveFunction) Run(ctx context.Context, req *sdkv1.RunRequest) *sdkv1.RunResponse {
return f.fn(ctx, req)
}

// BuildCall constructs a call for the function.
func (f *PrimitiveFunction) BuildCall(input proto.Message, opts ...CallOption) (Call, error) {
if f.endpoint == "" {
return Call{}, fmt.Errorf("cannot build function call: endpoint has not been registered")
}
return NewCall(f.endpoint, f.name, input, opts...)
}

// Dispatch dispatches a call to the function.
func (f *PrimitiveFunction) Dispatch(ctx context.Context, input proto.Message, opts ...CallOption) (ID, error) {
if f.client == nil {
return "", fmt.Errorf("cannot dispatch function call: client has not been registered")
}
call, err := f.BuildCall(input, opts...)
if err != nil {
return "", err
}
return f.client.Dispatch(ctx, call)
}

// register registers an endpoint and client.
func (f *PrimitiveFunction) register(endpoint string, client *Client) {
f.endpoint = endpoint
f.client = client
}
6 changes: 3 additions & 3 deletions function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestFunctionRunInvalidCoroutineType(t *testing.T) {
fn := dispatch.NewGenericFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, nil
})

Expand All @@ -25,7 +25,7 @@ func TestFunctionRunInvalidCoroutineType(t *testing.T) {
func TestFunctionRunError(t *testing.T) {
oops := errors.New("oops")

fn := dispatch.NewGenericFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return nil, oops
})

Expand Down Expand Up @@ -55,7 +55,7 @@ func TestFunctionRunError(t *testing.T) {
}

func TestFunctionRunResult(t *testing.T) {
fn := dispatch.NewGenericFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
fn := dispatch.NewFunction("foo", func(ctx context.Context, req *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
return wrapperspb.String("world"), nil
})

Expand Down

0 comments on commit b907024

Please sign in to comment.