Skip to content

Commit

Permalink
Merge pull request #3 from dispatchrun/dispatchserver
Browse files Browse the repository at this point in the history
dispatchserver package
  • Loading branch information
chriso authored Jun 10, 2024
2 parents 1831a3a + dc653d6 commit 3111a8d
Show file tree
Hide file tree
Showing 8 changed files with 441 additions and 149 deletions.
15 changes: 8 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dispatch_test

import (
"context"
"net/http"
"testing"

"github.com/dispatchrun/dispatch-go"
Expand All @@ -10,7 +11,7 @@ import (

func TestClient(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand All @@ -25,14 +26,14 @@ func TestClient(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
})
}

func TestClientEnvConfig(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.Env(
"DISPATCH_API_KEY=foobar",
Expand All @@ -50,14 +51,14 @@ func TestClientEnvConfig(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
})
}

func TestClientBatch(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand Down Expand Up @@ -86,11 +87,11 @@ func TestClientBatch(t *testing.T) {

recorder.Assert(t,
dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call1, call2},
},
dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call3, call4},
})
}
Expand Down
13 changes: 7 additions & 6 deletions dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dispatch_test

import (
"context"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestDispatchEndpoint(t *testing.T) {

func TestDispatchCall(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand All @@ -95,7 +96,7 @@ func TestDispatchCall(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{
dispatch.NewCall("http://example.com", "function1",
dispatch.Input(dispatch.Int(11)),
Expand All @@ -106,7 +107,7 @@ func TestDispatchCall(t *testing.T) {

func TestDispatchCallEnvConfig(t *testing.T) {
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)
server := dispatchtest.NewServer(recorder)

endpoint, err := dispatch.New(dispatch.Env(
"DISPATCH_ENDPOINT_URL=http://example.com",
Expand All @@ -128,7 +129,7 @@ func TestDispatchCallEnvConfig(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{
dispatch.NewCall("http://example.com", "function2",
dispatch.Input(dispatch.String("foo")),
Expand All @@ -140,7 +141,7 @@ func TestDispatchCallEnvConfig(t *testing.T) {
func TestDispatchCallsBatch(t *testing.T) {
var recorder dispatchtest.CallRecorder

server := dispatchtest.NewDispatchServer(&recorder)
server := dispatchtest.NewServer(&recorder)

client, err := dispatch.NewClient(dispatch.APIKey("foobar"), dispatch.APIUrl(server.URL))
if err != nil {
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestDispatchCallsBatch(t *testing.T) {
}

recorder.Assert(t, dispatchtest.DispatchRequest{
ApiKey: "foobar",
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call1, call2},
})
}
Expand Down
108 changes: 108 additions & 0 deletions dispatchserver/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package dispatchserver

import (
"context"
"crypto/ed25519"
"net/http"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go"
"github.com/dispatchrun/dispatch-go/internal/auth"
)

// EndpointClient is a client for a Dispatch endpoint.
//
// Note that this is not the same as dispatch.Client, which
// is a client for the Dispatch API. The client here is used
// by a Dispatch server to interact with the functions provided
// by a Dispatch endpoint.
type EndpointClient struct {
httpClient connect.HTTPClient
signingKey ed25519.PrivateKey
header http.Header
opts []connect.ClientOption

client sdkv1connect.FunctionServiceClient
}

// NewEndpointClient creates an EndpointClient.
func NewEndpointClient(endpointUrl string, opts ...EndpointClientOption) (*EndpointClient, error) {
c := &EndpointClient{}
for _, opt := range opts {
opt(c)
}

if c.httpClient == nil {
c.httpClient = http.DefaultClient
}

// Setup request signing.
if c.signingKey != nil {
signer := auth.NewSigner(c.signingKey)
c.httpClient = signer.Client(c.httpClient)
}

// Setup the gRPC client.
validator, err := validate.NewInterceptor()
if err != nil {
return nil, err
}
c.opts = append(c.opts, connect.WithInterceptors(validator))
c.client = sdkv1connect.NewFunctionServiceClient(c.httpClient, endpointUrl, c.opts...)

return c, nil
}

// EndpointClientOption configures an EndpointClient.
type EndpointClientOption func(*EndpointClient)

// SigningKey sets the signing key to use when signing requests bound
// for the endpoint.
//
// By default the EndpointClient does not sign requests to the endpoint.
func SigningKey(signingKey ed25519.PrivateKey) EndpointClientOption {
return func(c *EndpointClient) { c.signingKey = signingKey }
}

// HTTPClient sets the HTTP client to use when making requests to the endpoint.
//
// By default http.DefaultClient is used.
func HTTPClient(client connect.HTTPClient) EndpointClientOption {
return func(c *EndpointClient) { c.httpClient = client }
}

// RequestHeaders sets headers on the request to the endpoint.
func RequestHeaders(header http.Header) EndpointClientOption {
return func(c *EndpointClient) { c.header = header }
}

// ClientOptions adds options for the underlying connect (gRPC) client.
func ClientOptions(opts ...connect.ClientOption) EndpointClientOption {
return func(c *EndpointClient) { c.opts = append(c.opts, opts...) }
}

// Run sends a RunRequest and returns a RunResponse.
func (c *EndpointClient) Run(ctx context.Context, req dispatch.Request) (dispatch.Response, error) {
connectReq := connect.NewRequest(requestProto(req))

header := connectReq.Header()
for name, values := range c.header {
header[name] = values
}

res, err := c.client.Run(ctx, connectReq)
if err != nil {
return dispatch.Response{}, err
}
return newProtoResponse(res.Msg), nil
}

//go:linkname newProtoResponse github.com/dispatchrun/dispatch-go.newProtoResponse
func newProtoResponse(r *sdkv1.RunResponse) dispatch.Response

//go:linkname requestProto github.com/dispatchrun/dispatch-go.requestProto
func requestProto(r dispatch.Request) *sdkv1.RunRequest
90 changes: 90 additions & 0 deletions dispatchserver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package dispatchserver

import (
"context"
"net/http"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go"
)

// Handler handles requests to a Dispatch API server.
type Handler interface {
Handle(ctx context.Context, header http.Header, calls []dispatch.Call) ([]dispatch.ID, error)
}

// HandlerFunc creates a Handler from a function.
func HandlerFunc(fn func(context.Context, http.Header, []dispatch.Call) ([]dispatch.ID, error)) Handler {
return handlerFunc(fn)
}

type handlerFunc func(context.Context, http.Header, []dispatch.Call) ([]dispatch.ID, error)

func (h handlerFunc) Handle(ctx context.Context, header http.Header, calls []dispatch.Call) ([]dispatch.ID, error) {
return h(ctx, header, calls)
}

// New creates a Server.
func New(handler Handler, opts ...connect.HandlerOption) (*Server, error) {
validator, err := validate.NewInterceptor()
if err != nil {
return nil, err
}
opts = append(opts, connect.WithInterceptors(validator))
grpcHandler := &dispatchServiceHandler{handler}
path, httpHandler := sdkv1connect.NewDispatchServiceHandler(grpcHandler, opts...)
return &Server{
path: path,
handler: httpHandler,
}, nil
}

// Server is a Dispatch API server.
type Server struct {
path string
handler http.Handler
}

// Handler returns an HTTP handler for the Dispatch API server, along with
// the path that the handler should be registered at.
func (s *Server) Handler() (string, http.Handler) {
return s.path, s.handler
}

// Serve serves the Server on the specified address.
func (s *Server) Serve(addr string) error {
mux := http.NewServeMux()
mux.Handle(s.Handler())
server := &http.Server{Addr: addr, Handler: mux}
return server.ListenAndServe()
}

type dispatchServiceHandler struct{ Handler }

func (d *dispatchServiceHandler) Dispatch(ctx context.Context, req *connect.Request[sdkv1.DispatchRequest]) (*connect.Response[sdkv1.DispatchResponse], error) {
calls := make([]dispatch.Call, len(req.Msg.Calls))
for i, c := range req.Msg.Calls {
calls[i] = newProtoCall(c)
}
ids, err := d.Handle(ctx, req.Header(), calls)
if err != nil {
return nil, err
}
if len(ids) != len(calls) {
panic("invalid handler response")
}
dispatchIDs := make([]string, len(ids))
for i, id := range ids {
dispatchIDs[i] = string(id)
}
return connect.NewResponse(&sdkv1.DispatchResponse{
DispatchIds: dispatchIDs,
}), nil
}

//go:linkname newProtoCall github.com/dispatchrun/dispatch-go.newProtoCall
func newProtoCall(c *sdkv1.Call) dispatch.Call
Loading

0 comments on commit 3111a8d

Please sign in to comment.