-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker.go
75 lines (65 loc) · 1.7 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package jiraworklog
import (
"time"
log "github.com/sirupsen/logrus"
)
// Worker runs each of its Jobs repeatedly, pausing for the job's interval
// between consecutive runs, until Shutdown is called.
//
// NOTE(review): this comment previously said the worker makes up for time
// spent inside the job by waiting only the time left in the interval. Run
// currently waits the full interval after every run; confirm which is intended.
type Worker struct {
Stopped bool // Set to true by Shutdown; false on a Worker built by NewWorker.
// Jobs are the jobs launched by Start, one goroutine per job.
Jobs []Job
// logger receives the per-run success and failure log entries.
logger *log.Entry
// stopChan is closed by Shutdown to tell the Run loops to exit.
stopChan chan struct{}
}
// Job represents a unit of work that a Worker runs repeatedly.
type Job interface {
// Run performs one execution of the job. A non-nil error is logged by the
// Worker and does not stop later runs.
Run() error
// GetInterval returns how long the Worker waits after a run before
// starting the next one.
GetInterval() time.Duration
// GetName returns the name used to identify the job in log entries.
GetName() string
}
// NewWorker returns a Worker that logs through logger and is responsible for
// the given jobs. The worker starts out not stopped, with a fresh stop channel
// ready to be closed by Shutdown. No job is run until Start is called.
func NewWorker(logger *log.Entry, jobs ...Job) *Worker {
	w := new(Worker) // Stopped is left at its zero value, false.
	w.logger = logger
	w.Jobs = jobs
	w.stopChan = make(chan struct{})
	return w
}
// Start launches one goroutine per configured job, each executing Run for
// that job. It returns as soon as the goroutines have been started and does
// not wait for them.
func (w *Worker) Start() {
	jobs := w.Jobs
	for i := 0; i < len(jobs); i++ {
		go w.Run(jobs[i])
	}
}
// Run executes job in a loop: it runs the job, logs either the failure or the
// time the run took, then waits for the job's interval before running again.
// The loop ends, and Run returns, once the worker's stop channel is closed
// while it is waiting. The job always runs at least once, because the stop
// channel is only checked after a run.
func (w *Worker) Run(job Job) {
	for {
		begin := time.Now()
		if err := job.Run(); err != nil {
			w.logger.WithError(err).WithField("job", job.GetName()).Error("job run failed")
		} else {
			elapsed := time.Since(begin)
			w.logger.WithField("duration", elapsed).WithField("job", job.GetName()).Info("job run complete")
		}

		// Sleep until the next run is due, unless a shutdown arrives first.
		select {
		case <-w.stopChan:
			w.logger.WithField("job", job.GetName()).Warn("Shutting down")
			return
		case <-time.After(job.GetInterval()):
			// Interval elapsed; fall through to the next loop iteration.
		}
	}
}
// Shutdown marks the worker as stopped and closes the stop channel, which
// makes every Run loop return the next time it is waiting between runs. It
// does not wait for those loops to exit; a job that is mid-run finishes that
// run first.
//
// Calling Shutdown again after it has returned is a no-op rather than a
// "close of closed channel" panic. It is not synchronized, so it must not be
// called from several goroutines at the same time.
func (w *Worker) Shutdown() {
	w.Stopped = true
	select {
	case <-w.stopChan:
		// Nothing ever sends on stopChan, so a successful receive means it
		// was already closed by an earlier Shutdown; closing again would panic.
	default:
		close(w.stopChan)
	}
}