Skip to content

Commit

Permalink
Simplify client usage
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Jun 4, 2024
1 parent b907024 commit 5bbb835
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 101 deletions.
171 changes: 83 additions & 88 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,132 +6,118 @@ import (
"net/http"
"os"
"strings"
"sync"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
)

// DefaultApiUrl is the default Dispatch API URL.
const DefaultApiUrl = "https://api.dispatch.run"

// Client is a client for Dispatch.
// Client is a client for the Dispatch API.
//
// The Client can be used to dispatch function calls.
type Client struct {
// ApiKey is the Dispatch API key to use for authentication when
// dispatching calls to functions. If omitted, the the value of the
// DISPATCH_API_KEY environment variable is used.
ApiKey string

// ApiUrl is the URL of the Dispatch API to use when dispatching calls
// to functions. If omitted, the value of the DISPATCH_API_URL
// environment variable is used. If both are unset/empty, the default URL
// (DefaultApiUrl) is used.
ApiUrl string

// Env are environment variables to parse configuration from.
// If nil, environment variables are read from os.Environ().
Env []string

// Client is the HTTP client to use when making requests to Dispatch.
// If nil, http.DefaultClient is used.
Client *http.Client
apiKey string
apiUrl string
env []string
httpClient *http.Client

client sdkv1connect.DispatchServiceClient
err error
mu sync.Mutex
}

