Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

fixed size worker pool to serve new job requests #72

Merged
merged 1 commit into from
May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/mistryd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"os"
"runtime"
"strconv"

"github.com/skroutz/mistry/pkg/filesystem"
Expand All @@ -21,6 +22,9 @@ type Config struct {
ProjectsPath string `json:"projects_path"`
BuildPath string `json:"build_path"`
Mounts map[string]string `json:"mounts"`

Concurrency int `json:"job_concurrency"`
Backlog int `json:"job_backlog"`
}

// ParseConfig accepts the listening address, a filesystem adapter and a
Expand Down Expand Up @@ -55,5 +59,15 @@ func ParseConfig(addr string, fs filesystem.FileSystem, r io.Reader) (*Config, e
return nil, err
}

if cfg.Concurrency == 0 {
// our work is CPU bound so number of cores is OK
cfg.Concurrency = runtime.NumCPU()
}

if cfg.Backlog == 0 {
// by default allow a request spike double the worker capacity
cfg.Backlog = cfg.Concurrency * 2
}

return cfg, nil
}
4 changes: 3 additions & 1 deletion cmd/mistryd/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
"build_path": "/var/lib/mistry/data",
"mounts": {
"/var/lib/mistry/.ssh": "/home/mistry/.ssh"
}
},
"job_concurrency": 5,
"job_backlog": 20
}
4 changes: 3 additions & 1 deletion cmd/mistryd/config.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
"build_path": "/tmp",
"mounts": {
"/tmp": "/tmp"
}
},
"job_concurrency": 5,
"job_backlog": 100
}
8 changes: 7 additions & 1 deletion cmd/mistryd/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Job struct {
Project string
Params types.Params
Group string
Rebuild bool

RootBuildPath string
PendingBuildPath string
Expand Down Expand Up @@ -62,7 +63,12 @@ type Job struct {

// NewJobFromRequest returns a new Job from the JobRequest
func NewJobFromRequest(jr types.JobRequest, cfg *Config) (*Job, error) {
return NewJob(jr.Project, jr.Params, jr.Group, cfg)
j, err := NewJob(jr.Project, jr.Params, jr.Group, cfg)
if err != nil {
return nil, err
}
j.Rebuild = jr.Rebuild
return j, nil
}

// NewJob returns a new Job for the given project. project and cfg cannot be
Expand Down
1 change: 1 addition & 0 deletions cmd/mistryd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,6 @@ func StartServer(cfg *Config) error {
}()
s.Log.Printf("Listening on %s...", cfg.Addr)
wg.Wait()
s.workerPool.Stop()
return nil
}
53 changes: 32 additions & 21 deletions cmd/mistryd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ import (
type Server struct {
Log *log.Logger

srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config
srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config
workerPool *WorkerPool

// web-view related

Expand Down Expand Up @@ -87,6 +88,7 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
s.pq = NewProjectQueue()
s.br = broker.NewBroker(s.Log)
s.tq = new(sync.Map)
s.workerPool = NewWorkerPool(s, cfg.Concurrency, cfg.Backlog, logger)
return s, nil
}

Expand Down Expand Up @@ -118,28 +120,43 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {
return
}

if _, isAsync := r.URL.Query()["async"]; isAsync {
s.handleNewJobAsync(j, jr, w)
// send the work item to the worker pool
future, err := s.workerPool.SendWork(j)
if err != nil {
// the in-memory queue is overloaded, we have to wait for the workers to pick
// up new items.
// return a 503 to signal that the server is overloaded and for clients to try
// again later
// 503 is an appropriate status code to signal that the server is overloaded
// for all users, while 429 would have been used if we implemented user-specific
// throttling
s.Log.Print("Failed to send message to work queue")
w.WriteHeader(http.StatusServiceUnavailable)
return
}

// if async, we're done, otherwise wait for the result in the result channel
_, async := r.URL.Query()["async"]
if async {
s.Log.Printf("Scheduled %s", j)
w.WriteHeader(http.StatusCreated)
} else {
s.handleNewJobSync(j, jr, w)
s.Log.Printf("Waiting for result of %s...", j)
s.writeWorkResult(j, future.Wait(), w)
}
}

// handleNewJobSync triggers the build synchronously, and writes the
// build result JSON to the response
func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
s.Log.Printf("Building %s...", j)
buildInfo, err := s.Work(context.Background(), j, jr)
if err != nil {
http.Error(w, fmt.Sprintf("Error building %s: %s", j, err),
func (s *Server) writeWorkResult(j *Job, r WorkResult, w http.ResponseWriter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally use pointers (ie. j *Job) unless there's a reason not to. I think it should be r *WorkResult) for consistency.

if r.Err != nil {
http.Error(w, fmt.Sprintf("Error building %s: %s", j, r.Err),
http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusCreated)
w.Header().Set("Content-Type", "application/json")

resp, err := json.Marshal(buildInfo)
resp, err := json.Marshal(r.BuildInfo)
if err != nil {
s.Log.Print(err)
}
Expand All @@ -149,12 +166,6 @@ func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWr
}
}

func (s *Server) handleNewJobAsync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
s.Log.Printf("Scheduling %s...", j)
go s.Work(context.Background(), j, jr)
w.WriteHeader(http.StatusCreated)
}

// HandleIndex returns all available jobs.
func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
Expand Down
8 changes: 8 additions & 0 deletions cmd/mistryd/testdata/projects/sleep/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM debian:stretch

COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh
RUN chmod +x /usr/local/bin/docker-entrypoint.sh

WORKDIR /data

ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
4 changes: 4 additions & 0 deletions cmd/mistryd/testdata/projects/sleep/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

sleep 10
4 changes: 2 additions & 2 deletions cmd/mistryd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// Work performs the work denoted by j and returns a BuildInfo upon
// successful completion, or an error.
func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildInfo *types.BuildInfo, err error) {
func (s *Server) Work(ctx context.Context, j *Job) (buildInfo *types.BuildInfo, err error) {
log := log.New(os.Stderr, fmt.Sprintf("[worker] [%s] ", j), log.LstdFlags)
start := time.Now()
_, err = os.Stat(j.ReadyBuildPath)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildIn
return
}

err = j.BuildImage(ctx, s.cfg.UID, client, out, jr.Rebuild, jr.Rebuild)
err = j.BuildImage(ctx, s.cfg.UID, client, out, j.Rebuild, j.Rebuild)
if err != nil {
err = workErr("could not build docker image", err)
return
Expand Down
113 changes: 113 additions & 0 deletions cmd/mistryd/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be a separate package. This would force us to think more of our API. It should be pretty easy to do so, but you have to see all my other comments to understand why.

Note that if this becomes a separate package, names will get better:

  • NewWorkerPool -> New
  • WorkResult -> Result
  • etc.


import (
"context"
"errors"
"fmt"
"log"
"sync"

"github.com/skroutz/mistry/pkg/types"
)

// WorkResult contains the result of a job, either a buildinfo or an error
type WorkResult struct {
BuildInfo *types.BuildInfo
Err error
}

// FutureWorkResult is a WorkResult that may not yet have become available and
// can be Wait()'ed on
type FutureWorkResult struct {
result <-chan WorkResult
}

// Wait waits for the result to become available and returns it
func (f FutureWorkResult) Wait() WorkResult {
r, ok := <-f.result
if !ok {
// this should never happen, reading from the result channel is exclusive to
// this future
panic("Failed to read from result channel")
}
return r
}

// workItem contains a job and a channel to place the job result. struct
// used in the internal work queue
type workItem struct {
job *Job
result chan<- WorkResult
}

// WorkerPool implements a fixed-size pool of worker goroutines that can be sent
// build jobs and communicate their result
type WorkerPool struct {
// the fixed amount of goroutines that will be handling running jobs
concurrency int

// the maximum backlog of pending requests. if exceeded, sending new work
// to the pool will return an error
backlogSize int

queue chan workItem
wg sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(s *Server, concurrency, backlog int, logger *log.Logger) *WorkerPool {
p := new(WorkerPool)
p.concurrency = concurrency
p.backlogSize = backlog
p.queue = make(chan workItem, backlog)

for i := 0; i < concurrency; i++ {
go work(s, i, p.queue, &p.wg)
p.wg.Add(1)
}
logger.Printf("Set up %d workers", concurrency)
return p
}

// Stop signals the workers to close and blocks until they are closed.
func (p *WorkerPool) Stop() {
close(p.queue)
p.wg.Wait()
}

// SendWork schedules work on p and returns a FutureWorkResult. The actual result can be
// obtained by FutureWorkResult.Wait(). An error is returned if the backlog is full and
// cannot accept any new work items
func (p *WorkerPool) SendWork(j *Job) (FutureWorkResult, error) {
resultQueue := make(chan WorkResult, 1)
wi := workItem{j, resultQueue}
result := FutureWorkResult{resultQueue}

select {
case p.queue <- wi:
return result, nil
default:
return result, errors.New("queue is full")
}
}

// work listens to the workQueue, runs Work() on any incoming work items, and
// sends the result through the result queue
func work(s *Server, id int, queue <-chan workItem, wg *sync.WaitGroup) {
defer wg.Done()
logPrefix := fmt.Sprintf("[worker %d]", id)
for item := range queue {
s.Log.Printf("%s received work item %#v", logPrefix, item)
buildInfo, err := s.Work(context.Background(), item.job)

select {
case item.result <- WorkResult{buildInfo, err}:
s.Log.Printf("%s wrote result to the result channel", logPrefix)
default:
// this should never happen, the result chan should be unique for this worker
s.Log.Panicf("%s failed to write result to the result channel", logPrefix)
}
close(item.result)
}
s.Log.Printf("%s exiting...", logPrefix)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applies to the whole PR: Methods should be located just below their corresponding types. Constructors should be right after their corresponding types. Unexported methods of a type should come after its exported ones.

76 changes: 76 additions & 0 deletions cmd/mistryd/worker_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"testing"
"time"

"github.com/skroutz/mistry/pkg/types"
)

func TestBacklogLimit(t *testing.T) {
wp, cfg := setupQueue(t, 1, 0)
defer wp.Stop()

params := types.Params{"test": "pool-backlog-limit"}
params2 := types.Params{"test": "pool-backlog-limit2"}
project := "simple"

sendWorkNoErr(wp, project, params, cfg, t)
_, _, err := sendWork(wp, project, params2, cfg, t)

if err == nil {
t.Fatal("Expected error")
}
}

func TestConcurrency(t *testing.T) {
// instatiate server with 1 worker
wp, cfg := setupQueue(t, 1, 100)
defer wp.Stop()

project := "sleep"
params := types.Params{"test": "pool-concurrency"}
params2 := types.Params{"test": "pool-concurrency2"}

sendWorkNoErr(wp, project, params, cfg, t)
// give the chance for the worker to start work
time.Sleep(1 * time.Second)

j, _ := sendWorkNoErr(wp, project, params2, cfg, t)

// the queue should contain only 1 item, the work item for the 2nd job
assertEq(len(wp.queue), 1, t)
select {
case i, ok := <-wp.queue:
if !ok {
t.Fatalf("Unexpectedly closed worker pool queue")
}
assertEq(i.job, j, t)
default:
t.Fatalf("Expected to find a work item in the queue")
}
}

func setupQueue(t *testing.T, workers, backlog int) (*WorkerPool, *Config) {
cfg := testcfg
cfg.Concurrency = workers
cfg.Backlog = backlog

s, err := NewServer(cfg, nil)
failIfError(err, t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

As a side note, I opened an issue #74 to package and refactor the test helpers.

return s.workerPool, cfg
}

func sendWork(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult, error) {
j, err := NewJob(project, params, "", cfg)
failIfError(err, t)

r, err := wp.SendWork(j)
return j, r, err
}

func sendWorkNoErr(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult) {
j, r, err := sendWork(wp, project, params, cfg, t)
failIfError(err, t)
return j, r
}