Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lock: add etcdv3 semaphore implementation #5

Merged
merged 3 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions internal/lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package lock

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"

"go.etcd.io/etcd/clientv3"
)

const (
keyTemplate = "com.coreos.airlock/groups/%s/v1/semaphore"
lucab marked this conversation as resolved.
Show resolved Hide resolved
defaultGroup = "default"
)

var (
// ErrNilManager is returned on nil manager
ErrNilManager = errors.New("nil Manager")
)

// Manager takes care of locking for clients
type Manager struct {
client *clientv3.Client
keyPath string
}

// NewManager returns a new lock manager, ensuring the underlying semaphore is initialized.
func NewManager(ctx context.Context, etcdURLs []string, group string, slots uint64) (*Manager, error) {
lucab marked this conversation as resolved.
Show resolved Hide resolved
// TODO(lucab): move to clientv3.New(clientv3.Config)
client, err := clientv3.NewFromURL(etcdURLs[0])
lucab marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

keyPath := fmt.Sprintf(keyTemplate, url.QueryEscape(group))
manager := Manager{client, keyPath}

if err := manager.ensureInit(ctx, slots); err != nil {
return nil, err
}

return &manager, nil
}

// RecursiveLock adds this lock `id` as a holder of the semaphore
//
// It will return an error if there is a problem getting or setting the
// semaphore, or if the maximum number of holders has been reached.
func (m *Manager) RecursiveLock(ctx context.Context, id string) error {
sem, version, err := m.get(ctx)
if err != nil {
return err
}

held, err := sem.RecursiveLock(id)
if err != nil {
return err
}
if held {
return nil
}

if err := m.set(ctx, sem, version); err != nil {
return err
}

return nil
}

// UnlockIfHeld removes this lock `id` as a holder of the semaphore
//
// It returns an error if there is a problem getting or setting the semaphore.
func (m *Manager) UnlockIfHeld(ctx context.Context, id string) error {
sem, version, err := m.get(ctx)
if err != nil {
return err
}

if err := sem.UnlockIfHeld(id); err != nil {
return err
}

if err := m.set(ctx, sem, version); err != nil {
lucab marked this conversation as resolved.
Show resolved Hide resolved
return err
}

return nil
}

// Close reaps all running goroutines
func (m *Manager) Close() {
if m == nil {
return
}

m.client.Close()
}

// ensureInit initialize the semaphore in etcd, if it does not exist yet
func (m *Manager) ensureInit(ctx context.Context, slots uint64) error {
if m == nil {
return ErrNilManager
}

sem := NewSemaphore(slots)
semValue, err := sem.String()
if err != nil {
return err
}

_, err = m.client.Txn(ctx).If(
// version=0 means that the key does not exist.
clientv3.Compare(clientv3.Version(m.keyPath), "=", 0),
).Then(
clientv3.OpPut(m.keyPath, semValue),
).Commit()

if err != nil {
return err
}
return nil
}

// get returns the current semaphore value and version, or an error
func (m *Manager) get(ctx context.Context) (*Semaphore, int64, error) {
resp, err := m.client.Get(ctx, m.keyPath)
if err != nil {
return nil, 0, err
}
if resp.Count != 1 {
return nil, 0, fmt.Errorf("unexpected number of results: %d", resp.Count)
}

var data []byte
var version int64
for _, kv := range resp.Kvs {
data = kv.Value
version = kv.Version
break
}
if version == 0 {
return nil, 0, errors.New("key at version 0")
}
if len(data) == 0 {
return nil, 0, errors.New("empty semaphore value")
}

sem := &Semaphore{}
err = json.Unmarshal(data, sem)
if err != nil {
return nil, 0, err
}

return sem, version, nil
}

// set updates the semaphore in etcd, if `version` matches the one previously observed
func (m *Manager) set(ctx context.Context, sem *Semaphore, version int64) error {
if m == nil {
return ErrNilManager
}
if sem == nil {
return ErrNilSemaphore
}

data, err := json.Marshal(sem)
if err != nil {
return err
}

// Conditionally Put if version in etcd is still the same we observed.
// If the condition is not met, the transaction will return as "not succeeding".
resp, err := m.client.Txn(ctx).If(
clientv3.Compare(clientv3.Version(m.keyPath), "=", version),
).Then(
clientv3.OpPut(m.keyPath, string(data)),
).Commit()

if err != nil {
return err
}
if !resp.Succeeded {
return errors.New("conflict on semaphore detected, aborting")
}

return nil
}
108 changes: 108 additions & 0 deletions internal/lock/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package lock

import (
"encoding/json"
"errors"
"fmt"
"sort"
)

