Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatchserver package #3

Merged
merged 8 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dispatch_test

import (
"context"
"net/http"
"testing"

"github.com/dispatchrun/dispatch-go"
Expand All @@ -10,7 +11,7 @@ import (

func TestClient(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand All @@ -25,14 +26,14 @@ func TestClient(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
})
}

func TestClientEnvConfig(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.Env(
"DISPATCH_API_KEY=foobar",
Expand All @@ -50,14 +51,14 @@ func TestClientEnvConfig(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
})
}

func TestClientBatch(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand Down Expand Up @@ -86,11 +87,11 @@ func TestClientBatch(t *testing.T) {

recorder.Assert(t,
dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call1, call2},
},
dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call3, call4},
})
}
Expand Down
13 changes: 7 additions & 6 deletions dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dispatch_test

import (
"context"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestDispatchEndpoint(t *testing.T) {

func TestDispatchCall(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand All @@ -95,7 +96,7 @@ func TestDispatchCall(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{
dispatch.NewCall("http://example.com", "function1",
dispatch.Input(dispatch.Int(11)),
Expand All @@ -106,7 +107,7 @@ func TestDispatchCall(t *testing.T) {

func TestDispatchCallEnvConfig(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

endpoint, err := dispatch.New(dispatch.Env(
"DISPATCH_ENDPOINT_URL=http://example.com",
Expand All @@ -128,7 +129,7 @@ func TestDispatchCallEnvConfig(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{
dispatch.NewCall("http://example.com", "function2",
dispatch.Input(dispatch.String("foo")),
Expand All @@ -140,7 +141,7 @@ func TestDispatchCallEnvConfig(t *testing.T) {
func TestDispatchCallsBatch(t *testing.T) {
var recorder dispatchtest.CallRecorder

server := dispatchtest.NewDispatchServer(&recorder)
server := dispatchtest.NewServer(&recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestDispatchCallsBatch(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call1, call2},
})
}
Expand Down
108 changes: 108 additions & 0 deletions dispatchserver/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package dispatchserver

import (
"context"
"crypto/ed25519"
"net/http"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go"
"github.com/dispatchrun/dispatch-go/internal/auth"
)

// EndpointClient is a client for a Dispatch endpoint.
//
// Note that this is not the same as dispatch.Client, which
// is a client for the Dispatch API. The client here is used
// by a Dispatch server to interact with the functions provided
// by a Dispatch endpoint.
type EndpointClient struct {
httpClient connect.HTTPClient
signingKey ed25519.PrivateKey
header http.Header
opts []connect.ClientOption

client sdkv1connect.FunctionServiceClient
}

// NewEndpointClient creates an EndpointClient.
func NewEndpointClient(endpointUrl string, opts ...EndpointClientOption) (*EndpointClient, error) {
c := &EndpointClient{}
for _, opt := range opts {
opt(c)
}

if c.httpClient == nil {
c.httpClient = http.DefaultClient
}

// Setup request signing.
if c.signingKey != nil {
signer := auth.NewSigner(c.signingKey)
c.httpClient = signer.Client(c.httpClient)
}

// Setup the gRPC client.
validator, err := validate.NewInterceptor()
if err != nil {
return nil, err
}
c.opts = append(c.opts, connect.WithInterceptors(validator))
c.client = sdkv1connect.NewFunctionServiceClient(c.httpClient, endpointUrl, c.opts...)

return c, nil
}

// EndpointClientOption configures an EndpointClient.
type EndpointClientOption func(*EndpointClient)

// SigningKey sets the signing key to use when signing requests bound
// for the endpoint.
//
// By default the EndpointClient does not sign requests to the endpoint.
func SigningKey(signingKey ed25519.PrivateKey) EndpointClientOption {
return func(c *EndpointClient) { c.signingKey = signingKey }
}

// HTTPClient sets the HTTP client to use when making requests to the endpoint.
//
// By default http.DefaultClient is used.
func HTTPClient(client connect.HTTPClient) EndpointClientOption {
return func(c *EndpointClient) { c.httpClient = client }
}

// RequestHeaders sets headers on the request to the endpoint.
func RequestHeaders(header http.Header) EndpointClientOption {
return func(c *EndpointClient) { c.header = header }
}

// ClientOptions adds options for the underlying connect (gRPC) client.
func ClientOptions(opts ...connect.ClientOption) EndpointClientOption {
return func(c *EndpointClient) { c.opts = append(c.opts, opts...) }
}

// Run sends a RunRequest and returns a RunResponse.
func (c *EndpointClient) Run(ctx context.Context, req dispatch.Request) (dispatch.Response, error) {
connectReq := connect.NewRequest(requestProto(req))

header := connectReq.Header()
for name, values := range c.header {
header[name] = values
}

res, err := c.client.Run(ctx, connectReq)
if err != nil {
return dispatch.Response{}, err
}
return newProtoResponse(res.Msg), nil
}

//go:linkname newProtoResponse github.com/dispatchrun/dispatch-go.newProtoResponse
func newProtoResponse(r *sdkv1.RunResponse) dispatch.Response

//go:linkname requestProto github.com/dispatchrun/dispatch-go.requestProto
func requestProto(r dispatch.Request) *sdkv1.RunRequest
90 changes: 90 additions & 0 deletions dispatchserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package dispatchserver

import (
"context"
"net/http"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go"
)

// Handler handles requests to a Dispatch API server.
type Handler interface {
Handle(ctx context.Context, header http.Header, calls []dispatch.Call) ([]dispatch.ID, error)
}

// HandlerFunc creates a Handler from a function.
func HandlerFunc(fn func(context.Context, http.Header, []dispatch.Call) ([]dispatch.ID, error)) Handler {
return handlerFunc(fn)
}

type handlerFunc func(context.Context, http.Header, []dispatch.Call) ([]dispatch.ID, error)

func (h handlerFunc) Handle(ctx context.Context, header http.Header, calls []dispatch.Call) ([]dispatch.ID, error) {
return h(ctx, header, calls)
}

// New creates a Server.
func New(handler Handler, opts ...connect.HandlerOption) (*Server, error) {
validator, err := validate.NewInterceptor()
if err != nil {
return nil, err
}
opts = append(opts, connect.WithInterceptors(validator))
grpcHandler := &dispatchServiceHandler{handler}
path, httpHandler := sdkv1connect.NewDispatchServiceHandler(grpcHandler, opts...)
return &Server{
path: path,
handler: httpHandler,
}, nil
}

// Server is a Dispatch API server.
type Server struct {
path string
handler http.Handler
}

// Handler returns an HTTP handler for the Dispatch API server, along with
// the path that the handler should be registered at.
func (s *Server) Handler() (string, http.Handler) {
return s.path, s.handler
}

// Serve serves the Server on the specified address.
func (s *Server) Serve(addr string) error {
mux := http.NewServeMux()
mux.Handle(s.Handler())
server := &http.Server{Addr: addr, Handler: mux}
return server.ListenAndServe()
}

type dispatchServiceHandler struct{ Handler }

func (d *dispatchServiceHandler) Dispatch(ctx context.Context, req *connect.Request[sdkv1.DispatchRequest]) (*connect.Response[sdkv1.DispatchResponse], error) {
calls := make([]dispatch.Call, len(req.Msg.Calls))
for i, c := range req.Msg.Calls {
calls[i] = newProtoCall(c)
}
ids, err := d.Handle(ctx, req.Header(), calls)
if err != nil {
return nil, err
}
if len(ids) != len(calls) {
panic("invalid handler response")
}
dispatchIDs := make([]string, len(ids))
for i, id := range ids {
dispatchIDs[i] = string(id)
}
return connect.NewResponse(&sdkv1.DispatchResponse{
DispatchIds: dispatchIDs,
}), nil
}

//go:linkname newProtoCall github.com/dispatchrun/dispatch-go.newProtoCall
func newProtoCall(c *sdkv1.Call) dispatch.Call
Loading
Loading