KUBESAW-12: Convert the health-check goroutine into ToolchainCluster controller (#386)

* KUBESAW-12: Convert the health-check goroutine into ToolchainCluster controller

Signed-off-by: Feny Mehta <[email protected]>

* Fixing go-lint

Signed-off-by: Feny Mehta <[email protected]>

* Adding some test cases

Signed-off-by: Feny Mehta <[email protected]>

* adding the test cases as previous

Signed-off-by: Feny Mehta <[email protected]>

* Review comments  1

Signed-off-by: Feny Mehta <[email protected]>

* removing unused clustercacheservice

Signed-off-by: Feny Mehta <[email protected]>

* Refactoring the tests

Signed-off-by: Feny Mehta <[email protected]>

* Fixing go lint

Signed-off-by: Feny Mehta <[email protected]>

* Just convert to controller

Signed-off-by: Feny Mehta <[email protected]>

* Updated package name

Signed-off-by: Feny Mehta <[email protected]>

* Review comments-2

Signed-off-by: Feny Mehta <[email protected]>

* better readability

Signed-off-by: Feny Mehta <[email protected]>

---------

Signed-off-by: Feny Mehta <[email protected]>
fbm3307 authored Apr 22, 2024
1 parent d0cc930 commit e6d41ea
Showing 5 changed files with 424 additions and 238 deletions.
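For orientation before the diff: the change replaces the background health-check goroutine with a ToolchainCluster controller, so a consumer of this package would register the reconciler with the manager instead of calling the removed StartHealthChecks. A minimal sketch of that wiring follows; it is not part of this commit, and the mgr and setupLog variables, the 10-second period, and the error handling are illustrative assumptions.

// Illustrative wiring only (not part of this diff): register the new
// ToolchainCluster reconciler where StartHealthChecks used to be launched.
if err := (&toolchaincluster.Reconciler{
    Client:     mgr.GetClient(), // client for the local cluster
    Scheme:     mgr.GetScheme(),
    RequeAfter: 10 * time.Second, // assumed period, standing in for the old health-check interval
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "ToolchainCluster")
    os.Exit(1)
}

Because Reconcile returns reconcile.Result{RequeueAfter: r.RequeAfter}, each ToolchainCluster is re-checked on roughly that interval, which replaces the single wait.Until loop that previously listed and checked every cluster.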
@@ -1,92 +1,32 @@
package toolchainclustercache
package toolchaincluster

import (
    "context"
    "fmt"
    "strings"
    "time"

    toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
    "github.com/codeready-toolchain/toolchain-common/pkg/cluster"
    "github.com/go-logr/logr"
    "github.com/pkg/errors"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    kubeclientset "k8s.io/client-go/kubernetes"
    "sigs.k8s.io/controller-runtime/pkg/client"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/manager"
)

var logger = logf.Log.WithName("toolchaincluster_healthcheck")

const (
    healthzOk              = "/healthz responded with ok"
    healthzNotOk           = "/healthz responded without ok"
    clusterNotReachableMsg = "cluster is not reachable"
    clusterReachableMsg    = "cluster is reachable"
)

func StartHealthChecks(ctx context.Context, mgr manager.Manager, namespace string, period time.Duration) {
    logger.Info("starting health checks", "period", period)
    go wait.Until(func() {
        updateClusterStatuses(ctx, namespace, mgr.GetClient())
    }, period, ctx.Done())
}

type HealthChecker struct {
    localClusterClient     client.Client
    remoteClusterClient    client.Client
    remoteClusterClientset *kubeclientset.Clientset
    logger                 logr.Logger
}

// updateClusterStatuses checks cluster health and updates status of all ToolchainClusters
func updateClusterStatuses(ctx context.Context, namespace string, cl client.Client) {
    clusters := &toolchainv1alpha1.ToolchainClusterList{}
    err := cl.List(ctx, clusters, client.InNamespace(namespace))
    if err != nil {
        logger.Error(err, "unable to list existing ToolchainClusters")
        return
    }
    if len(clusters.Items) == 0 {
        logger.Info("no ToolchainCluster found")
    }

    for _, obj := range clusters.Items {
        clusterObj := obj.DeepCopy()
        clusterLogger := logger.WithValues("cluster-name", clusterObj.Name)

        cachedCluster, ok := cluster.GetCachedToolchainCluster(clusterObj.Name)
        if !ok {
            clusterLogger.Error(fmt.Errorf("cluster %s not found in cache", clusterObj.Name), "failed to retrieve stored data for cluster")
            clusterObj.Status.Conditions = []toolchainv1alpha1.ToolchainClusterCondition{clusterOfflineCondition()}
            if err := cl.Status().Update(ctx, clusterObj); err != nil {
                clusterLogger.Error(err, "failed to update the status of ToolchainCluster")
            }
            continue
        }

        clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
        if err != nil {
            clusterLogger.Error(err, "cannot create ClientSet for a ToolchainCluster")
            continue
        }

        healthChecker := &HealthChecker{
            localClusterClient:     cl,
            remoteClusterClient:    cachedCluster.Client,
            remoteClusterClientset: clientSet,
            logger:                 clusterLogger,
        }
        // clusterLogger.Info("getting the current state of ToolchainCluster")
        if err := healthChecker.updateIndividualClusterStatus(ctx, clusterObj); err != nil {
            clusterLogger.Error(err, "unable to update cluster status of ToolchainCluster")
        }
    }
}

func (hc *HealthChecker) updateIndividualClusterStatus(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster) error {

    currentClusterStatus := hc.getClusterHealthStatus(ctx)
(remaining lines of this file not shown)
177 changes: 177 additions & 0 deletions controllers/toolchaincluster/healthchecker_test.go
@@ -0,0 +1,177 @@
package toolchaincluster

import (
    "context"
    "testing"

    toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
    "github.com/codeready-toolchain/toolchain-common/pkg/cluster"
    "github.com/codeready-toolchain/toolchain-common/pkg/test"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "gopkg.in/h2non/gock.v1"
    corev1 "k8s.io/api/core/v1"
    kubeclientset "k8s.io/client-go/kubernetes"
    "sigs.k8s.io/controller-runtime/pkg/client"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var logger = logf.Log.WithName("toolchaincluster_healthcheck")

func TestClusterHealthChecks(t *testing.T) {

    // given
    defer gock.Off()
    tcNs := "test-namespace"
    gock.New("http://cluster.com").
        Get("healthz").
        Persist().
        Reply(200).
        BodyString("ok")
    gock.New("http://unstable.com").
        Get("healthz").
        Persist().
        Reply(200).
        BodyString("unstable")
    gock.New("http://not-found.com").
        Get("healthz").
        Persist().
        Reply(404)

    tests := map[string]struct {
        tctype            string
        apiendpoint       string
        clusterconditions []toolchainv1alpha1.ToolchainClusterCondition
        status            toolchainv1alpha1.ToolchainClusterStatus
    }{
        //ToolchainCluster.status doesn't contain any conditions
        "UnstableNoCondition": {
            tctype:            "unstable",
            apiendpoint:       "http://unstable.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{unhealthy(), notOffline()},
            status:            toolchainv1alpha1.ToolchainClusterStatus{},
        },
        "StableNoCondition": {
            tctype:            "stable",
            apiendpoint:       "http://cluster.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
            status:            toolchainv1alpha1.ToolchainClusterStatus{},
        },
        "NotFoundNoCondition": {
            tctype:            "not-found",
            apiendpoint:       "http://not-found.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
            status:            toolchainv1alpha1.ToolchainClusterStatus{},
        },
        //ToolchainCluster.status already contains conditions
        "UnstableContainsCondition": {
            tctype:            "unstable",
            apiendpoint:       "http://unstable.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{unhealthy(), notOffline()},
            status:            withStatus(healthy()),
        },
        "StableContainsCondition": {
            tctype:            "stable",
            apiendpoint:       "http://cluster.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
            status:            withStatus(offline()),
        },
        "NotFoundContainsCondition": {
            tctype:            "not-found",
            apiendpoint:       "http://not-found.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
            status:            withStatus(healthy()),
        },
        //if the connection cannot be established at beginning, then it should be offline
        "OfflineConnectionNotEstablished": {
            tctype:            "failing",
            apiendpoint:       "http://failing.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
            status:            toolchainv1alpha1.ToolchainClusterStatus{},
        },
        //if no zones nor region is retrieved, then keep the current
        "NoZoneKeepCurrent": {
            tctype:            "stable",
            apiendpoint:       "http://cluster.com",
            clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
            status:            withStatus(offline()),
        },
    }
    for k, tc := range tests {
        t.Run(k, func(t *testing.T) {
            tctype, sec := newToolchainCluster(tc.tctype, tcNs, tc.apiendpoint, tc.status)
            cl := test.NewFakeClient(t, tctype, sec)
            reset := setupCachedClusters(t, cl, tctype)
            defer reset()
            cachedtc, found := cluster.GetCachedToolchainCluster(tctype.Name)
            require.True(t, found)
            cacheclient, err := kubeclientset.NewForConfig(cachedtc.RestConfig)
            require.NoError(t, err)
            healthChecker := &HealthChecker{
                localClusterClient:     cl,
                remoteClusterClient:    cachedtc.Client,
                remoteClusterClientset: cacheclient,
                logger:                 logger,
            }
            // when
            err = healthChecker.updateIndividualClusterStatus(context.TODO(), tctype)

            //then
            require.NoError(t, err)
            assertClusterStatus(t, cl, tc.tctype, tc.clusterconditions...)
        })
    }
}

func withStatus(conditions ...toolchainv1alpha1.ToolchainClusterCondition) toolchainv1alpha1.ToolchainClusterStatus {
    return toolchainv1alpha1.ToolchainClusterStatus{
        Conditions: conditions,
    }
}

func assertClusterStatus(t *testing.T, cl client.Client, clusterName string, clusterConds ...toolchainv1alpha1.ToolchainClusterCondition) {
    tc := &toolchainv1alpha1.ToolchainCluster{}
    err := cl.Get(context.TODO(), test.NamespacedName("test-namespace", clusterName), tc)
    require.NoError(t, err)
    assert.Len(t, tc.Status.Conditions, len(clusterConds))
ExpConditions:
    for _, expCond := range clusterConds {
        for _, cond := range tc.Status.Conditions {
            if expCond.Type == cond.Type {
                assert.Equal(t, expCond.Status, cond.Status)
                assert.Equal(t, expCond.Reason, cond.Reason)
                assert.Equal(t, expCond.Message, cond.Message)
                continue ExpConditions
            }
        }
        assert.Failf(t, "condition not found", "the list of conditions %v doesn't contain the expected condition %v", tc.Status.Conditions, expCond)
    }
}

func healthy() toolchainv1alpha1.ToolchainClusterCondition {
    return toolchainv1alpha1.ToolchainClusterCondition{
        Type:    toolchainv1alpha1.ToolchainClusterReady,
        Status:  corev1.ConditionTrue,
        Reason:  "ClusterReady",
        Message: "/healthz responded with ok",
    }
}

func unhealthy() toolchainv1alpha1.ToolchainClusterCondition {
    return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterReady,
        Status:  corev1.ConditionFalse,
        Reason:  "ClusterNotReady",
        Message: "/healthz responded without ok",
    }
}

func offline() toolchainv1alpha1.ToolchainClusterCondition {
    return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterOffline,
        Status:  corev1.ConditionTrue,
        Reason:  "ClusterNotReachable",
        Message: "cluster is not reachable",
    }
}

func notOffline() toolchainv1alpha1.ToolchainClusterCondition {
    return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterOffline,
        Status:  corev1.ConditionFalse,
        Reason:  "ClusterReachable",
        Message: "cluster is reachable",
    }
}
82 changes: 82 additions & 0 deletions controllers/toolchaincluster/toolchaincluster_controller.go
@@ -0,0 +1,82 @@
package toolchaincluster

import (
    "context"
    "fmt"
    "time"

    toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
    "github.com/codeready-toolchain/toolchain-common/pkg/cluster"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    kubeclientset "k8s.io/client-go/kubernetes"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Reconciler reconciles a ToolchainCluster object
type Reconciler struct {
    Client     client.Client
    Scheme     *runtime.Scheme
    RequeAfter time.Duration
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&toolchainv1alpha1.ToolchainCluster{}).
        Complete(r)
}

// Reconcile reads that state of the cluster for a ToolchainCluster object and makes changes based on the state read
// and what is in the ToolchainCluster.Spec. It updates the status of the individual cluster
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
    reqLogger := log.FromContext(ctx)
    reqLogger.Info("Reconciling ToolchainCluster")

    // Fetch the ToolchainCluster instance
    toolchainCluster := &toolchainv1alpha1.ToolchainCluster{}
    err := r.Client.Get(ctx, request.NamespacedName, toolchainCluster)
    if err != nil {
        if errors.IsNotFound(err) {
            // Stop monitoring the toolchain cluster as it is deleted
            return reconcile.Result{}, nil
        }
        // Error reading the object - requeue the request.
        return reconcile.Result{}, err
    }

    cachedCluster, ok := cluster.GetCachedToolchainCluster(toolchainCluster.Name)
    if !ok {
        err := fmt.Errorf("cluster %s not found in cache", toolchainCluster.Name)
        toolchainCluster.Status.Conditions = []toolchainv1alpha1.ToolchainClusterCondition{clusterOfflineCondition()}
        if err := r.Client.Status().Update(ctx, toolchainCluster); err != nil {
            reqLogger.Error(err, "failed to update the status of ToolchainCluster")
        }
        return reconcile.Result{}, err
    }

    clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
    if err != nil {
        reqLogger.Error(err, "cannot create ClientSet for the ToolchainCluster")
        return reconcile.Result{}, err
    }
    healthChecker := &HealthChecker{
        localClusterClient:     r.Client,
        remoteClusterClient:    cachedCluster.Client,
        remoteClusterClientset: clientSet,
        logger:                 reqLogger,
    }
    //update the status of the individual cluster.
    if err := healthChecker.updateIndividualClusterStatus(ctx, toolchainCluster); err != nil {
        reqLogger.Error(err, "unable to update cluster status of ToolchainCluster")
        return reconcile.Result{}, err
    }

    return reconcile.Result{RequeueAfter: r.RequeAfter}, nil
}
(the remaining changed files are not shown)
