Skip to content

Commit

Permalink
WIP: Update etcd followers first
Browse files Browse the repository at this point in the history
The way we're talking to etcd is a bit hacky; I ended up
cargo-culting some code.  This would be much cleaner if the etcd
operator did it.

But it's critical that we update the etcd followers first, because
leader elections are disruptive events and we can easily minimize
that.

Closes: openshift#1897
  • Loading branch information
cgwalters committed Aug 6, 2020
1 parent ab01a6b commit 26ecf6e
Show file tree
Hide file tree
Showing 335 changed files with 88,345 additions and 165 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/huandu/xstrings v1.2.0 // indirect
github.com/imdario/mergo v0.3.9
github.com/json-iterator/go v1.1.10 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/opencontainers/go-digest v1.0.0
github.com/openshift/api v3.9.1-0.20191111211345-a27ff30ebf09+incompatible
Expand All @@ -55,8 +56,11 @@ require (
github.com/ultraware/funlen v0.0.2 // indirect
github.com/vincent-petithory/dataurl v0.0.0-20160330182126-9a301d65acbb
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/grpc v1.26.0
k8s.io/api v0.18.3
k8s.io/apiextensions-apiserver v0.18.0
k8s.io/apimachinery v0.18.3
Expand Down
87 changes: 12 additions & 75 deletions go.sum

Large diffs are not rendered by default.

33 changes: 31 additions & 2 deletions pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
daemonconsts.CurrentMachineConfigAnnotationKey,
daemonconsts.DesiredMachineConfigAnnotationKey,
daemonconsts.MachineConfigDaemonStateAnnotationKey,
daemonconsts.UpdateDisruptionScoreAnnotationKey,
}
for _, anno := range annos {
newValue := curNode.Annotations[anno]
Expand Down Expand Up @@ -875,9 +876,37 @@ func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.
return nodes[:capacity]
}

// getCurrentEtcdLeader looks at the candidate nodes (should be control plane)
// at the "update disruption score" key which is currently just a function
// of the etcd leader, but in the future we might change to something beyond
// just 0/1 and/or expand beyond the control plane.
//
// It returns the first candidate whose annotation value is "1". An error is
// returned when no candidate carries a recognized ("0" or "1") annotation
// value, or when recognized annotations were found but none of them marks a
// leader. Candidates with an unrecognized value are logged and ignored.
func (ctrl *Controller) getCurrentEtcdLeader(candidates []*corev1.Node) (*corev1.Node, error) {
	var leader *corev1.Node
	foundAnno := false
	key := daemonconsts.UpdateDisruptionScoreAnnotationKey
	for _, node := range candidates {
		v, ok := node.Annotations[key]
		if !ok {
			continue
		}
		switch v {
		case "1":
			if leader != nil {
				// Keep the first leader we saw; just report the conflict.
				glog.Warningf("Multiple etcd leaders, also found %s", node.Name)
			} else {
				leader = node
			}
		case "0":
			// Known follower; nothing to record beyond foundAnno below.
		default:
			glog.Warningf("Unknown %s %s: %s", node.Name, key, v)
			// An unrecognized value does not count as a usable annotation.
			continue
		}
		foundAnno = true
	}
	if !foundAnno {
		return nil, fmt.Errorf("Didn't find annotation %s on any candidate", key)
	}
	if leader != nil {
		// Take me to your leader
		return leader, nil
	}
	return nil, fmt.Errorf("no leader")
}

// filterControlPlaneCandidateNodes adjusts the candidates and capacity specifically
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
// OpenShiftOperatorManagedLabel is used to filter out kube objects that don't need to be synced by the MCO
OpenShiftOperatorManagedLabel = "openshift.io/operator-managed"
// UpdateDisruptionScoreAnnotationKey is the node annotation holding the MCD's estimate of node
// update disruption; currently it is 1 for the etcd leader on control plane nodes, and 0 otherwise.
UpdateDisruptionScoreAnnotationKey = "machineconfiguration.openshift.io/uds"
// MachineConfigDaemonStateWorking is set by daemon when it is applying an update.
MachineConfigDaemonStateWorking = "Working"
// MachineConfigDaemonStateDone is set by daemon when it is done applying an update.
Expand Down
13 changes: 13 additions & 0 deletions pkg/daemon/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,18 @@ func (dn *Daemon) initializeControlPlane() error {
if err := updateOstreeObjectSync(); err != nil {
return err
}
ioniceEtcd(dn.stopCh)
go func() {
c := watchCurrentEtcdLeader(dn.stopCh)
for {
select {
case leader := <-c:
glog.Infof("node is etcd leader: %v", leader)
dn.nodeWriter.SetEtcdLeader(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name, leader)
case <-dn.stopCh:
return
}
}
}()
return nil
}
157 changes: 157 additions & 0 deletions pkg/daemon/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package daemon

