Skip to content

Commit

Permalink
Add fanout example
Browse files Browse the repository at this point in the history
  • Loading branch information
chriso committed Jun 27, 2024
1 parent b369e72 commit 09126bd
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions examples/fanout/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"encoding/json"
"log"

"github.com/dispatchrun/dispatch-go"
"github.com/dispatchrun/dispatch-go/dispatchhttp"
"golang.org/x/exp/maps"
)

func main() {
getRepo := dispatch.Func("getRepo", func(ctx context.Context, name string) (*dispatchhttp.Response, error) {
return dispatchhttp.Get(context.Background(), "https://api.github.com/repos/dispatchrun/"+name)
})

getStargazers := dispatch.Func("getStargazers", func(ctx context.Context, url string) (*dispatchhttp.Response, error) {
return dispatchhttp.Get(context.Background(), url)
})

reduceStargazers := dispatch.Func("reduceStargazers", func(ctx context.Context, stargazerURLs strings) (strings, error) {
responses, err := getStargazers.Gather(stargazerURLs)
if err != nil {
return nil, err
}
stargazers := map[string]struct{}{}
for _, res := range responses {
var stars []struct {
Login string `json:"login"`
}
if err := json.Unmarshal(res.Body, &stars); err != nil {
return nil, err
}
for _, star := range stars {
stargazers[star.Login] = struct{}{}
}
}
return maps.Keys(stargazers), nil
})

fanout := dispatch.Func("fanout", func(ctx context.Context, repoNames strings) (strings, error) {
responses, err := getRepo.Gather(repoNames)
if err != nil {
return nil, err
}

var stargazerURLs []string
for _, res := range responses {
var repo struct {
StargazersURL string `json:"stargazers_url"`
}
if err := json.Unmarshal(res.Body, &repo); err != nil {
return nil, err
}
stargazerURLs = append(stargazerURLs, repo.StargazersURL)
}

return reduceStargazers.Await(stargazerURLs)
})

endpoint, err := dispatch.New(getRepo, getStargazers, reduceStargazers, fanout)
if err != nil {
log.Fatalf("failed to create endpoint: %v", err)
}

go func() {
if _, err := fanout.Dispatch(context.Background(), strings{"coroutine", "dispatch-py"}); err != nil {
log.Fatalf("failed to dispatch call: %v", err)
}
}()

if err := endpoint.ListenAndServe(); err != nil {
log.Fatalf("failed to serve endpoint: %v", err)
}
}

// TODO: update dispatchproto.Marshal to support serializing slices/maps
// natively (if they can be sent on the wire as structpb.Value)
type strings []string

func (s strings) MarshalJSON() ([]byte, error) {
return json.Marshal([]string(s))
}

func (s *strings) UnmarshalJSON(b []byte) error {
var c []string
if err := json.Unmarshal(b, &c); err != nil {
return err
}
*s = c
return nil
}

0 comments on commit 09126bd

Please sign in to comment.