This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

fixed size worker pool to serve new job requests #72

Merged
merged 1 commit into master on May 8, 2018

Conversation

@dtheodor (Contributor) commented May 4, 2018

Based on https://gobyexample.com/worker-pools, with error and result handling improvements

  • a fixed number of goroutines listen on a queue for new job requests
  • new HTTP job requests place the work into the queue
  • the result is communicated back to the HTTP request goroutine through
    a per-request unique result channel
  • worker pool
  • configurable settings
  • tests
  • on exit wait for jobs to finish

type WorkResult struct {
	BuildInfo *types.BuildInfo
	Err       error
}
Contributor:

Since BuildInfo already contains an .Err, can't we just use BuildInfo instead?

Contributor Author:

BuildInfo.Err contains build errors. The outer error is about failing to retrieve or construct the BuildInfo.

This struct captures the result of Work() (BuildInfo, error) which needs to be forwarded through a channel, so I'm capturing the two output parameters into a struct to pass it as one unit through channels

Contributor:

I see, fair enough.

type WorkItem struct {
	Job        *Job
	JobRequest types.JobRequest
	ResultChan chan WorkResult
Contributor:

ResultChan -> Result

// WorkItem contains a job and a channel to place the job result
type WorkItem struct {
	Job        *Job
	JobRequest types.JobRequest
Contributor:

*types.JobRequest

for i := 0; i < s.maxWorkers; i++ {
	go s.worker(i)
}
s.Log.Printf("Setup %d workers", s.maxWorkers)
Contributor:

[nit] "Set up" or "Spawned"

@@ -87,6 +92,9 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
	s.pq = NewProjectQueue()
	s.br = broker.NewBroker(s.Log)
	s.tq = new(sync.Map)
	s.workQueueSize = 100
	s.maxWorkers = 1
Contributor:

This should just be a cfg.GlobalBuildConcurrency or something.

s.Log.Printf("%s work items channel closed, returning...", logPrefix)
}

func (s *Server) writeJobResultResponse(j *Job, r WorkResult, w http.ResponseWriter) {
Contributor:

Are you sure this works correctly? I'd expect it to receive a pointer to the ResponseWriter, not a copy.

Contributor:

Same goes for WorkResult.

Contributor Author:

ResponseWriter is an interface. The WorkResult doesn't need to be mutated, so we don't need to pass a pointer.

Contributor:

j isn't mutated either.

s.Log.Printf("%s work items channel closed, returning...", logPrefix)
}

func (s *Server) writeJobResultResponse(j *Job, r WorkResult, w http.ResponseWriter) {
Contributor:

Perhaps writeWorkResult()?

go s.br.ListenForClients()
return s.srv.ListenAndServe()
}

func (s *Server) setupWorkerPool() {
Contributor:

This is too small and I think it should be inlined into ListenAndServe. We're not going to use it anywhere else for sure.

ResultChan chan WorkResult
}

func (s *Server) worker(id int) {
Contributor:

Needs a more accurate name (work?) and documentation. Also, do we need to have an id?

}

func (s *Server) worker(id int) {
logPrefix := fmt.Sprintf("Worker %d:", id)
Contributor:

We tend to prefix things like this:

[worker] foo

@dtheodor force-pushed the throttling branch 4 times, most recently from 38b925e to d8bb7f5 on May 4, 2018 14:29
@@ -21,6 +22,9 @@ type Config struct {
	ProjectsPath string            `json:"projects_path"`
	BuildPath    string            `json:"build_path"`
	Mounts       map[string]string `json:"mounts"`

	PoolMaxWorkers int `json:"pool_max_workers"`
Contributor:

Let's not expose implementation details to the user; we can use "Concurrency" or "BuildConcurrency" or "GlobalBuildConcurrency".

@@ -21,6 +22,9 @@ type Config struct {
	ProjectsPath string            `json:"projects_path"`
	BuildPath    string            `json:"build_path"`
	Mounts       map[string]string `json:"mounts"`

	PoolMaxWorkers   int `json:"pool_max_workers"`
	PoolMaxQueueSize int `json:"pool_max_queue_size"`
Contributor:

I don't think this configuration needs to be exposed to the user at all. We can use a reasonable fixed number for it; I believe 100 is a sane choice.

}
},
"pool_max_workers": 5,
"pool_max_queue_size": 100
Contributor:

Please also update config.test.json accordingly.

jq         *JobQueue
pq         *ProjectQueue
cfg        *Config
WorkerPool *WorkerPool
Contributor:

Is there a reason this is exposed?

return
}

// if async, we're done, otherwise wait for the result in the result channel
if _, isAsync := r.URL.Query()["async"]; isAsync {
Contributor:

[out of scope] I think this should be renamed to "async". Also, we should be consistent in our style: the if ... should be on a new line.

return p
}

// Stop stops the pool workers and closes the queue
Contributor:

"closes the queue" is implementation detail. How about something like:

Stop signals the workers to close. Stop does not block until they are closed.

close(p.queue)
}

// SendWork sends work to the pool, and receives a work result that can be waited on.
Contributor:

SendWork schedules work on p and returns a FutureWorkResult. The actual result can be obtained by FutureWorkResult.Wait(). [...]


// SendWork sends work to the pool, and receives a work result that can be waited on.
// An error is returned if the backlog is full and cannot accept any new work items
func (p *WorkerPool) SendWork(j *Job, jr types.JobRequest) (FutureWorkResult, error) {
Contributor:

Can we somehow make this agnostic to Job and types.JobRequest? I think we can. Couldn't this accept a WorkItem? This would seem more intuitive and better since we get rid of the tight coupling with the server.

case p.queue <- wi:
	return result, nil
default:
	return result, fmt.Errorf("queue is full")
Contributor:

No need to use Errorf since you don't format anything. Use errors.New. I think go vet should fail here anyway.


// poolWorker listens to the workQueue, runs Work() on any incoming work items, and
// sends the result through the result queue
func poolWorker(s *Server, id int, workQueue <-chan workItem) {
Contributor:

  • Maybe just spawn or start is a better name?
  • Also, the same goes here regarding Server. I think this is unnecessary coupling that we could and should avoid.
  • "workQueue" could be renamed to the simpler "queue". The kind of thing that's queued in there is apparent from the type.

#!/bin/bash
set -e

sleep 10
Contributor:

[nit] consider putting a newline at the end of the file

p.wg.Wait()
}

// SendWork chedules work on p and returns a FutureWorkResult. The actual result can be
Contributor:

replace chedules with schedules 🐦

cfg.Backlog = backlog

s, err := NewServer(cfg, nil)
failIfError(err, t)
Contributor:

👍

As a side note, I opened an issue #74 to package and refactor the test helpers.

@apostergiou (Contributor):

LGTM 👍

* X amount of goroutines listen to a queue for new job requests
* HTTP new job requests place the work into the queue
* the result is communicated back to the HTTP request goroutine through
a per-request unique result channel
@dtheodor dtheodor merged commit 73c44ec into master May 8, 2018
@dtheodor dtheodor deleted the throttling branch May 8, 2018 07:25