// This file contains code to talk to the local etcd instance
// on control plane nodes, used as part of coordinating
// updates:
// https://github.com/openshift/machine-config-operator/issues/1897

import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc"
)

const (
	// endpoint is the client URL of the etcd member running on this node.
	endpoint = "https://localhost:2379"
	// podDir is the host directory scanned for static pod resource
	// directories (one subdirectory per entry).
	podDir = "/etc/kubernetes/static-pod-resources"
	// pollEtcdSecs is the interval, in seconds, between queries asking etcd
	// who the leader is.
	pollEtcdSecs = 10
)

// hackilyFindAPIServerCerts scans podDir for an entry whose name starts with
// "kube-apiserver-pod" and which contains secrets/etcd-client/tls.key, and
// returns the path of the first such entry (ioutil.ReadDir yields entries
// sorted by filename).
//
// It returns an empty path and a nil error when no entry qualifies, and a
// non-nil error only when podDir itself cannot be listed.
//
// Yes this is a brutal hack that should be replaced by something
// discussed in https://github.com/openshift/api/pull/694
func hackilyFindAPIServerCerts() (string, error) {
	entries, err := ioutil.ReadDir(podDir)
	if err != nil {
		return "", err
	}
	for _, entry := range entries {
		name := entry.Name()
		if strings.HasPrefix(name, "kube-apiserver-pod") {
			candidate := filepath.Join(podDir, name)
			// Only accept the directory if the etcd client key is present.
			if _, statErr := os.Stat(filepath.Join(candidate, "secrets/etcd-client/tls.key")); statErr == nil {
				return candidate, nil
			}
		}
	}
	return "", nil
}

// initEtcdClient builds an etcd v3 client for the local endpoint, using the
// client certificate, key and CA bundle found under the directory returned by
// hackilyFindAPIServerCerts. It fails if that directory cannot be found yet,
// if the TLS configuration cannot be built, or if clientv3.New fails.
func initEtcdClient() (*clientv3.Client, error) {
	certDir, err := hackilyFindAPIServerCerts()
	switch {
	case err != nil:
		return nil, err
	case certDir == "":
		return nil, fmt.Errorf("Couldn't find kube-apiserver secrets yet")
	}

	// TODO - this is obviously an awful hack, but I don't think we want to bind in the etcd certs for
	// all MCDs. We could do another daemonset that only runs on the controlplane...
	tlsInfo := transport.TLSInfo{
		CertFile:      filepath.Join(certDir, "secrets/etcd-client/tls.crt"),
		KeyFile:       filepath.Join(certDir, "secrets/etcd-client/tls.key"),
		TrustedCAFile: filepath.Join(certDir, "configmaps/etcd-serving-ca/ca-bundle.crt"),
	}
	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		return nil, err
	}

	return clientv3.New(clientv3.Config{
		Endpoints:   []string{endpoint},
		DialTimeout: 15 * time.Second,
		TLS:         tlsConfig,
		DialOptions: []grpc.DialOption{
			grpc.WithBlock(), // block until the underlying connection is up
		},
	})
}

// watchCurrentEtcdLeader starts a background goroutine that polls the local
// etcd member every pollEtcdSecs seconds and reports on the returned channel
// whether this member is currently the etcd leader.
//
// A value is sent only when the observed state changes: the first successful
// status query always produces a value, and a failed query after a known
// state produces false and resets the known state. The returned channel is
// unbuffered and is never closed.
//
// The goroutine exits once stopCh is closed, including while it is waiting to
// retry client creation or waiting for a receiver on the returned channel.
// The etcd client is closed on exit.
func watchCurrentEtcdLeader(stopCh <-chan struct{}) <-chan bool {
	r := make(chan bool)
	go func() {
		ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.Background()))
		defer cancel()

		// wait pauses for one poll interval; it reports false if stopCh
		// was closed in the meantime.
		wait := func() bool {
			select {
			case <-stopCh:
				return false
			case <-time.After(pollEtcdSecs * time.Second):
				return true
			}
		}
		// send delivers v to the consumer; it reports false if stopCh was
		// closed before a receiver showed up, so we never block forever on
		// a consumer that has already gone away.
		send := func(v bool) bool {
			select {
			case r <- v:
				return true
			case <-stopCh:
				return false
			}
		}

		// The certs may not be on disk yet; keep retrying until they are.
		var c *clientv3.Client
		for {
			var err error
			c, err = initEtcdClient()
			if err == nil {
				break
			}
			glog.Warningf("Failed to init etcd client: %v", err)
			if !wait() {
				return
			}
		}
		defer c.Close()

		// lastLeader is nil while the leader state is unknown.
		var lastLeader *bool
		for {
			resp, err := c.Status(ctx, endpoint)
			if err != nil {
				glog.Errorf("Failed to get etcd Status: %v", err)
				if lastLeader != nil {
					lastLeader = nil
					if !send(false) {
						return
					}
				}
			} else {
				leader := resp.Header.MemberId == resp.Leader
				if lastLeader == nil || *lastLeader != leader {
					lastLeader = &leader
					if !send(leader) {
						return
					}
				}
			}
			if !wait() {
				return
			}
		}
	}()
	return r
}