// Dispatch dispatches a function call.
func (c *Client) Dispatch(ctx context.Context, call Call) (ID, error) {
batch := c.Batch()
batch.Add(call)
ids, err := batch.Dispatch(ctx)
if err != nil {
return "", err
// NewClient creates a Client.
func NewClient(opts ...ClientOption) (*Client, error) {
c := &Client{env: os.Environ()}
for _, opt := range opts {
opt(c)
}
return ids[0], nil
}

// Batch creates a Batch.
func (c *Client) Batch() Batch {
return Batch{client: c}
}

func (c *Client) dispatchClient() (sdkv1connect.DispatchServiceClient, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.err != nil {
return nil, c.err
if c.apiKey == "" {
c.apiKey, _ = getenv(c.env, "DISPATCH_API_KEY")
}
if c.client != nil {
return c.client, nil
if c.apiKey == "" {
return nil, fmt.Errorf("API key has not been set. Use WithAPIKey(..), or set the DISPATCH_API_KEY environment variable")
}

apiKey := c.ApiKey
if apiKey == "" {
envApiKey, ok := getenv(c.Env, "DISPATCH_API_KEY")
if !ok || envApiKey == "" {
c.err = fmt.Errorf("Dispatch API key not found. Check DISPATCH_API_KEY")
return nil, c.err
}
apiKey = envApiKey
if c.apiUrl == "" {
c.apiUrl, _ = getenv(c.env, "DISPATCH_API_URL")
}
if c.apiUrl == "" {
c.apiUrl = DefaultApiUrl
}

apiUrl := c.ApiUrl
if apiUrl == "" {
envApiUrl, ok := getenv(c.Env, "DISPATCH_API_URL")
if ok && envApiUrl != "" {
apiUrl = envApiUrl
} else {
apiUrl = DefaultApiUrl
}
if c.httpClient == nil {
c.httpClient = http.DefaultClient
}

authenticatingInterceptor := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
authorization := "Bearer " + c.apiKey
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
req.Header().Add("Authorization", "Bearer "+apiKey)
req.Header().Add("Authorization", authorization)
return next(ctx, req)
}
})

validatingInterceptor, err := validate.NewInterceptor()
if err != nil {
c.err = err
return nil, err
}

c.client = sdkv1connect.NewDispatchServiceClient(c.httpClient(), apiUrl,
c.client = sdkv1connect.NewDispatchServiceClient(c.httpClient, c.apiUrl,
connect.WithInterceptors(validatingInterceptor, authenticatingInterceptor))

return c.client, nil
return c, nil
}

func (c *Client) httpClient() *http.Client {
if c.Client != nil {
return c.Client
}
return http.DefaultClient
// ClientOption configures a Client.
type ClientOption func(*Client)

// WithAPIKey sets the Dispatch API key to use for authentication when
// dispatching function calls through a Client.
//
// It defaults to the value of the DISPATCH_API_KEY environment variable.
func WithAPIKey(apiKey string) ClientOption {
return func(c *Client) { c.apiKey = apiKey }
}

func getenv(env []string, name string) (string, bool) {
if env == nil {
env = os.Environ()
}
for _, s := range env {
n, v, ok := strings.Cut(s, "=")
if ok && n == name {
return v, true
}
// WithAPIUrl sets the URL of the Dispatch API.
//
// It defaults to the value of the DISPATCH_API_URL environment variable,
// or DefaultApiUrl if DISPATCH_API_URL is unset.
func WithAPIUrl(apiUrl string) ClientOption {
return func(c *Client) { c.apiUrl = apiUrl }
}

// DefaultApiUrl is the default Dispatch API URL.
const DefaultApiUrl = "https://api.dispatch.run"

// WithClientEnv sets the environment variables that a Client parses
// default configuration from.
//
// It defaults to os.Environ().
func WithClientEnv(env []string) ClientOption {
return func(c *Client) { c.env = env }
}

// Dispatch dispatches a function call.
func (c *Client) Dispatch(ctx context.Context, call Call) (ID, error) {
batch := c.Batch()
batch.Add(call)
ids, err := batch.Dispatch(ctx)
if err != nil {
return "", err
}
return "", false
return ids[0], nil
}

// Batch creates a Batch.
func (c *Client) Batch() Batch {
return Batch{client: c.client}
}

// Batch is used to submit a batch of function calls to Dispatch.
type Batch struct {
client *Client
client sdkv1connect.DispatchServiceClient

calls []*sdkv1.Call
}
Expand All @@ -150,14 +136,23 @@ func (b *Batch) Add(calls ...Call) {

// Dispatch dispatches the batch of function calls.
func (b *Batch) Dispatch(ctx context.Context) ([]ID, error) {
client, err := b.client.dispatchClient()
if err != nil {
return nil, err
}
req := connect.NewRequest(&sdkv1.DispatchRequest{Calls: b.calls})
res, err := client.Dispatch(ctx, req)
res, err := b.client.Dispatch(ctx, req)
if err != nil {
return nil, err
}
return res.Msg.DispatchIds, nil
}

func getenv(env []string, name string) (string, bool) {
if env == nil {
env = os.Environ()
}
for _, s := range env {
n, v, ok := strings.Cut(s, "=")
if ok && n == name {
return v, true
}
}
return "", false
}
36 changes: 26 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ func TestClient(t *testing.T) {

server := dispatchtest.NewDispatchServer(&recorder)

client := &dispatch.Client{ApiKey: "foobar", ApiUrl: server.URL}
client, err := dispatch.NewClient(dispatch.WithAPIKey("foobar"), dispatch.WithAPIUrl(server.URL))
if err != nil {
t.Fatal(err)
}

call, err := dispatch.NewCall("http://example.com", "function1", wrapperspb.Int32(11))
if err != nil {
Expand All @@ -35,14 +38,16 @@ func TestClient(t *testing.T) {
}

func TestClientEnvConfig(t *testing.T) {
var recorder dispatchtest.CallRecorder
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)

server := dispatchtest.NewDispatchServer(&recorder)

client := &dispatch.Client{Env: []string{
client, err := dispatch.NewClient(dispatch.WithClientEnv([]string{
"DISPATCH_API_KEY=foobar",
"DISPATCH_API_URL=" + server.URL,
}}
}))
if err != nil {
t.Fatal(err)
}

call, err := dispatch.NewCall("http://example.com", "function1", wrapperspb.Int32(11))
if err != nil {
Expand All @@ -63,11 +68,13 @@ func TestClientEnvConfig(t *testing.T) {
}

func TestClientBatch(t *testing.T) {
var recorder dispatchtest.CallRecorder

server := dispatchtest.NewDispatchServer(&recorder)
recorder := &dispatchtest.CallRecorder{}
server := dispatchtest.NewDispatchServer(recorder)

client := &dispatch.Client{ApiKey: "foobar", ApiUrl: server.URL}
client, err := dispatch.NewClient(dispatch.WithAPIKey("foobar"), dispatch.WithAPIUrl(server.URL))
if err != nil {
t.Fatal(err)
}

call1, err := dispatch.NewCall("http://example.com", "function1", wrapperspb.Int32(11))
if err != nil {
Expand Down Expand Up @@ -112,3 +119,12 @@ func TestClientBatch(t *testing.T) {
},
})
}

func TestClientNoAPIKey(t *testing.T) {
_, err := dispatch.NewClient(dispatch.WithClientEnv(nil))
if err == nil {
t.Fatalf("expected an error")
} else if err.Error() != "API key has not been set. Use WithAPIKey(..), or set the DISPATCH_API_KEY environment variable" {
t.Errorf("unexpected error: %v", err)
}
}
16 changes: 13 additions & 3 deletions dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,14 @@ func TestDispatchCalls(t *testing.T) {

server := dispatchtest.NewDispatchServer(&recorder)

client, err := dispatch.NewClient(dispatch.WithAPIKey("foobar"), dispatch.WithAPIUrl(server.URL))
if err != nil {
t.Fatal(err)
}

d := &dispatch.Dispatch{
EndpointUrl: "http://example.com",
Client: dispatch.Client{ApiKey: "foobar", ApiUrl: server.URL},
Client: *client, // FIXME
}

fn := dispatch.NewPrimitiveFunction("function1", func(ctx context.Context, req *sdkv1.RunRequest) *sdkv1.RunResponse {
Expand All @@ -130,7 +135,7 @@ func TestDispatchCalls(t *testing.T) {

d.Register(fn)

_, err := fn.Dispatch(context.Background(), wrapperspb.Int32(11), dispatch.WithExpiration(10*time.Second))
_, err = fn.Dispatch(context.Background(), wrapperspb.Int32(11), dispatch.WithExpiration(10*time.Second))
if err != nil {
t.Fatal(err)
}
Expand All @@ -153,9 +158,14 @@ func TestDispatchCallsBatch(t *testing.T) {

server := dispatchtest.NewDispatchServer(&recorder)

client, err := dispatch.NewClient(dispatch.WithAPIKey("foobar"), dispatch.WithAPIUrl(server.URL))
if err != nil {
t.Fatal(err)
}

d := &dispatch.Dispatch{
EndpointUrl: "http://example.com",
Client: dispatch.Client{ApiKey: "foobar", ApiUrl: server.URL},
Client: *client, // FIXME
}

fn1 := dispatch.NewPrimitiveFunction("function1", func(ctx context.Context, req *sdkv1.RunRequest) *sdkv1.RunResponse {
Expand Down

0 comments on commit 5bbb835

Please sign in to comment.