node-controller: Support an annotation to hold updates #2163

Closed
wants to merge 1 commit
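This change adds a new node annotation, machineconfiguration.openshift.io/hold (MachineUpdateHoldAnnotationKey). While the annotation is present on a node, the node controller excludes that node from the update candidates it computes for a MachineConfigPool; availability accounting against maxUnavailable is otherwise unchanged. To carry the extra bookkeeping, getAllCandidateMachines now returns an updateCandidateState struct (candidates, unavail, held, capacity) instead of a bare slice and count, and the tests gain a newNodeWithReadyButHeld helper plus a case covering held nodes.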
102 changes: 41 additions & 61 deletions pkg/controller/node/node_controller.go
@@ -467,6 +467,7 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
daemonconsts.CurrentMachineConfigAnnotationKey,
daemonconsts.DesiredMachineConfigAnnotationKey,
daemonconsts.MachineConfigDaemonStateAnnotationKey,
daemonconsts.MachineUpdateHoldAnnotationKey,
}
for _, anno := range annos {
newValue := curNode.Annotations[anno]
@@ -758,10 +759,10 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
return err
}

candidates, capacity := getAllCandidateMachines(pool, nodes, maxunavail)
if len(candidates) > 0 {
ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(candidates), capacity)
if err := ctrl.updateCandidateMachines(pool, candidates, capacity); err != nil {
state := getAllCandidateMachines(pool, nodes, maxunavail)
if len(state.candidates) > 0 {
ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(state.candidates), state.capacity)
if err := ctrl.updateCandidateMachines(pool, state.candidates, state.capacity); err != nil {
if syncErr := ctrl.syncStatusOnly(pool); syncErr != nil {
return goerrs.Wrapf(err, "error setting desired machine config annotation for pool %q, sync error: %v", pool.Name, syncErr)
}
@@ -833,20 +834,21 @@ func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfi
})
}

type updateCandidateState struct {
candidates []*corev1.Node
unavail uint
held uint
capacity uint
}

// getAllCandidateMachines returns all possible nodes which can be updated to the target config, along with a maximum
// capacity. It is the responsibility of the caller to choose a subset of the nodes given the capacity.
func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) ([]*corev1.Node, uint) {
func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) updateCandidateState {
targetConfig := pool.Spec.Configuration.Name

unavail := getUnavailableMachines(nodesInPool)
// If we're at capacity, there's nothing to do.
if len(unavail) >= maxUnavailable {
return nil, 0
}
capacity := maxUnavailable - len(unavail)
failingThisConfig := 0
var failingThisConfig uint
ret := updateCandidateState{}
// We only look at nodes which aren't already targeting our desired config
var nodes []*corev1.Node
var candidates []*corev1.Node
for _, node := range nodesInPool {
if node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] == targetConfig {
if isNodeMCDFailing(node) {
@@ -855,69 +857,47 @@ func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*core
continue
}

nodes = append(nodes, node)
if _, ok := node.Annotations[daemonconsts.MachineUpdateHoldAnnotationKey]; ok {
ret.held++
continue
}

candidates = append(candidates, node)
}

ret.unavail = uint(len(getUnavailableMachines(nodesInPool)))
// If we're at capacity, there's nothing to do.
if ret.unavail >= uint(maxUnavailable) {
ret.capacity = 0
return ret
}
ret.capacity = uint(maxUnavailable) - ret.unavail

// Nodes which are failing to target this config also count against
// availability - it might be a transient issue, and if the issue
// clears we don't want multiple to update at once.
if failingThisConfig >= capacity {
return nil, 0
if failingThisConfig >= ret.capacity {
ret.capacity = 0
return ret
}
capacity -= failingThisConfig
ret.capacity -= failingThisConfig
ret.candidates = candidates

return nodes, uint(capacity)
return ret
}

// getCandidateMachines returns the maximum subset of nodes which can be updated to the target config given availability constraints.
func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) []*corev1.Node {
nodes, capacity := getAllCandidateMachines(pool, nodesInPool, maxUnavailable)
if uint(len(nodes)) < capacity {
return nodes
state := getAllCandidateMachines(pool, nodesInPool, maxUnavailable)
fmt.Printf("%+v\n", state)
if uint(len(state.candidates)) < state.capacity {
return state.candidates
}
return nodes[:capacity]
}

// getCurrentEtcdLeader is not yet implemented
func (ctrl *Controller) getCurrentEtcdLeader(candidates []*corev1.Node) (*corev1.Node, error) {
return nil, nil
}

// filterControlPlaneCandidateNodes adjusts the candidates and capacity specifically
// for the control plane, e.g. based on which node is the etcd leader at the time.
// nolint:unparam
func (ctrl *Controller) filterControlPlaneCandidateNodes(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) ([]*corev1.Node, uint, error) {
if len(candidates) <= 1 {
return candidates, capacity, nil
}
etcdLeader, err := ctrl.getCurrentEtcdLeader(candidates)
if err != nil {
glog.Warningf("Failed to find current etcd leader (continuing anyways): %v", err)
}
var newCandidates []*corev1.Node
for _, node := range candidates {
if node == etcdLeader {
// For now make this an event so we know it's working, even though it's more of a non-event
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "DeferringEtcdLeaderUpdate", "Deferring update of etcd leader %s", node.Name)
glog.Infof("Deferring update of etcd leader: %s", node.Name)
continue
}
newCandidates = append(newCandidates, node)
}
return newCandidates, capacity, nil
return state.candidates[:state.capacity]
}

// updateCandidateMachines sets the desiredConfig annotation on the candidate machines
func (ctrl *Controller) updateCandidateMachines(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) error {
if pool.Name == masterPoolName {
var err error
candidates, capacity, err = ctrl.filterControlPlaneCandidateNodes(pool, candidates, capacity)
if err != nil {
return err
}
// In practice right now these counts will be 1 but let's stay general to support 5 etcd nodes in the future
ctrl.logPool(pool, "filtered to %d candidate nodes for update, capacity: %d", len(candidates), capacity)
}
if capacity < uint(len(candidates)) {
// Arbitrarily pick the first N candidates; no attempt at sorting.
// Perhaps later we allow admins to weight somehow, or do something more intelligent.
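To make the new bookkeeping easier to follow, here is a compact, self-contained sketch of the selection rule the refactored getAllCandidateMachines implements (names and simplified types are illustrative, not the PR's code): held nodes are dropped from the candidate list without affecting capacity, while unavailable and failing nodes reduce it.

```go
package main

import "fmt"

// updateCandidateState mirrors the struct introduced in this PR,
// with node names standing in for *corev1.Node.
type updateCandidateState struct {
	candidates []string
	unavail    uint
	held       uint
	capacity   uint
}

// selectCandidates is a simplified stand-in for getAllCandidateMachines:
// notYetTargeted are nodes whose desiredConfig is not the pool's target,
// held marks nodes carrying the hold annotation, failing counts nodes
// already failing on the target config.
func selectCandidates(maxUnavailable, unavail, failing uint, notYetTargeted []string, held map[string]bool) updateCandidateState {
	st := updateCandidateState{unavail: unavail}
	for _, n := range notYetTargeted {
		if held[n] {
			st.held++ // held nodes are never candidates, but also don't consume capacity
			continue
		}
		st.candidates = append(st.candidates, n)
	}
	if unavail >= maxUnavailable {
		return st // already at capacity, nothing to start
	}
	capacity := maxUnavailable - unavail
	if failing >= capacity {
		return st // nodes failing on this config also count against availability
	}
	st.capacity = capacity - failing
	return st
}

func main() {
	// Mirrors the new test case: maxUnavailable=4, two nodes unavailable,
	// node-3 and node-5 held, node-4/6/9 still on the old config.
	st := selectCandidates(4, 2, 0,
		[]string{"node-3", "node-4", "node-5", "node-6", "node-9"},
		map[string]bool{"node-3": true, "node-5": true})
	fmt.Printf("capacity=%d candidates=%v held=%d\n", st.capacity, st.candidates, st.held)
	// capacity=2 candidates=[node-4 node-6 node-9] held=2
	// => the controller would start updates on node-4 and node-6 only.
}
```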
47 changes: 33 additions & 14 deletions pkg/controller/node/node_controller_test.go
@@ -503,6 +503,7 @@ func TestMaxUnavailable(t *testing.T) {

func TestGetCandidateMachines(t *testing.T) {
tests := []struct {
desc string
nodes []*corev1.Node
progress int

@@ -512,7 +513,7 @@ func TestGetCandidateMachines(t *testing.T) {
// capacity is the maximum number of nodes we could update
capacity uint
}{{
//no progress
desc: "no progress",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -523,7 +524,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 1,
}, {
//no progress
desc: "no progress 2",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -534,7 +535,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
//no progress because we have an unavailable node
desc: "no progress because we have an unavailable node",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -545,7 +546,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// node-2 is going to change config, so we can only progress one more
desc: "node-2 is going to change config, so we can only progress one more",
progress: 3,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -558,7 +559,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: []string{"node-4"},
capacity: 1,
}, {
// We have a node working, don't start anything else
desc: "We have a node working, don't start anything else",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -571,7 +572,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
//progress on old stuck node
desc: "progress on old stuck node",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -582,7 +583,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: []string{"node-2"},
capacity: 1,
}, {
// Don't change a degraded node to same config, but also don't start another
desc: "Don't change a degraded node to same config, but also don't start another",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -593,7 +594,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// Must be able to roll back
desc: "Must be able to roll back",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -605,7 +606,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 1,
}, {
// Validate we also don't affect nodes which haven't started work
desc: "Validate we also don't affect nodes which haven't started work",
progress: 1,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -616,7 +617,7 @@ func TestGetCandidateMachines(t *testing.T) {
otherCandidates: nil,
capacity: 0,
}, {
// A test with more nodes in mixed order
desc: "A test with more nodes in mixed order",
progress: 4,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
@@ -632,10 +633,28 @@ func TestGetCandidateMachines(t *testing.T) {
expected: []string{"node-3", "node-4"},
otherCandidates: []string{"node-5", "node-6"},
capacity: 2,
}, {
desc: "A test with more nodes in mixed order",
progress: 4,
nodes: []*corev1.Node{
newNodeWithReady("node-0", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-1", "v1", "v1", corev1.ConditionFalse),
newNodeWithReady("node-2", "v0", "v1", corev1.ConditionTrue),
newNodeWithReadyButHeld("node-3", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-4", "v0", "v0", corev1.ConditionTrue),
newNodeWithReadyButHeld("node-5", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-6", "v0", "v0", corev1.ConditionTrue),
newNodeWithReady("node-7", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-8", "v1", "v1", corev1.ConditionTrue),
newNodeWithReady("node-9", "v0", "v0", corev1.ConditionTrue),
},
expected: []string{"node-4", "node-6"},
otherCandidates: []string{"node-9"},
capacity: 2,
}}

for idx, test := range tests {
t.Run(fmt.Sprintf("case#%d", idx), func(t *testing.T) {
t.Run(fmt.Sprintf("case#%d: %s", idx, test.desc), func(t *testing.T) {
pool := &mcfgv1.MachineConfigPool{
Spec: mcfgv1.MachineConfigPoolSpec{
Configuration: mcfgv1.MachineConfigPoolStatusConfiguration{ObjectReference: corev1.ObjectReference{Name: "v1"}},
@@ -649,10 +668,10 @@ func TestGetCandidateMachines(t *testing.T) {
}
assert.Equal(t, test.expected, nodeNames)

allCandidates, capacity := getAllCandidateMachines(pool, test.nodes, test.progress)
assert.Equal(t, test.capacity, capacity)
state := getAllCandidateMachines(pool, test.nodes, test.progress)
assert.Equal(t, test.capacity, state.capacity)
var otherCandidates []string
for i, node := range allCandidates {
for i, node := range state.candidates {
if i < len(nodeNames) {
assert.Equal(t, node.Name, nodeNames[i])
} else {
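In the new held-node case above, maxUnavailable is 4 and two nodes count as unavailable (node-1 is not ready and node-2 is still moving from v0 to v1), leaving a capacity of 2. Of the remaining v0 nodes, node-3 and node-5 carry the hold annotation and are skipped, so node-4 and node-6 fill the capacity and node-9 is left as the only other candidate.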
10 changes: 10 additions & 0 deletions pkg/controller/node/status_test.go
@@ -108,6 +108,16 @@ func newNodeWithReady(name string, currentConfig, desiredConfig string, status c
return node
}

func newNodeWithReadyButHeld(name string, currentConfig, desiredConfig string, status corev1.ConditionStatus) *corev1.Node {
node := newNode(name, currentConfig, desiredConfig)
node.Status = corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: status}}}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[daemonconsts.MachineUpdateHoldAnnotationKey] = "true"
return node
}

func newNodeWithReadyAndDaemonState(name string, currentConfig, desiredConfig string, status corev1.ConditionStatus, dstate string) *corev1.Node {
node := newNode(name, currentConfig, desiredConfig)
node.Status = corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: status}}}
2 changes: 2 additions & 0 deletions pkg/daemon/constants/constants.go
@@ -14,6 +14,8 @@ const (
DesiredMachineConfigAnnotationKey = "machineconfiguration.openshift.io/desiredConfig"
// MachineConfigDaemonStateAnnotationKey is used to fetch the state of the daemon on the machine.
MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
// MachineUpdateHoldAnnotationKey is used to hold specific nodes back from updates while it is present
MachineUpdateHoldAnnotationKey = "machineconfiguration.openshift.io/hold"
// OpenShiftOperatorManagedLabel is used to filter out kube objects that don't need to be synced by the MCO
OpenShiftOperatorManagedLabel = "openshift.io/operator-managed"
// MachineConfigDaemonStateWorking is set by daemon when it is applying an update.
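The hold marker is an ordinary node annotation, so nothing beyond this PR is required to set it. As an illustration only — assuming a recent client-go where the typed clients take a context, and not code from this repository — a small tool could hold a node like this:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

const holdAnnotation = "machineconfiguration.openshift.io/hold"

// setHold adds or removes the hold annotation with a JSON merge patch.
func setHold(ctx context.Context, client kubernetes.Interface, nodeName string, hold bool) error {
	value := `"true"`
	if !hold {
		value = "null" // in a JSON merge patch, null deletes the key
	}
	patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%s}}}`, holdAnnotation, value))
	_, err := client.CoreV1().Nodes().Patch(ctx, nodeName, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

func main() {
	// Build a client from the default kubeconfig; in-cluster config would work the same way.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)
	if err := setHold(context.Background(), client, "node-3", true); err != nil {
		panic(err)
	}
}
```

From the command line, kubectl annotate node <name> machineconfiguration.openshift.io/hold=true (or oc annotate on OpenShift) achieves the same thing; removing the annotation releases the hold.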