Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KUBESAW-12: Convert the health-check goroutine into ToolchainCluster controller #386

Merged
merged 16 commits into from
Apr 22, 2024
Merged
Original file line number Diff line number Diff line change
@@ -1,92 +1,32 @@
package toolchainclustercache
package toolchaincluster

import (
"context"
"fmt"
"strings"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var logger = logf.Log.WithName("toolchaincluster_healthcheck")

const (
healthzOk = "/healthz responded with ok"
healthzNotOk = "/healthz responded without ok"
clusterNotReachableMsg = "cluster is not reachable"
clusterReachableMsg = "cluster is reachable"
)

func StartHealthChecks(ctx context.Context, mgr manager.Manager, namespace string, period time.Duration) {
logger.Info("starting health checks", "period", period)
go wait.Until(func() {
updateClusterStatuses(ctx, namespace, mgr.GetClient())
}, period, ctx.Done())
}

type HealthChecker struct {
localClusterClient client.Client
remoteClusterClient client.Client
remoteClusterClientset *kubeclientset.Clientset
logger logr.Logger
}

// updateClusterStatuses checks cluster health and updates status of all ToolchainClusters
func updateClusterStatuses(ctx context.Context, namespace string, cl client.Client) {
clusters := &toolchainv1alpha1.ToolchainClusterList{}
err := cl.List(ctx, clusters, client.InNamespace(namespace))
if err != nil {
logger.Error(err, "unable to list existing ToolchainClusters")
return
}
if len(clusters.Items) == 0 {
logger.Info("no ToolchainCluster found")
}

for _, obj := range clusters.Items {
clusterObj := obj.DeepCopy()
clusterLogger := logger.WithValues("cluster-name", clusterObj.Name)

cachedCluster, ok := cluster.GetCachedToolchainCluster(clusterObj.Name)
if !ok {
clusterLogger.Error(fmt.Errorf("cluster %s not found in cache", clusterObj.Name), "failed to retrieve stored data for cluster")
clusterObj.Status.Conditions = []toolchainv1alpha1.ToolchainClusterCondition{clusterOfflineCondition()}
if err := cl.Status().Update(ctx, clusterObj); err != nil {
clusterLogger.Error(err, "failed to update the status of ToolchainCluster")
}
continue
}

clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
if err != nil {
clusterLogger.Error(err, "cannot create ClientSet for a ToolchainCluster")
continue
}

healthChecker := &HealthChecker{
localClusterClient: cl,
remoteClusterClient: cachedCluster.Client,
remoteClusterClientset: clientSet,
logger: clusterLogger,
}
// clusterLogger.Info("getting the current state of ToolchainCluster")
if err := healthChecker.updateIndividualClusterStatus(ctx, clusterObj); err != nil {
clusterLogger.Error(err, "unable to update cluster status of ToolchainCluster")
}
}
}

func (hc *HealthChecker) updateIndividualClusterStatus(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster) error {

currentClusterStatus := hc.getClusterHealthStatus(ctx)
Expand Down
177 changes: 177 additions & 0 deletions controllers/toolchaincluster/healthchecker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package toolchaincluster

import (
"context"
"testing"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"github.com/codeready-toolchain/toolchain-common/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"
corev1 "k8s.io/api/core/v1"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var logger = logf.Log.WithName("toolchaincluster_healthcheck")

func TestClusterHealthChecks(t *testing.T) {

// given
defer gock.Off()
tcNs := "test-namespace"
gock.New("http://cluster.com").
Get("healthz").
Persist().
Reply(200).
BodyString("ok")
gock.New("http://unstable.com").
Get("healthz").
Persist().
Reply(200).
BodyString("unstable")
gock.New("http://not-found.com").
Get("healthz").
Persist().
Reply(404)

tests := map[string]struct {
tctype string
apiendpoint string
clusterconditions []toolchainv1alpha1.ToolchainClusterCondition
status toolchainv1alpha1.ToolchainClusterStatus
}{
Comment on lines +41 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this way you improved the unit test 👍 🥇

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

//ToolchainCluster.status doesn't contain any conditions
"UnstableNoCondition": {
tctype: "unstable",
apiendpoint: "http://unstable.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{unhealthy(), notOffline()},
status: toolchainv1alpha1.ToolchainClusterStatus{},
},
"StableNoCondition": {
tctype: "stable",
apiendpoint: "http://cluster.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
status: toolchainv1alpha1.ToolchainClusterStatus{},
},
"NotFoundNoCondition": {
tctype: "not-found",
apiendpoint: "http://not-found.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
status: toolchainv1alpha1.ToolchainClusterStatus{},
},
//ToolchainCluster.status already contains conditions
"UnstableContainsCondition": {
tctype: "unstable",
apiendpoint: "http://unstable.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{unhealthy(), notOffline()},
status: withStatus(healthy()),
},
"StableContainsCondition": {
tctype: "stable",
apiendpoint: "http://cluster.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
status: withStatus(offline()),
},
"NotFoundContainsCondition": {
tctype: "not-found",
apiendpoint: "http://not-found.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
status: withStatus(healthy()),
},
//if the connection cannot be established at beginning, then it should be offline
"OfflineConnectionNotEstablished": {
tctype: "failing",
apiendpoint: "http://failing.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{offline()},
status: toolchainv1alpha1.ToolchainClusterStatus{},
},
//if no zones nor region is retrieved, then keep the current
"NoZoneKeepCurrent": {
tctype: "stable",
apiendpoint: "http://cluster.com",
clusterconditions: []toolchainv1alpha1.ToolchainClusterCondition{healthy()},
status: withStatus(offline()),
},
}
for k, tc := range tests {
t.Run(k, func(t *testing.T) {
tctype, sec := newToolchainCluster(tc.tctype, tcNs, tc.apiendpoint, tc.status)
cl := test.NewFakeClient(t, tctype, sec)
reset := setupCachedClusters(t, cl, tctype)
defer reset()
cachedtc, found := cluster.GetCachedToolchainCluster(tctype.Name)
require.True(t, found)
cacheclient, err := kubeclientset.NewForConfig(cachedtc.RestConfig)
require.NoError(t, err)
healthChecker := &HealthChecker{
localClusterClient: cl,
remoteClusterClient: cachedtc.Client,
remoteClusterClientset: cacheclient,
logger: logger,
}
// when
err = healthChecker.updateIndividualClusterStatus(context.TODO(), tctype)

//then
require.NoError(t, err)
assertClusterStatus(t, cl, tc.tctype, tc.clusterconditions...)
})
}
}

func withStatus(conditions ...toolchainv1alpha1.ToolchainClusterCondition) toolchainv1alpha1.ToolchainClusterStatus {
return toolchainv1alpha1.ToolchainClusterStatus{
Conditions: conditions,
}
}
func assertClusterStatus(t *testing.T, cl client.Client, clusterName string, clusterConds ...toolchainv1alpha1.ToolchainClusterCondition) {
tc := &toolchainv1alpha1.ToolchainCluster{}
err := cl.Get(context.TODO(), test.NamespacedName("test-namespace", clusterName), tc)
require.NoError(t, err)
assert.Len(t, tc.Status.Conditions, len(clusterConds))
ExpConditions:
for _, expCond := range clusterConds {
for _, cond := range tc.Status.Conditions {
if expCond.Type == cond.Type {
assert.Equal(t, expCond.Status, cond.Status)
assert.Equal(t, expCond.Reason, cond.Reason)
assert.Equal(t, expCond.Message, cond.Message)
continue ExpConditions
}
}
assert.Failf(t, "condition not found", "the list of conditions %v doesn't contain the expected condition %v", tc.Status.Conditions, expCond)
}
}
func healthy() toolchainv1alpha1.ToolchainClusterCondition {
return toolchainv1alpha1.ToolchainClusterCondition{
Type: toolchainv1alpha1.ToolchainClusterReady,
Status: corev1.ConditionTrue,
Reason: "ClusterReady",
Message: "/healthz responded with ok",
}
}
func unhealthy() toolchainv1alpha1.ToolchainClusterCondition {
return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterReady,
Status: corev1.ConditionFalse,
Reason: "ClusterNotReady",
Message: "/healthz responded without ok",
}
}
func offline() toolchainv1alpha1.ToolchainClusterCondition {
return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterOffline,
Status: corev1.ConditionTrue,
Reason: "ClusterNotReachable",
Message: "cluster is not reachable",
}
}
func notOffline() toolchainv1alpha1.ToolchainClusterCondition {
return toolchainv1alpha1.ToolchainClusterCondition{Type: toolchainv1alpha1.ToolchainClusterOffline,
Status: corev1.ConditionFalse,
Reason: "ClusterReachable",
Message: "cluster is reachable",
}
}
82 changes: 82 additions & 0 deletions controllers/toolchaincluster/toolchaincluster_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package toolchaincluster

import (
"context"
"fmt"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Reconciler reconciles a ToolchainCluster object
type Reconciler struct {
Client client.Client
Scheme *runtime.Scheme
RequeAfter time.Duration
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&toolchainv1alpha1.ToolchainCluster{}).
Complete(r)

Check warning on line 30 in controllers/toolchaincluster/toolchaincluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/toolchaincluster/toolchaincluster_controller.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}

// Reconcile reads that state of the cluster for a ToolchainCluster object and makes changes based on the state read
// and what is in the ToolchainCluster.Spec. It updates the status of the individual cluster
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling ToolchainCluster")

// Fetch the ToolchainCluster instance
toolchainCluster := &toolchainv1alpha1.ToolchainCluster{}
err := r.Client.Get(ctx, request.NamespacedName, toolchainCluster)
if err != nil {
if errors.IsNotFound(err) {
// Stop monitoring the toolchain cluster as it is deleted
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}

cachedCluster, ok := cluster.GetCachedToolchainCluster(toolchainCluster.Name)
if !ok {
err := fmt.Errorf("cluster %s not found in cache", toolchainCluster.Name)
toolchainCluster.Status.Conditions = []toolchainv1alpha1.ToolchainClusterCondition{clusterOfflineCondition()}
if err := r.Client.Status().Update(ctx, toolchainCluster); err != nil {
reqLogger.Error(err, "failed to update the status of ToolchainCluster")

Check warning on line 59 in controllers/toolchaincluster/toolchaincluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/toolchaincluster/toolchaincluster_controller.go#L59

Added line #L59 was not covered by tests
}
return reconcile.Result{}, err
}

clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
if err != nil {
reqLogger.Error(err, "cannot create ClientSet for the ToolchainCluster")
return reconcile.Result{}, err

Check warning on line 67 in controllers/toolchaincluster/toolchaincluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/toolchaincluster/toolchaincluster_controller.go#L66-L67

Added lines #L66 - L67 were not covered by tests
fbm3307 marked this conversation as resolved.
Show resolved Hide resolved
}
healthChecker := &HealthChecker{
localClusterClient: r.Client,
remoteClusterClient: cachedCluster.Client,
remoteClusterClientset: clientSet,
logger: reqLogger,
}
//update the status of the individual cluster.
if err := healthChecker.updateIndividualClusterStatus(ctx, toolchainCluster); err != nil {
reqLogger.Error(err, "unable to update cluster status of ToolchainCluster")
return reconcile.Result{}, err

Check warning on line 78 in controllers/toolchaincluster/toolchaincluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/toolchaincluster/toolchaincluster_controller.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}

return reconcile.Result{RequeueAfter: r.RequeAfter}, nil
filariow marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading