Skip to content

Commit

Permalink
Merge pull request #2 from su-houzhen/chronus
Browse files Browse the repository at this point in the history
create/delete zookeeper when create/delete clickhouse
  • Loading branch information
dbkernel authored Oct 22, 2021
2 parents a1e5d8a + 630ba82 commit a85d442
Show file tree
Hide file tree
Showing 16 changed files with 1,094 additions and 15 deletions.
5 changes: 5 additions & 0 deletions pkg/apis/clickhouse.radondb.com/v1/type_chi.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,18 @@ func (chi *ClickHouseInstallation) WalkHostsTillError(

// WalkTillError
func (chi *ClickHouseInstallation) WalkTillError(
fZooKeeper func(chi *ClickHouseInstallation) error,
fCHIPreliminary func(chi *ClickHouseInstallation) error,
fCluster func(cluster *ChiCluster) error,
fShard func(shard *ChiShard) error,
fHost func(host *ChiHost) error,
fCHI func(chi *ClickHouseInstallation) error,
) error {

if err := fZooKeeper(chi); err != nil {
return err
}

if err := fCHIPreliminary(chi); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/clickhouse.radondb.com/v1/type_zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (zkc *ChiZookeeperConfig) IsEmpty() bool {
return len(zkc.Nodes) == 0
}

func (zkc *ChiZookeeperConfig) GetStatefulSetReplicasNum() int32 {
return zkc.Replica
}

func (zkc *ChiZookeeperConfig) MergeFrom(from *ChiZookeeperConfig, _type MergeType) {
if from == nil {
return
Expand Down Expand Up @@ -61,4 +65,11 @@ func (zkc *ChiZookeeperConfig) MergeFrom(from *ChiZookeeperConfig, _type MergeTy
if from.Identity != "" {
zkc.Identity = from.Identity
}
if from.Replica > 0 {
zkc.Replica = from.Replica
}
if from.Port > 0 {
zkc.Port = from.Port
}
zkc.Install = from.Install
}
3 changes: 3 additions & 0 deletions pkg/apis/clickhouse.radondb.com/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ type ChiZookeeperConfig struct {
OperationTimeoutMs int `json:"operation_timeout_ms,omitempty" yaml:"operation_timeout_ms"`
Root string `json:"root,omitempty" yaml:"root"`
Identity string `json:"identity,omitempty" yaml:"identity"`
Install bool `json:"install,omitempty" yaml:"install"`
Replica int32 `json:"replica,omitempty" yaml:"replica"`
Port int32 `json:"port,omitempty" yaml:"port"`
}

// ChiZookeeperNode defines item of nodes section of .spec.configuration.zookeeper
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/chi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewController(
chiListerSynced: chopInformerFactory.Clickhouse().V1().ClickHouseInstallations().Informer().HasSynced,
chitLister: chopInformerFactory.Clickhouse().V1().ClickHouseInstallationTemplates().Lister(),
chitListerSynced: chopInformerFactory.Clickhouse().V1().ClickHouseInstallationTemplates().Informer().HasSynced,
pdbLister: kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
serviceLister: kubeInformerFactory.Core().V1().Services().Lister(),
serviceListerSynced: kubeInformerFactory.Core().V1().Services().Informer().HasSynced,
endpointsLister: kubeInformerFactory.Core().V1().Endpoints().Lister(),
Expand Down
49 changes: 49 additions & 0 deletions pkg/controller/chi/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ import (
chop "github.com/TCeason/clickhouse-operator/pkg/apis/clickhouse.radondb.com/v1"
)

// createStatefulSetZooKeeper is an internal function, used in reconcileStatefulSet only
func (c *Controller) createStatefulSetZooKeeper(statefulSet *apps.StatefulSet, chi *chop.ClickHouseInstallation) error {
log.V(1).M(chi).F().P()

if _, err := c.kubeClient.AppsV1().StatefulSets(statefulSet.Namespace).Create(statefulSet); err != nil {
// Unable to create StatefulSet at all
return err
}

// StatefulSet created, wait until it is ready

if err := c.waitZooKeeperReady(statefulSet); err == nil {
// Target generation reached, StatefulSet created successfully
return nil
}

// Unable to run StatefulSet, StatefulSet create failed, time to rollback?
return c.onStatefulSetZooKeeperCreateFailed(statefulSet, chi)
}

// createStatefulSet is an internal function, used in reconcileStatefulSet only
func (c *Controller) createStatefulSet(statefulSet *apps.StatefulSet, host *chop.ChiHost) error {
log.V(1).M(host).F().P()
Expand Down Expand Up @@ -126,6 +146,35 @@ func (c *Controller) onStatefulSetCreateFailed(failedStatefulSet *apps.StatefulS
return fmt.Errorf("unexpected flow")
}

// onStatefulSetZooKeeperCreateFailed handles situation when StatefulSet create failed
// It can just delete failed StatefulSet or do nothing
func (c *Controller) onStatefulSetZooKeeperCreateFailed(failedStatefulSet *apps.StatefulSet, chi *chop.ClickHouseInstallation) error {
// What to do with StatefulSet - look into chop configuration settings
switch c.chop.Config().OnStatefulSetCreateFailureAction {
case chop.OnStatefulSetCreateFailureActionAbort:
// Report appropriate error, it will break reconcile loop
log.V(1).M(chi).F().Info("abort")
return errors.New(fmt.Sprintf("Create failed on %s", util.NamespaceNameString(failedStatefulSet.ObjectMeta)))

case chop.OnStatefulSetCreateFailureActionDelete:
// Delete gracefully failed StatefulSet
log.V(1).M(chi).F().Info("going to DELETE FAILED StatefulSet %s", util.NamespaceNameString(failedStatefulSet.ObjectMeta))
_ = c.deleteZooKeeper(chi)
return c.shouldContinueOnCreateFailed()

case chop.OnStatefulSetCreateFailureActionIgnore:
// Ignore error, continue reconcile loop
log.V(1).M(chi).F().Info("going to ignore error %s", util.NamespaceNameString(failedStatefulSet.ObjectMeta))
return nil

default:
log.V(1).M(chi).A().Error("Unknown c.chop.Config().OnStatefulSetCreateFailureAction=%s", c.chop.Config().OnStatefulSetCreateFailureAction)
return nil
}

return fmt.Errorf("unexpected flow")
}

// onStatefulSetUpdateFailed handles situation when StatefulSet update failed
// It can try to revert StatefulSet to its previous version, specified in rollbackStatefulSet
func (c *Controller) onStatefulSetUpdateFailed(rollbackStatefulSet *apps.StatefulSet, host *chop.ChiHost) error {
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/chi/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,64 @@ func (c *Controller) deleteHost(host *chop.ChiHost) error {
return nil
}

// deleteZooKeeper deletes all kubernetes resources related to ZooKeeper
func (c *Controller) deleteZooKeeper(chi *chop.ClickHouseInstallation) error {
// Each ZooKeeper consists of
// 1. StatefulSet
// 2. PersistentVolumeClaim
// 3. Service
// Need to delete all these item

log.V(1).M(chi).S().Info(chi.Name)
defer log.V(1).M(chi).E().Info(chi.Name)

var err error

// Namespaced name
zooKeeperName := chopmodel.CreateStatefulSetZooKeeperName(chi)
namespace := chi.Namespace

// delete StatefulSet
log.V(1).M(chi).F().Info("%s/%s", namespace, zooKeeperName)
if err = c.kubeClient.AppsV1().StatefulSets(namespace).Delete(zooKeeperName, newDeleteOptions()); err == nil {
log.V(1).M(chi).Info("OK delete StatefulSet %s/%s", namespace, zooKeeperName)
} else if apierrors.IsNotFound(err) {
log.V(1).M(chi).Info("NEUTRAL not found StatefulSet %s/%s", namespace, zooKeeperName)
err = nil
} else {
log.V(1).M(chi).A().Error("FAIL delete StatefulSet %s/%s err: %v", namespace, zooKeeperName, err)
}

// delete PDB
pdbName := chopmodel.CreatePodDisruptionBudgetZooKeeperName(chi)
log.V(1).M(chi).F().Info("%s/%s", namespace, pdbName)
if err = c.kubeClient.PolicyV1beta1().PodDisruptionBudgets(namespace).Delete(pdbName, newDeleteOptions()); err == nil {
log.V(1).M(chi).Info("OK delete PodDisruptionBudget %s/%s", namespace, pdbName)
} else if apierrors.IsNotFound(err) {
log.V(1).M(chi).Info("NEUTRAL not found PodDisruptionBudget %s/%s", namespace, pdbName)
err = nil
} else {
log.V(1).M(chi).A().Error("FAIL delete PodDisruptionBudget %s/%s err: %v", namespace, pdbName, err)
}

// delete PVC?

// delete Service
clientName := chopmodel.CreateStatefulSetServiceZooKeeperClientName(chi)
log.V(1).M(chi).F().Info("%s/%s", namespace, clientName)
if err = c.deleteServiceIfExists(namespace, clientName); err != nil {
log.V(1).M(chi).A().Error("FAIL delete Service %s/%s err: %v", namespace, clientName, err)
}

serverName := chopmodel.CreateStatefulSetServiceZooKeeperServerName(chi)
log.V(1).M(chi).F().Info("%s/%s", namespace, serverName)
if err = c.deleteServiceIfExists(namespace, serverName); err != nil {
log.V(1).M(chi).A().Error("FAIL delete Service %s/%s err: %v", namespace, serverName, err)
}

return err
}

// deleteConfigMapsCHI
func (c *Controller) deleteConfigMapsCHI(chi *chop.ClickHouseInstallation) error {
// Delete common ConfigMap's
Expand Down
48 changes: 48 additions & 0 deletions pkg/controller/chi/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kublabels "k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -124,6 +125,53 @@ func (c *Controller) getService(objMeta *meta.ObjectMeta, byNameOnly bool) (*cor
return nil, fmt.Errorf("too much objects found %d expecting 1", len(objects))
}

// getPodDisruptionBudget gets PodDisruptionBudget either by namespaced name or by labels
func (c *Controller) getPodDisruptionBudget(objMeta *meta.ObjectMeta, byNameOnly bool) (*policy.PodDisruptionBudget, error) {
get := c.pdbLister.PodDisruptionBudgets(objMeta.Namespace).Get
list := c.pdbLister.PodDisruptionBudgets(objMeta.Namespace).List
var objects []*policy.PodDisruptionBudget

// Check whether object with such name already exists
obj, err := get(objMeta.Name)

if (obj != nil) && (err == nil) {
// Object found by name
return obj, nil
}

if !apierrors.IsNotFound(err) {
// Error, which is not related to "Object not found"
return nil, err
}

// Object not found by name. Try to find by labels

if byNameOnly {
return nil, fmt.Errorf("object not found by name %s/%s and no label search allowed ", objMeta.Namespace, objMeta.Name)
}

var selector kublabels.Selector
if selector, err = chopmodel.MakeSelectorFromObjectMeta(objMeta); err != nil {
return nil, err
}

if objects, err = list(selector); err != nil {
return nil, err
}

if len(objects) == 0 {
return nil, apierrors.NewNotFound(apps.Resource("PodDisruptionBudget"), objMeta.Name)
}

if len(objects) == 1 {
// Exactly one object found by labels
return objects[0], nil
}

// Too much objects found by labels
return nil, fmt.Errorf("too much objects found %d expecting 1", len(objects))
}

// getStatefulSet gets StatefulSet. Accepted types:
// 1. *meta.ObjectMeta
// 2. *chop.ChiHost
Expand Down
29 changes: 29 additions & 0 deletions pkg/controller/chi/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ func (c *Controller) waitHostReady(host *chop.ChiHost) error {
)
}

// waitZooKeeperReady polls zookeeper's StatefulSet until it is ready
func (c *Controller) waitZooKeeperReady(statefulSet *apps.StatefulSet) error {
// Wait for StatefulSet to reach generation
err := c.pollStatefulSet(
statefulSet,
nil,
func(sts *apps.StatefulSet) bool {
if sts == nil {
return false
}
return model.IsStatefulSetGeneration(sts, sts.Generation)
},
func() {},
)
if err != nil {
return err
}

// Wait StatefulSet to reach ready status
return c.pollStatefulSet(
statefulSet,
nil,
func(sts *apps.StatefulSet) bool {
return model.IsStatefulSetReady(sts)
},
func() {},
)
}

// waitHostDeleted polls host's StatefulSet until it is not available
func (c *Controller) waitHostDeleted(host *chop.ChiHost) {
for {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/chi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
kube "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Controller struct {
chitLister choplisters.ClickHouseInstallationTemplateLister
chitListerSynced cache.InformerSynced

// pdbLister used as pdbLister.Services(namespace).Get(name)
pdbLister policylisters.PodDisruptionBudgetLister
// serviceLister used as serviceLister.Services(namespace).Get(name)
serviceLister corelisters.ServiceLister
// serviceListerSynced used in waitForCacheSync()
Expand Down
Loading

0 comments on commit a85d442

Please sign in to comment.