var (
// ErrNilSemaphore is returned on nil semaphore.
ErrNilSemaphore = errors.New("nil Semaphore")
)

// Semaphore is a struct representation of the information held by the semaphore
type Semaphore struct {
TotalSlots uint64 `json:"total_slots"`
Holders []string `json:"holders"`
}

// NewSemaphore returns a new empty semaphore.
func NewSemaphore(slots uint64) (sem *Semaphore) {
return &Semaphore{slots, []string{}}
}

// RecursiveLock adds holder `id` to the semaphore, or returns an error if
// the semaphore is already at maximum capacity.
func (s *Semaphore) RecursiveLock(id string) (bool, error) {
if s == nil {
return false, ErrNilSemaphore
}

// Check if id is already holding a lock.
loc := sort.SearchStrings(s.Holders, id)
if loc < len(s.Holders) && s.Holders[loc] == id {
return true, nil
}

if err := s.addHolder(id); err != nil {
return false, err
}

return false, nil
}

// UnlockIfHeld removes holder `id` from the semaphore, if present.
func (s *Semaphore) UnlockIfHeld(h string) error {
if s == nil {
return ErrNilSemaphore
}

_, err := s.removeHolderIfPresent(h)
if err != nil {
return err
}

return nil
}

// String returns a JSON representation of the semaphore.
func (s *Semaphore) String() (string, error) {
if s == nil {
return "", ErrNilSemaphore
}

b, err := json.Marshal(s)
if err != nil {
return "", err
}

return string(b), nil
}

// addHolder adds a holder with id h to the list of holders in the semaphore
func (s *Semaphore) addHolder(h string) error {
if s == nil {
return ErrNilSemaphore
}
if len(s.Holders) >= int(s.TotalSlots) {
return fmt.Errorf("all %d semaphore slots currently locked", s.TotalSlots)
}

loc := sort.SearchStrings(s.Holders, h)
switch {
case loc == len(s.Holders):
s.Holders = append(s.Holders, h)
default:
s.Holders = append(s.Holders[:loc], append([]string{h}, s.Holders[loc:]...)...)
}

return nil
}

// removeHolder removes a holder with id h from the list of holders in the
// semaphore. It returns whether the holder was present in the list.
func (s *Semaphore) removeHolderIfPresent(h string) (bool, error) {
if s == nil {
return false, ErrNilSemaphore
}

loc := sort.SearchStrings(s.Holders, h)
if loc < len(s.Holders) && s.Holders[loc] == h {
s.Holders = append(s.Holders[:loc], s.Holders[loc+1:]...)
return true, nil
}

return false, nil
}
86 changes: 86 additions & 0 deletions internal/lock/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package lock

import (
"reflect"
"testing"
)

func TestSingleLock(t *testing.T) {
sem := NewSemaphore(1)

if sem.TotalSlots != 1 {
t.Errorf("unexpected semaphore size: %d", sem.TotalSlots)
}

held, err := sem.RecursiveLock("a")
if err != nil {
t.Error(err)
}
if held {
t.Error("unexpected holding lock")
}
if !reflect.DeepEqual(sem.Holders, []string{"a"}) {
t.Error("lock did not add a to the holders")
}
if sem.TotalSlots != 1 {
t.Errorf("unexpected semaphore size: %d", sem.TotalSlots)
}

if err := sem.UnlockIfHeld("a"); err != nil {
t.Error(err)
}
if len(sem.Holders) != 0 {
t.Error("lock did not remove a from the holders")
}
if sem.TotalSlots != 1 {
t.Errorf("unexpected semaphore size: %d", sem.TotalSlots)
}
}

func TestRecursivelock(t *testing.T) {
sem := NewSemaphore(1)

heldOne, err := sem.RecursiveLock("a")
if err != nil {
t.Error(err)
}
if heldOne {
t.Error("unexpected holding lock")
}

heldTwo, err := sem.RecursiveLock("a")
if err != nil {
t.Error(err)
}
if !heldTwo {
t.Error("unexpected not holding lock")
}

if err := sem.UnlockIfHeld("a"); err != nil {
t.Error(err)
}
}

func TestHolderOrdering(t *testing.T) {
sem := NewSemaphore(3)

if _, err := sem.RecursiveLock("c"); err != nil {
t.Error(err)
}
if _, err := sem.RecursiveLock("a"); err != nil {
t.Error(err)
}
if _, err := sem.RecursiveLock("b"); err != nil {
t.Error(err)
}

if !reflect.DeepEqual(sem.Holders, []string{"a", "b", "c"}) {
t.Error("unexpected ordering")
}
if err := sem.UnlockIfHeld("b"); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(sem.Holders, []string{"a", "c"}) {
t.Error("unexpected ordering")
}
}