Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
factor logic out to worker_pool.go
Browse files Browse the repository at this point in the history
  • Loading branch information
dtheodor committed May 4, 2018
1 parent 987e203 commit 38b925e
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 62 deletions.
2 changes: 1 addition & 1 deletion cmd/mistryd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,6 @@ func StartServer(cfg *Config) error {
}()
s.Log.Printf("Listening on %s...", cfg.Addr)
wg.Wait()
close(s.workQueue)
s.WorkerPool.Stop()
return nil
}
75 changes: 14 additions & 61 deletions cmd/mistryd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@ import (
type Server struct {
Log *log.Logger

srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config

// queue to place job work items. the worker pool listens to this queue to process new jobs
maxWorkers int
workQueueSize int
workQueue chan WorkItem
srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config
WorkerPool *WorkerPool

// web-view related

Expand Down Expand Up @@ -92,9 +88,7 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
s.pq = NewProjectQueue()
s.br = broker.NewBroker(s.Log)
s.tq = new(sync.Map)
s.workQueueSize = 100
s.maxWorkers = 1
s.workQueue = make(chan WorkItem, s.workQueueSize)
s.WorkerPool = NewWorkerPool(s, 5, 100, logger)
return s, nil
}

Expand Down Expand Up @@ -127,11 +121,8 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {
}

// send the work item to the worker pool
workItem := WorkItem{j, jr, make(chan WorkResult, 1)}

select {
case s.workQueue <- workItem:
default:
future, err := s.WorkerPool.SendWork(j, jr)
if err != nil {
// the in-memory queue is overloaded, we have to wait for the workers to pick
// up new items.
// return a 503 to signal that the server is overloaded and for clients to try
Expand All @@ -150,48 +141,18 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusCreated)
} else {
s.Log.Printf("Waiting for result for %s...", j)
result, ok := <-workItem.ResultChan
if !ok {
result, err := future.Wait()
if err != nil {
s.Log.Print("Failed to read from result channel")
w.WriteHeader(http.StatusInternalServerError)
}
s.Log.Printf("Received result for %s, returning", j)
s.writeJobResultResponse(j, result, w)
}
}

// WorkResult contains the result of a job, either a buildinfo or an error
type WorkResult struct {
BuildInfo *types.BuildInfo
Err error
}

// WorkItem contains a job and a channel to place the job result
type WorkItem struct {
Job *Job
JobRequest types.JobRequest
ResultChan chan WorkResult
}

func (s *Server) worker(id int) {
logPrefix := fmt.Sprintf("Worker %d:", id)
for item := range s.workQueue {
s.Log.Printf("%s received work item %#v", logPrefix, item)
buildInfo, err := s.Work(context.Background(), item.Job, item.JobRequest)

select {
case item.ResultChan <- WorkResult{buildInfo, err}:
s.Log.Printf("%s placed result in result channel", logPrefix)
close(item.ResultChan)
default:
// we can't place the result to the channel, this is a bug
s.Log.Printf("Failed to write to the result channel")
} else {
s.Log.Printf("Received result for %s, returning", j)
s.writeWorkResult(j, result, w)
}
}
s.Log.Printf("%s work items channel closed, returning...", logPrefix)
}

func (s *Server) writeJobResultResponse(j *Job, r WorkResult, w http.ResponseWriter) {
func (s *Server) writeWorkResult(j *Job, r WorkResult, w http.ResponseWriter) {
if r.Err != nil {
http.Error(w, fmt.Sprintf("Error building %s: %s", j, r.Err),
http.StatusInternalServerError)
Expand Down Expand Up @@ -458,18 +419,10 @@ func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) {
// non-nil error.
func (s *Server) ListenAndServe() error {
s.Log.Printf("Configuration: %#v", s.cfg)
s.setupWorkerPool()
go s.br.ListenForClients()
return s.srv.ListenAndServe()
}

func (s *Server) setupWorkerPool() {
for i := 0; i < s.maxWorkers; i++ {
go s.worker(i)
}
s.Log.Printf("Setup %d workers", s.maxWorkers)
}

// RebuildResult contains result data on the rebuild operation
type RebuildResult struct {
successful int
Expand Down
97 changes: 97 additions & 0 deletions cmd/mistryd/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"context"
"fmt"
"log"

"github.com/skroutz/mistry/pkg/types"
)

// WorkerPool implements a fixed-size pool of worker goroutines that can be sent
// build jobs and communicate their result
type WorkerPool struct {
MaxWorkers int
QueueSize int
queue chan workItem
}

// WorkResult contains the result of a job, either a buildinfo or an error
type WorkResult struct {
BuildInfo *types.BuildInfo
Err error
}

// FutureWorkResult is a WorkResult that has not yet become available
type FutureWorkResult struct {
resultQueue <-chan WorkResult
}

// Wait waits for the result to become available and returns it
func (future FutureWorkResult) Wait() (WorkResult, error) {
result, ok := <-future.resultQueue
if !ok {
return WorkResult{}, fmt.Errorf("Failed to read from result channel")
}
return result, nil
}

// workItem contains a job and a channel to place the job result
type workItem struct {
job *Job
jobRequest types.JobRequest
result chan<- WorkResult
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(s *Server, maxWorkers, queueSize int, logger *log.Logger) *WorkerPool {
p := new(WorkerPool)
p.MaxWorkers = maxWorkers
p.QueueSize = queueSize
p.queue = make(chan workItem, queueSize)

for i := 0; i < maxWorkers; i++ {
go poolWorker(s, i, p.queue)
}
logger.Printf("Set up %d workers", maxWorkers)
return p
}

// Stop stops the pool workers and closes the queue
func (p *WorkerPool) Stop() {
close(p.queue)
}

// SendWork sends work to the pool, and receive a work result that can be waited on
func (p *WorkerPool) SendWork(j *Job, jr types.JobRequest) (FutureWorkResult, error) {
resultQueue := make(chan WorkResult, 1)
wi := workItem{j, jr, resultQueue}
result := FutureWorkResult{resultQueue}

select {
case p.queue <- wi:
return result, nil
default:
return result, fmt.Errorf("queue is full")
}
}

// poolWorker listens to the workQueue, runs Work() on any incoming work items, and
// sends the result through the result queue
func poolWorker(s *Server, id int, workQueue <-chan workItem) {
logPrefix := fmt.Sprintf("[worker %d]", id)
for item := range workQueue {
s.Log.Printf("%s received work item %#v", logPrefix, item)
buildInfo, err := s.Work(context.Background(), item.job, item.jobRequest)

select {
case item.result <- WorkResult{buildInfo, err}:
s.Log.Printf("%s wrote result to the result channel", logPrefix)
default:
// this is a bug, should log error or panic here
s.Log.Printf("Failed to write result to the result channel")
}
close(item.result)
}
s.Log.Printf("%s exiting...", logPrefix)
}

0 comments on commit 38b925e

Please sign in to comment.