Skip to content

Commit

Permalink
Merge pull request #10 from viafintech/feature/add-fail-on-lock-wait-…
Browse files Browse the repository at this point in the history
…time-expired

Add support for a non 0 exit code if lock was never retrieved
  • Loading branch information
martinseener authored Apr 13, 2023
2 parents 730bd1e + 86c1ef8 commit 9b8ef20
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Use `cronlocker --help` to get the following output:
Usage of ./cronlocker:
-endpoint string
endpoint (default "http://localhost:8500")
-failwithoutlock
Exists with status code 1 if the lock was not received within lockwaittime
-key string
key to monitor, e.g. cronjobs/any_service/cron_name (default "none")
-lockwaittime int
Expand Down
28 changes: 21 additions & 7 deletions command_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,40 @@ package main
import (
"bytes"
"context"
"errors"
"net/url"
"os/exec"
"time"

"github.com/hashicorp/consul/api"
)

var errLockWaitTimeExpired = errors.New("wait time for acquiring the lock expired")

// ConsulCommandLocker is an implementation of a command locker for consul,
// responsible for acquiring a distributed lock and executing a command
type ConsulCommandLocker struct {
apiClient *api.Client
lockWaitTime time.Duration
minLockTime time.Duration
maxExecTime time.Duration
apiClient *api.Client
lockWaitTime time.Duration
minLockTime time.Duration
maxExecTime time.Duration
failOnLockWaitExpiration bool
}

// NewConsulCommandLocker initializes a new ConsulCommandlocker
func NewConsulCommandLocker(
endpoint string,
token string,
lockWaitTime time.Duration,
minLockTime time.Duration,
maxExecTime time.Duration,
failOnLockWaitExpiration bool,
) (*ConsulCommandLocker, error) {
ccl := &ConsulCommandLocker{
lockWaitTime: lockWaitTime,
minLockTime: minLockTime,
maxExecTime: maxExecTime,
lockWaitTime: lockWaitTime,
minLockTime: minLockTime,
maxExecTime: maxExecTime,
failOnLockWaitExpiration: failOnLockWaitExpiration,
}

url, err := url.Parse(endpoint)
Expand All @@ -53,6 +62,7 @@ func NewConsulCommandLocker(
return ccl, nil
}

// LockAndExecute takes a lock key and executes the command
func (ccl *ConsulCommandLocker) LockAndExecute(key, command string) (string, error) {
lockOpts := &api.LockOptions{
Key: key,
Expand All @@ -75,6 +85,10 @@ func (ccl *ConsulCommandLocker) LockAndExecute(key, command string) (string, err
// The lock was not acquired if lock channel is empty
// Therefore we can simply return
if lockCh == nil {
if ccl.failOnLockWaitExpiration {
return "", errLockWaitTimeExpired
}

return "Nothing was executed\n", nil
}

Expand Down
32 changes: 31 additions & 1 deletion command_locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestConsulCommandLockerLockAndExecute(t *testing.T) {
300*time.Millisecond,
time.Millisecond,
0,
false,
)

for _, c := range cases {
Expand Down Expand Up @@ -85,6 +86,7 @@ func TestConsulCommandLockerMinimumLockAndExecuteTime(t *testing.T) {
300*time.Millisecond,
500*time.Millisecond,
0,
false,
)

startTime := time.Now()
Expand All @@ -103,13 +105,41 @@ func TestConsulCommandLockerMaximumExecutionTime(t *testing.T) {
100*time.Millisecond,
300*time.Millisecond,
500*time.Millisecond,
false,
)

startTime := time.Now()

commandLocker.LockAndExecute("test/cron/service/min_time_job", "sleep 5")
commandLocker.LockAndExecute("test/cron/service/max_time_job", "sleep 5")

if time.Since(startTime) > 700*time.Millisecond {
t.Errorf("Locker did not abort after the maximum execution time was reached")
}
}

func TestConsulCommandLockerFailOnLockWaitTimeExpired(t *testing.T) {
commandLocker, _ := NewConsulCommandLocker(
testutils.CONSULURI,
"", // blank token
300*time.Millisecond,
500*time.Millisecond,
0,
true,
)

go func() {
commandLocker.LockAndExecute("test/cron/service/lock_wait_time_job", "sleep 10")
}()

time.Sleep(100 * time.Millisecond)

output, err := commandLocker.LockAndExecute("test/cron/service/lock_wait_time_job", "sleep 2")

if output != "" {
t.Errorf("Expected no output but received: %s", output)
}

if err != errLockWaitTimeExpired {
t.Errorf("Expected to received errLockWaitTimeExpired, but received: %+v", err)
}
}
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ var (
500,
"Configures the wait time for a lock in milliseconds",
)
failOnLockWaitExpiration = flag.Bool(
"failwithoutlock",
false,
"Exists with status code 1 if the lock was not received within lockwaittime",
)
minLockTimeMS = flag.Int(
"minlocktime",
5000,
Expand Down Expand Up @@ -52,6 +57,7 @@ func main() {
time.Duration(*lockWaitTimeMS)*time.Millisecond,
time.Duration(*minLockTimeMS)*time.Millisecond,
time.Duration(*maxExecutionTimeMS)*time.Millisecond,
*failOnLockWaitExpiration,
)
if err != nil {
log.Fatalf("%v", err)
Expand Down

0 comments on commit 9b8ef20

Please sign in to comment.