// ioniceEtcd starts a background goroutine that tries to raise the I/O
// scheduling priority of the running etcd process(es) via
// `ionice -c2 -n0 -p <pid>`. It retries every 30 seconds until at least one
// process has been reniced or stopCh is closed.
//
// This will be replaced by https://github.com/openshift/cluster-etcd-operator/pull/418
func ioniceEtcd(stopCh <-chan struct{}) {
	go func() {
		for {
			if ioniceEtcdOnce() {
				return
			}

			select {
			case <-stopCh:
				return
			case <-time.After(30 * time.Second):
			}
		}
	}()
}

// ioniceEtcdOnce runs ionice on every PID reported by `pgrep etcd` and
// reports whether at least one invocation succeeded.
func ioniceEtcdOnce() bool {
	out, err := exec.Command("pgrep", "etcd").CombinedOutput()
	if err != nil {
		// pgrep failed or matched nothing; try again later.
		return false
	}
	reniced := false
	// pgrep prints one PID per line (with a trailing newline), so split on
	// whitespace instead of parsing the raw output as a single integer.
	for _, field := range strings.Fields(string(out)) {
		pid, err := strconv.ParseInt(field, 10, 64)
		if err != nil {
			glog.Warningf("Failed to parse pgrep etcd: %v", err)
			continue
		}
		if err := exec.Command("ionice", "-c2", "-n0", "-p", strconv.FormatInt(pid, 10)).Run(); err != nil {
			glog.Warningf("Failed to ionice etcd (pid %d): %v", pid, err)
			continue
		}
		reniced = true
	}
	if reniced {
		glog.Info("Reniced etcd")
	}
	return reniced
}
21 changes: 21 additions & 0 deletions pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type NodeWriter interface {
SetUnreconcilable(err error, client corev1client.NodeInterface, lister corev1lister.NodeLister, node string) error
SetDegraded(err error, client corev1client.NodeInterface, lister corev1lister.NodeLister, node string) error
SetSSHAccessed(client corev1client.NodeInterface, lister corev1lister.NodeLister, node string) error
SetEtcdLeader(client corev1client.NodeInterface, lister corev1lister.NodeLister, node string, leader bool) error
}

// newNodeWriter Create a new NodeWriter
Expand Down Expand Up @@ -173,6 +174,26 @@ func (nw *clusterNodeWriter) SetSSHAccessed(client corev1client.NodeInterface, l
return <-respChan
}

// SetEtcdLeader records on the given node whether it is the etcd leader by
// setting the update disruption score annotation to "1" (leader) or "0"
// (not leader). The request is handed to the writer channel and the call
// blocks until a response arrives on the per-request response channel.
func (nw *clusterNodeWriter) SetEtcdLeader(client corev1client.NodeInterface, lister corev1lister.NodeLister, node string, leader bool) error {
	score := "0"
	if leader {
		score = "1"
	}
	// Buffered so the responder can always deliver its result.
	done := make(chan error, 1)
	nw.writer <- message{
		client: client,
		lister: lister,
		node:   node,
		annos: map[string]string{
			constants.UpdateDisruptionScoreAnnotationKey: score,
		},
		responseChannel: done,
	}
	return <-done
}

func setNodeAnnotations(client corev1client.NodeInterface, lister corev1lister.NodeLister, nodeName string, m map[string]string) (*corev1.Node, error) {
node, err := internal.UpdateNodeRetry(client, lister, nodeName, func(node *corev1.Node) {
for k, v := range m {
Expand Down
Loading

0 comments on commit 26ecf6e

Please sign in to comment.