Skip to content

Commit

Permalink
Merge pull request #61 from FUSAKLA/fus-cronjob
Browse files Browse the repository at this point in the history
feat: add support for scheduling jobs using cron syntax
  • Loading branch information
dewey authored May 17, 2022
2 parents c958997 + b76825f commit f9d6fbe
Show file tree
Hide file tree
Showing 19 changed files with 1,700 additions and 29 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ jobs:
- name: "example"
# interval defined the pause between the runs of this job
interval: '5m'
# cron_schedule when to execute the job in the standard CRON syntax
# if specified, the interval is ignored
cron_schedule: "0 0 * * *"
# connections is an array of connection URLs
# each query will be executed on each connection
connections:
Expand Down
36 changes: 28 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/go-kit/log"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/robfig/cron/v3"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -77,16 +79,34 @@ type File struct {
Queries map[string]string `yaml:"queries"`
}

type cronConfig struct {
definition string
schedule cron.Schedule
}

func (c *cronConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := unmarshal(&c.definition); err != nil {
return fmt.Errorf("invalid cron_schedule, must be a string: %w", err)
}
var err error
c.schedule, err = cron.ParseStandard(c.definition)
if err != nil {
return fmt.Errorf("invalid cron_schedule syntax for `%s`: %w", c.definition, err)
}
return nil
}

// Job is a collection of connections and queries
type Job struct {
log log.Logger
conns []*connection
Name string `yaml:"name"` // name of this job
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
Interval time.Duration `yaml:"interval"` // interval at which this job is run
Connections []string `yaml:"connections"`
Queries []*Query `yaml:"queries"`
StartupSQL []string `yaml:"startup_sql"` // SQL executed on startup
log log.Logger
conns []*connection
Name string `yaml:"name"` // name of this job
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
Interval time.Duration `yaml:"interval"` // interval at which this job is run
CronSchedule cronConfig `yaml:"cron_schedule"` // if specified, the interval is ignored and the job will be executed at the specified time in CRON syntax
Connections []string `yaml:"connections"`
Queries []*Query `yaml:"queries"`
StartupSQL []string `yaml:"startup_sql"` // SQL executed on startup
}

type connection struct {
Expand Down
22 changes: 16 additions & 6 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/robfig/cron/v3"
)

// Exporter collects SQL metrics. It implements prometheus.Collector.
type Exporter struct {
jobs []*Job
logger log.Logger
jobs []*Job
logger log.Logger
cronScheduler *cron.Cron
}

// NewExporter returns a new SQL Exporter for the provided config.
Expand All @@ -25,23 +27,31 @@ func NewExporter(logger log.Logger, configFile string) (*Exporter, error) {
}

exp := &Exporter{
jobs: make([]*Job, 0, len(cfg.Jobs)),
logger: logger,
jobs: make([]*Job, 0, len(cfg.Jobs)),
logger: logger,
cronScheduler: cron.New(),
}

// dispatch all jobs
for _, job := range cfg.Jobs {
if job == nil {
continue
}

if err := job.Init(logger, cfg.Queries); err != nil {
level.Warn(logger).Log("msg", "Skipping job. Failed to initialize", "err", err, "job", job.Name)
continue
}
exp.jobs = append(exp.jobs, job)
go job.Run()
if job.CronSchedule.schedule != nil {
exp.cronScheduler.Schedule(job.CronSchedule.schedule, job)
level.Info(logger).Log("msg", "Scheduled CRON job", "name", job.Name, "cron_schedule", job.CronSchedule.definition)
} else {
go job.ExecutePeriodically()
level.Info(logger).Log("msg", "Started periodically execution of job", "name", job.Name, "interval", job.Interval)
}
}

exp.cronScheduler.Start()
return exp, nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/lib/pq v1.10.4
github.com/prometheus/client_golang v1.12.0
github.com/prometheus/common v0.32.1
github.com/robfig/cron/v3 v3.0.1
github.com/segmentio/go-athena v0.0.0-20181208004937-dfa5f1818930
github.com/snowflakedb/gosnowflake v1.6.6
github.com/vertica/vertica-sql-go v1.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
Expand Down
35 changes: 20 additions & 15 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"
"github.com/snowflakedb/gosnowflake"
"net/url"
"regexp"
"strconv"
Expand All @@ -18,7 +17,8 @@ import (
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // register the PostgreSQL driver
"github.com/prometheus/client_golang/prometheus"
_ "github.com/segmentio/go-athena" // register the AWS Athena driver
_ "github.com/segmentio/go-athena" // register the AWS Athena driver
"github.com/snowflakedb/gosnowflake"
_ "github.com/vertica/vertica-sql-go" // register the Vertica driver
)

Expand Down Expand Up @@ -70,14 +70,11 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
},
)
}
j.updateConnections()
return nil
}

// Run prepares and runs the job
func (j *Job) Run() {
if j.log == nil {
j.log = log.NewNopLogger()
}
func (j *Job) updateConnections() {
// if there are no connection URLs for this job it can't be run
if j.Connections == nil {
level.Error(j.log).Log("msg", "No connections for job", "job", j.Name)
Expand Down Expand Up @@ -171,16 +168,12 @@ func (j *Job) Run() {
j.conns = append(j.conns, newConn)
}
}
level.Debug(j.log).Log("msg", "Starting")
}

// enter the run loop
// tries to run each query on each connection at approx the interval
func (j *Job) ExecutePeriodically() {
level.Debug(j.log).Log("msg", "Starting")
for {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}
j.Run()
level.Debug(j.log).Log("msg", "Sleeping until next run", "sleep", j.Interval.String())
time.Sleep(j.Interval)
}
Expand Down Expand Up @@ -225,6 +218,18 @@ func (j *Job) markFailed(conn *connection) {
}
}

// Run the job queries with exponential backoff, implements the cron.Job interface
func (j *Job) Run() {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
if bo.MaxElapsedTime == 0 {
bo.MaxElapsedTime = time.Minute
}
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}
}

func (j *Job) runOnce() error {
doneChan := make(chan int, len(j.conns))

Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/robfig/cron/v3/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/robfig/cron/v3/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/robfig/cron/v3/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 125 additions & 0 deletions vendor/github.com/robfig/cron/v3/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f9d6fbe

Please sign in to comment.