diff --git a/cmd/mistryd/config.go b/cmd/mistryd/config.go
index 62394b8..6529cec 100644
--- a/cmd/mistryd/config.go
+++ b/cmd/mistryd/config.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"io"
 	"os"
+	"runtime"
 	"strconv"
 
 	"github.com/skroutz/mistry/pkg/filesystem"
@@ -21,6 +22,9 @@ type Config struct {
 	ProjectsPath string            `json:"projects_path"`
 	BuildPath    string            `json:"build_path"`
 	Mounts       map[string]string `json:"mounts"`
+
+	Concurrency int `json:"job_concurrency"`
+	Backlog     int `json:"job_backlog"`
 }
 
 // ParseConfig accepts the listening address, a filesystem adapter and a
@@ -55,5 +59,15 @@ func ParseConfig(addr string, fs filesystem.FileSystem, r io.Reader) (*Config, e
 		return nil, err
 	}
 
+	if cfg.Concurrency == 0 {
+		// our work is CPU bound, so the number of cores is a sensible default
+		cfg.Concurrency = runtime.NumCPU()
+	}
+
+	if cfg.Backlog == 0 {
+		// by default, allow a request spike of double the worker capacity
+		cfg.Backlog = cfg.Concurrency * 2
+	}
+
 	return cfg, nil
 }
diff --git a/cmd/mistryd/config.sample.json b/cmd/mistryd/config.sample.json
index f62d9d9..27e2f9b 100644
--- a/cmd/mistryd/config.sample.json
+++ b/cmd/mistryd/config.sample.json
@@ -3,5 +3,7 @@
   "build_path": "/var/lib/mistry/data",
   "mounts": {
     "/var/lib/mistry/.ssh": "/home/mistry/.ssh"
-  }
+  },
+  "job_concurrency": 5,
+  "job_backlog": 20
 }
diff --git a/cmd/mistryd/config.test.json b/cmd/mistryd/config.test.json
index b6e2b80..0768c8f 100644
--- a/cmd/mistryd/config.test.json
+++ b/cmd/mistryd/config.test.json
@@ -3,5 +3,7 @@
   "build_path": "/tmp",
   "mounts": {
     "/tmp": "/tmp"
-  }
+  },
+  "job_concurrency": 5,
+  "job_backlog": 100
 }
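Note: when either key is absent from the JSON, Go's zero value kicks in and the fallbacks in ParseConfig apply. A minimal, runnable sketch of the same defaulting (applyDefaults is an illustrative name, not part of this change):

package main

import (
	"fmt"
	"runtime"
)

// applyDefaults mirrors the defaulting logic added to ParseConfig above.
func applyDefaults(concurrency, backlog int) (int, int) {
	if concurrency == 0 {
		// builds are CPU bound: one worker per core
		concurrency = runtime.NumCPU()
	}
	if backlog == 0 {
		// absorb a request spike of twice the worker capacity
		backlog = concurrency * 2
	}
	return concurrency, backlog
}

func main() {
	// on an 8-core machine this prints: concurrency=8 backlog=16
	c, b := applyDefaults(0, 0)
	fmt.Printf("concurrency=%d backlog=%d\n", c, b)
}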
diff --git a/cmd/mistryd/job.go b/cmd/mistryd/job.go
index 250dd60..7ae8b4a 100644
--- a/cmd/mistryd/job.go
+++ b/cmd/mistryd/job.go
@@ -35,6 +35,7 @@ type Job struct {
 	Project string
 	Params  types.Params
 	Group   string
+	Rebuild bool
 
 	RootBuildPath    string
 	PendingBuildPath string
@@ -62,7 +63,12 @@ type Job struct {
 
 // NewJobFromRequest returns a new Job from the JobRequest
 func NewJobFromRequest(jr types.JobRequest, cfg *Config) (*Job, error) {
-	return NewJob(jr.Project, jr.Params, jr.Group, cfg)
+	j, err := NewJob(jr.Project, jr.Params, jr.Group, cfg)
+	if err != nil {
+		return nil, err
+	}
+	j.Rebuild = jr.Rebuild
+	return j, nil
 }
 
 // NewJob returns a new Job for the given project. project and cfg cannot be
diff --git a/cmd/mistryd/main.go b/cmd/mistryd/main.go
index 45d4560..bcf2d1e 100644
--- a/cmd/mistryd/main.go
+++ b/cmd/mistryd/main.go
@@ -181,5 +181,6 @@ func StartServer(cfg *Config) error {
 	}()
 	s.Log.Printf("Listening on %s...", cfg.Addr)
 	wg.Wait()
+	s.workerPool.Stop()
 	return nil
 }
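Note: Stop() runs only after wg.Wait() returns, i.e. once the HTTP side has wound down and no new jobs can arrive. A hypothetical helper making that ordering explicit (shutdown and the 30-second grace period are assumptions for illustration, not part of this change):

package main

import (
	"context"
	"net/http"
	"time"
)

// shutdown stops accepting HTTP traffic first, then drains the pool so that
// in-flight builds finish and the job queue channel can be closed safely.
func shutdown(srv *http.Server, pool *WorkerPool) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// no new jobs can be enqueued once Shutdown returns
	if err := srv.Shutdown(ctx); err != nil {
		return err
	}
	// close the work queue and block until every worker goroutine exits
	pool.Stop()
	return nil
}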
diff --git a/cmd/mistryd/server.go b/cmd/mistryd/server.go
index 3e936a1..fa39ade 100644
--- a/cmd/mistryd/server.go
+++ b/cmd/mistryd/server.go
@@ -37,10 +37,11 @@ import (
 
 type Server struct {
 	Log *log.Logger
 
-	srv *http.Server
-	jq  *JobQueue
-	pq  *ProjectQueue
-	cfg *Config
+	srv        *http.Server
+	jq         *JobQueue
+	pq         *ProjectQueue
+	cfg        *Config
+	workerPool *WorkerPool
 
 	// web-view related
@@ -87,6 +88,7 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
 	s.pq = NewProjectQueue()
 	s.br = broker.NewBroker(s.Log)
 	s.tq = new(sync.Map)
+	s.workerPool = NewWorkerPool(s, cfg.Concurrency, cfg.Backlog, logger)
 
 	return s, nil
 }
@@ -118,20 +120,35 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if _, isAsync := r.URL.Query()["async"]; isAsync {
-		s.handleNewJobAsync(j, jr, w)
+	// send the work item to the worker pool
+	future, err := s.workerPool.SendWork(j)
+	if err != nil {
+		// the in-memory queue is overloaded; we have to wait for the workers
+		// to pick up new items.
+		// return a 503 to signal that the server is overloaded and that
+		// clients should try again later.
+		// 503 is the appropriate status code for server-wide overload,
+		// while 429 would have been used if we implemented user-specific
+		// throttling
+		s.Log.Print("Failed to send message to work queue")
+		w.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+
+	// if async we're done; otherwise wait for the result on the result channel
+	_, async := r.URL.Query()["async"]
+	if async {
+		s.Log.Printf("Scheduled %s", j)
+		w.WriteHeader(http.StatusCreated)
 	} else {
-		s.handleNewJobSync(j, jr, w)
+		s.Log.Printf("Waiting for result of %s...", j)
+		s.writeWorkResult(j, future.Wait(), w)
 	}
 }
 
-// handleNewJobSync triggers the build synchronously, and writes the
-// build result JSON to the response
-func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
-	s.Log.Printf("Building %s...", j)
-	buildInfo, err := s.Work(context.Background(), j, jr)
-	if err != nil {
-		http.Error(w, fmt.Sprintf("Error building %s: %s", j, err),
+func (s *Server) writeWorkResult(j *Job, r WorkResult, w http.ResponseWriter) {
+	if r.Err != nil {
+		http.Error(w, fmt.Sprintf("Error building %s: %s", j, r.Err),
 			http.StatusInternalServerError)
 		return
 	}
@@ -139,7 +156,7 @@ func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWr
 
 	w.WriteHeader(http.StatusCreated)
 	w.Header().Set("Content-Type", "application/json")
-	resp, err := json.Marshal(buildInfo)
+	resp, err := json.Marshal(r.BuildInfo)
 	if err != nil {
 		s.Log.Print(err)
 	}
@@ -149,12 +166,6 @@ func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWr
 	}
 }
 
-func (s *Server) handleNewJobAsync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
-	s.Log.Printf("Scheduling %s...", j)
-	go s.Work(context.Background(), j, jr)
-	w.WriteHeader(http.StatusCreated)
-}
-
 // HandleIndex returns all available jobs.
 func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "GET" {
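Note: from a client's perspective, the only new behavior is that a full backlog surfaces as a retryable 503. A sketch of a client honoring that contract; the endpoint path, payload shape, address and backoff policy are assumptions for illustration, not mistry's documented API:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// submitJob posts a build request and retries on 503, which now means
// "backlog full, try again later" rather than a hard failure.
func submitJob(c *http.Client, url string, body []byte) (*http.Response, error) {
	for attempt := 1; attempt <= 3; attempt++ {
		resp, err := c.Post(url, "application/json", bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusServiceUnavailable {
			return resp, nil
		}
		resp.Body.Close()
		// linear backoff before retrying the overloaded server
		time.Sleep(time.Duration(attempt) * time.Second)
	}
	return nil, errors.New("server overloaded, giving up")
}

func main() {
	payload := []byte(`{"project": "simple"}`) // assumed payload shape
	resp, err := submitJob(http.DefaultClient, "http://localhost:8462/jobs", payload)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}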
diff --git a/cmd/mistryd/testdata/projects/sleep/Dockerfile b/cmd/mistryd/testdata/projects/sleep/Dockerfile
new file mode 100644
index 0000000..88c7eef
--- /dev/null
+++ b/cmd/mistryd/testdata/projects/sleep/Dockerfile
@@ -0,0 +1,8 @@
+FROM debian:stretch
+
+COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh
+RUN chmod +x /usr/local/bin/docker-entrypoint.sh
+
+WORKDIR /data
+
+ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
diff --git a/cmd/mistryd/testdata/projects/sleep/docker-entrypoint.sh b/cmd/mistryd/testdata/projects/sleep/docker-entrypoint.sh
new file mode 100644
index 0000000..29746a4
--- /dev/null
+++ b/cmd/mistryd/testdata/projects/sleep/docker-entrypoint.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+sleep 10
diff --git a/cmd/mistryd/worker.go b/cmd/mistryd/worker.go
index f118079..e85d9a3 100644
--- a/cmd/mistryd/worker.go
+++ b/cmd/mistryd/worker.go
@@ -19,7 +19,7 @@ import (
 
 // Work performs the work denoted by j and returns a BuildInfo upon
 // successful completion, or an error.
-func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildInfo *types.BuildInfo, err error) {
+func (s *Server) Work(ctx context.Context, j *Job) (buildInfo *types.BuildInfo, err error) {
 	log := log.New(os.Stderr, fmt.Sprintf("[worker] [%s] ", j), log.LstdFlags)
 	start := time.Now()
 	_, err = os.Stat(j.ReadyBuildPath)
@@ -176,7 +176,7 @@ func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildIn
 		return
 	}
 
-	err = j.BuildImage(ctx, s.cfg.UID, client, out, jr.Rebuild, jr.Rebuild)
+	err = j.BuildImage(ctx, s.cfg.UID, client, out, j.Rebuild, j.Rebuild)
 	if err != nil {
 		err = workErr("could not build docker image", err)
 		return
diff --git a/cmd/mistryd/worker_pool.go b/cmd/mistryd/worker_pool.go
new file mode 100644
index 0000000..2cb7ec1
--- /dev/null
+++ b/cmd/mistryd/worker_pool.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log"
+	"sync"
+
+	"github.com/skroutz/mistry/pkg/types"
+)
+
+// WorkResult contains the result of a job: either a BuildInfo or an error
+type WorkResult struct {
+	BuildInfo *types.BuildInfo
+	Err       error
+}
+
+// FutureWorkResult is a WorkResult that may not yet have become available
+// and can be Wait()'ed on
+type FutureWorkResult struct {
+	result <-chan WorkResult
+}
+
+// Wait waits for the result to become available and returns it
+func (f FutureWorkResult) Wait() WorkResult {
+	r, ok := <-f.result
+	if !ok {
+		// this should never happen; reading from the result channel is
+		// exclusive to this future
+		panic("Failed to read from result channel")
+	}
+	return r
+}
+
+// workItem contains a job and a channel to place the job result in; it is
+// the element type of the internal work queue
+type workItem struct {
+	job    *Job
+	result chan<- WorkResult
+}
+
+// WorkerPool implements a fixed-size pool of worker goroutines that can be
+// sent build jobs and communicate their results
+type WorkerPool struct {
+	// the fixed number of goroutines that will be running jobs
+	concurrency int
+
+	// the maximum backlog of pending requests. if exceeded, sending new work
+	// to the pool will return an error
+	backlogSize int
+
+	queue chan workItem
+	wg    sync.WaitGroup
+}
+
+// NewWorkerPool creates a new worker pool
+func NewWorkerPool(s *Server, concurrency, backlog int, logger *log.Logger) *WorkerPool {
+	p := new(WorkerPool)
+	p.concurrency = concurrency
+	p.backlogSize = backlog
+	p.queue = make(chan workItem, backlog)
+
+	for i := 0; i < concurrency; i++ {
+		p.wg.Add(1)
+		go work(s, i, p.queue, &p.wg)
+	}
+	logger.Printf("Set up %d workers", concurrency)
+	return p
+}
+
+// Stop signals the workers to shut down and blocks until they have exited.
+func (p *WorkerPool) Stop() {
+	close(p.queue)
+	p.wg.Wait()
+}
+
+// SendWork schedules work on p and returns a FutureWorkResult. The actual
+// result can be obtained by FutureWorkResult.Wait(). An error is returned
+// if the backlog is full and cannot accept any new work items
+func (p *WorkerPool) SendWork(j *Job) (FutureWorkResult, error) {
+	resultQueue := make(chan WorkResult, 1)
+	wi := workItem{j, resultQueue}
+	result := FutureWorkResult{resultQueue}
+
+	select {
+	case p.queue <- wi:
+		return result, nil
+	default:
+		return result, errors.New("queue is full")
+	}
+}
+
+// work consumes the queue, runs Work() on each incoming work item and sends
+// the result through the item's result channel
+func work(s *Server, id int, queue <-chan workItem, wg *sync.WaitGroup) {
+	defer wg.Done()
+	logPrefix := fmt.Sprintf("[worker %d]", id)
+	for item := range queue {
+		s.Log.Printf("%s received work item %#v", logPrefix, item)
+		buildInfo, err := s.Work(context.Background(), item.job)
+
+		select {
+		case item.result <- WorkResult{buildInfo, err}:
+			s.Log.Printf("%s wrote result to the result channel", logPrefix)
+		default:
+			// this should never happen; the result chan is unique to this work item
+			s.Log.Panicf("%s failed to write result to the result channel", logPrefix)
+		}
+		close(item.result)
+	}
+	s.Log.Printf("%s exiting...", logPrefix)
+}
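Note: putting the pool's API together, this is the submit/wait round-trip that HandleNewJob performs on the synchronous path, condensed into one helper (runJob is an illustrative name and would live in the same package; it is not part of this change):

// runJob submits j to the pool and blocks until a worker reports back.
func runJob(pool *WorkerPool, j *Job) (*types.BuildInfo, error) {
	future, err := pool.SendWork(j)
	if err != nil {
		// the backlog channel is full; HandleNewJob maps this to a 503
		return nil, err
	}
	// Wait blocks on the one-shot, buffered result channel: the worker
	// writes exactly one WorkResult and then closes the channel.
	res := future.Wait()
	return res.BuildInfo, res.Err
}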
diff --git a/cmd/mistryd/worker_pool_test.go b/cmd/mistryd/worker_pool_test.go
new file mode 100644
index 0000000..0337c51
--- /dev/null
+++ b/cmd/mistryd/worker_pool_test.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+	"testing"
+	"time"
+
+	"github.com/skroutz/mistry/pkg/types"
+)
+
+func TestBacklogLimit(t *testing.T) {
+	wp, cfg := setupQueue(t, 1, 0)
+	defer wp.Stop()
+
+	params := types.Params{"test": "pool-backlog-limit"}
+	params2 := types.Params{"test": "pool-backlog-limit2"}
+	project := "simple"
+
+	sendWorkNoErr(wp, project, params, cfg, t)
+	_, _, err := sendWork(wp, project, params2, cfg, t)
+
+	if err == nil {
+		t.Fatal("Expected error")
+	}
+}
+
+func TestConcurrency(t *testing.T) {
+	// instantiate a server with 1 worker
+	wp, cfg := setupQueue(t, 1, 100)
+	defer wp.Stop()
+
+	project := "sleep"
+	params := types.Params{"test": "pool-concurrency"}
+	params2 := types.Params{"test": "pool-concurrency2"}
+
+	sendWorkNoErr(wp, project, params, cfg, t)
+	// give the worker a chance to start working
+	time.Sleep(1 * time.Second)
+
+	j, _ := sendWorkNoErr(wp, project, params2, cfg, t)
+
+	// the queue should contain exactly 1 item: the work item of the 2nd job
+	assertEq(len(wp.queue), 1, t)
+	select {
+	case i, ok := <-wp.queue:
+		if !ok {
+			t.Fatalf("Unexpectedly closed worker pool queue")
+		}
+		assertEq(i.job, j, t)
+	default:
+		t.Fatalf("Expected to find a work item in the queue")
+	}
+}
+
+func setupQueue(t *testing.T, workers, backlog int) (*WorkerPool, *Config) {
+	cfg := testcfg
+	cfg.Concurrency = workers
+	cfg.Backlog = backlog
+
+	s, err := NewServer(cfg, nil)
+	failIfError(err, t)
+	return s.workerPool, cfg
+}
+
+func sendWork(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult, error) {
+	j, err := NewJob(project, params, "", cfg)
+	failIfError(err, t)
+
+	r, err := wp.SendWork(j)
+	return j, r, err
+}
+
+func sendWorkNoErr(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult) {
+	j, r, err := sendWork(wp, project, params, cfg, t)
+	failIfError(err, t)
+	return j, r
+}