KUBESAW-12: Convert the health-check goroutine into ToolchainCluster controller

Signed-off-by: Feny Mehta <[email protected]>
fbm3307 committed Apr 10, 2024
1 parent 6b578e1 commit 4b00df6
Showing 3 changed files with 154 additions and 79 deletions.
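
In short, the commit replaces the periodic `wait.Until` health-check goroutine (started via `StartHealthChecks`) with a controller-runtime Reconciler that watches ToolchainCluster objects and re-queues itself. Below is a minimal wiring sketch of how an operator's setup code might switch over; it is not part of this diff, and the module import path, the `setupHealthChecks` helper name, and the interval values are assumptions for illustration.

package setup

import (
	"time"

	"github.com/codeready-toolchain/toolchain-common/controllers/toolchainclusterhealth" // assumed import path
	ctrl "sigs.k8s.io/controller-runtime"
)

// setupHealthChecks is an illustrative wiring sketch, not code from this commit.
func setupHealthChecks(mgr ctrl.Manager, namespace string) error {
	// Before this commit, the operator started a fire-and-forget goroutine, e.g.:
	//   toolchainclustercache.StartHealthChecks(ctx, mgr, namespace, 10*time.Second)

	// After this commit, the health check is a regular controller: each
	// ToolchainCluster is reconciled on its own and re-queued periodically.
	cacheTimeout := 3 * time.Second     // timeout for the ToolchainCluster cache service
	requeueInterval := 10 * time.Second // how often each cluster is re-checked
	return toolchainclusterhealth.NewReconciler(mgr, namespace, cacheTimeout, requeueInterval).SetupWithManager(mgr)
}
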
@@ -1,92 +1,32 @@
package toolchainclustercache
package toolchainclusterhealth

import (
"context"
"fmt"
"strings"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var logger = logf.Log.WithName("toolchaincluster_healthcheck")

const (
healthzOk = "/healthz responded with ok"
healthzNotOk = "/healthz responded without ok"
clusterNotReachableMsg = "cluster is not reachable"
clusterReachableMsg = "cluster is reachable"
)

func StartHealthChecks(ctx context.Context, mgr manager.Manager, namespace string, period time.Duration) {
logger.Info("starting health checks", "period", period)
go wait.Until(func() {
updateClusterStatuses(ctx, namespace, mgr.GetClient())
}, period, ctx.Done())
}

type HealthChecker struct {
localClusterClient client.Client
remoteClusterClient client.Client
remoteClusterClientset *kubeclientset.Clientset
logger logr.Logger
}

// updateClusterStatuses checks cluster health and updates status of all ToolchainClusters
func updateClusterStatuses(ctx context.Context, namespace string, cl client.Client) {
clusters := &toolchainv1alpha1.ToolchainClusterList{}
err := cl.List(ctx, clusters, client.InNamespace(namespace))
if err != nil {
logger.Error(err, "unable to list existing ToolchainClusters")
return
}
if len(clusters.Items) == 0 {
logger.Info("no ToolchainCluster found")
}

for _, obj := range clusters.Items {
clusterObj := obj.DeepCopy()
clusterLogger := logger.WithValues("cluster-name", clusterObj.Name)

cachedCluster, ok := cluster.GetCachedToolchainCluster(clusterObj.Name)
if !ok {
clusterLogger.Error(fmt.Errorf("cluster %s not found in cache", clusterObj.Name), "failed to retrieve stored data for cluster")
clusterObj.Status.Conditions = []toolchainv1alpha1.ToolchainClusterCondition{clusterOfflineCondition()}
if err := cl.Status().Update(ctx, clusterObj); err != nil {
clusterLogger.Error(err, "failed to update the status of ToolchainCluster")
}
continue
}

clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
if err != nil {
clusterLogger.Error(err, "cannot create ClientSet for a ToolchainCluster")
continue
}

healthChecker := &HealthChecker{
localClusterClient: cl,
remoteClusterClient: cachedCluster.Client,
remoteClusterClientset: clientSet,
logger: clusterLogger,
}
// clusterLogger.Info("getting the current state of ToolchainCluster")
if err := healthChecker.updateIndividualClusterStatus(ctx, clusterObj); err != nil {
clusterLogger.Error(err, "unable to update cluster status of ToolchainCluster")
}
}
}

func (hc *HealthChecker) updateIndividualClusterStatus(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster) error {

currentClusterStatus := hc.getClusterHealthStatus(ctx)
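
The diff of this file is truncated above; the body of `getClusterHealthStatus` is not shown. Based on the `healthzOk` and `healthzNotOk` messages in the const block, the check amounts to probing the remote cluster's `/healthz` endpoint through the clientset. The following is a hedged sketch of such a probe; the function name and structure are illustrative only, not the code from this file.

package toolchainclusterhealth

import (
	"context"
	"strings"

	kubeclientset "k8s.io/client-go/kubernetes"
)

// probeHealthz is an illustrative sketch, not the actual implementation: it asks
// the remote cluster's /healthz endpoint and reports whether it answered "ok".
func probeHealthz(ctx context.Context, cs *kubeclientset.Clientset) (bool, error) {
	body, err := cs.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).Raw()
	if err != nil {
		return false, err // cluster not reachable
	}
	return strings.EqualFold(string(body), "ok"), nil
}
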
controllers/toolchainclusterhealth/toolchaincluster_healthcheck_controller.go
@@ -0,0 +1,95 @@
package toolchainclusterhealth

import (
"context"
"fmt"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// NewReconciler returns a new Reconciler
func NewReconciler(mgr manager.Manager, namespace string, timeout time.Duration, requeAfter time.Duration) *Reconciler {
cacheLog := log.Log.WithName("toolchaincluster_health")
clusterCacheService := cluster.NewToolchainClusterService(mgr.GetClient(), cacheLog, namespace, timeout)
return &Reconciler{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
clusterCacheService: clusterCacheService,
requeAfter: requeAfter,

}
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&toolchainv1alpha1.ToolchainCluster{}).
Complete(r)

}

// Reconciler reconciles a ToolchainCluster object
type Reconciler struct {
client client.Client
scheme *runtime.Scheme
clusterCacheService cluster.ToolchainClusterService
requeAfter time.Duration
}

// Reconcile reads the state of the cluster for a ToolchainCluster object and makes changes based on the state read
// and what is in the ToolchainCluster.Spec. It updates the status of the individual cluster
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx).WithName("health")
reqLogger.Info("Reconciling ToolchainCluster")

// Fetch the ToolchainCluster instance
toolchainCluster := &toolchainv1alpha1.ToolchainCluster{}
err := r.client.Get(ctx, request.NamespacedName, toolchainCluster)
if err != nil {
if errors.IsNotFound(err) {

// Stop monitoring the toolchain cluster as it is deleted
return reconcile.Result{}, nil

}
// Error reading the object - requeue the request.
return reconcile.Result{}, err

}

cachedCluster, ok := cluster.GetCachedToolchainCluster(toolchainCluster.Name)
if !ok {
err := fmt.Errorf("cluster %s not found in cache", toolchainCluster.Name)
reqLogger.Error(err, "failed to retrieve stored data for cluster")
return reconcile.Result{}, err

}

clientSet, err := kubeclientset.NewForConfig(cachedCluster.RestConfig)
if err != nil {
reqLogger.Error(err, "cannot create ClientSet for a ToolchainCluster")
return reconcile.Result{}, err

}

healthChecker := &HealthChecker{
localClusterClient: r.client,
remoteClusterClient: cachedCluster.Client,
remoteClusterClientset: clientSet,
logger: reqLogger,
}

// update the status of the individual cluster.
if err := healthChecker.updateIndividualClusterStatus(ctx, toolchainCluster); err != nil {
reqLogger.Error(err, "unable to update cluster status of ToolchainCluster")
return reconcile.Result{}, err

}

return reconcile.Result{RequeueAfter: r.requeAfter}, nil
}
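
As the doc comment on `Reconcile` notes, scheduling is driven by the returned result: ending with `RequeueAfter: r.requeAfter` re-enqueues the same ToolchainCluster after each successful health check, so the periodic polling previously provided by `wait.Until` now happens per object through the workqueue, while errors are retried with backoff and deleted clusters simply stop being re-queued. The snippet below is an illustrative summary of that contract, assuming the 10-second interval used in the tests that follow; it is not part of this commit.

package toolchainclusterhealth

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// requeuePolicy restates the scheduling contract the controller depends on
// (sketch only, not code from this commit):
//   - (Result{}, nil)            stop: the ToolchainCluster was deleted, nothing to re-check
//   - (Result{}, err)            retry: controller-runtime re-queues with exponential backoff
//   - (Result{RequeueAfter: d})  periodic: re-run the health check after d, replacing wait.Until
func requeuePolicy(deleted bool, err error, interval time.Duration) (ctrl.Result, error) {
	if deleted {
		return ctrl.Result{}, nil
	}
	if err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{RequeueAfter: interval}, nil
}
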
controllers/toolchainclusterhealth/toolchaincluster_healthcheck_controller_test.go
@@ -1,8 +1,9 @@
package toolchainclustercache
package toolchainclusterhealth

import (
"context"
"testing"
"time"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
@@ -11,9 +12,11 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestClusterHealthChecks(t *testing.T) {
@@ -37,61 +40,70 @@ func TestClusterHealthChecks(t *testing.T) {

t.Run("ToolchainCluster.status doesn't contain any conditions", func(t *testing.T) {
unstable, sec := newToolchainCluster("unstable", tcNs, "http://unstable.com", toolchainv1alpha1.ToolchainClusterStatus{})
notFound, _ := newToolchainCluster("not-found", tcNs, "http://not-found.com", toolchainv1alpha1.ToolchainClusterStatus{})
stable, _ := newToolchainCluster("stable", tcNs, "http://cluster.com", toolchainv1alpha1.ToolchainClusterStatus{})

cl := test.NewFakeClient(t, unstable, notFound, stable, sec)
reset := setupCachedClusters(t, cl, unstable, notFound, stable)
cl := test.NewFakeClient(t, unstable, sec)
reset := setupCachedClusters(t, cl, unstable)
defer reset()
service := newToolchainClusterService(t, cl, false)
// given
controller, req := prepareReconcile(unstable, cl, service, 10*time.Second)

// when
updateClusterStatuses(context.TODO(), "test-namespace", cl)
_, err := controller.Reconcile(context.TODO(), req)

// then
require.NoError(t, err)
assertClusterStatus(t, cl, "unstable", notOffline(), unhealthy())
assertClusterStatus(t, cl, "not-found", offline())
assertClusterStatus(t, cl, "stable", healthy())

})

t.Run("ToolchainCluster.status already contains conditions", func(t *testing.T) {
unstable, sec := newToolchainCluster("unstable", tcNs, "http://unstable.com", withStatus(healthy()))
notFound, _ := newToolchainCluster("not-found", tcNs, "http://not-found.com", withStatus(notOffline(), unhealthy()))
stable, _ := newToolchainCluster("stable", tcNs, "http://cluster.com", withStatus(offline()))
cl := test.NewFakeClient(t, unstable, notFound, stable, sec)
resetCache := setupCachedClusters(t, cl, unstable, notFound, stable)
cl := test.NewFakeClient(t, unstable, sec)
resetCache := setupCachedClusters(t, cl, unstable)
defer resetCache()
service := newToolchainClusterService(t, cl, false)
// given
controller, req := prepareReconcile(unstable, cl, service, 10*time.Second)

// when
updateClusterStatuses(context.TODO(), "test-namespace", cl)
_, err := controller.Reconcile(context.TODO(), req)

// then
require.NoError(t, err)
assertClusterStatus(t, cl, "unstable", notOffline(), unhealthy())
assertClusterStatus(t, cl, "not-found", offline())
assertClusterStatus(t, cl, "stable", healthy())
})

t.Run("if no zones nor region is retrieved, then keep the current", func(t *testing.T) {
stable, sec := newToolchainCluster("stable", tcNs, "http://cluster.com", withStatus(offline()))
cl := test.NewFakeClient(t, stable, sec)
resetCache := setupCachedClusters(t, cl, stable)
defer resetCache()
service := newToolchainClusterService(t, cl, false)
// given
controller, req := prepareReconcile(stable, cl, service, 10*time.Second)

// when
updateClusterStatuses(context.TODO(), "test-namespace", cl)
_, err := controller.Reconcile(context.TODO(), req)

// then
require.NoError(t, err)
assertClusterStatus(t, cl, "stable", healthy())
})

t.Run("if the connection cannot be established at beginning, then it should be offline", func(t *testing.T) {
stable, sec := newToolchainCluster("failing", tcNs, "http://failing.com", toolchainv1alpha1.ToolchainClusterStatus{})

stable, sec := newToolchainCluster("failing", tcNs, "http://failing.com", toolchainv1alpha1.ToolchainClusterStatus{})
cl := test.NewFakeClient(t, stable, sec)
service := newToolchainClusterService(t, cl, false)
// given
controller, req := prepareReconcile(stable, cl, service, 10*time.Second)

// when
updateClusterStatuses(context.TODO(), "test-namespace", cl)
_, err := controller.Reconcile(context.TODO(), req)

// then
require.NoError(t, err)
assertClusterStatus(t, cl, "failing", offline())
})
}
@@ -127,6 +139,21 @@ func newToolchainCluster(name, tcNs string, apiEndpoint string, status toolchain
return toolchainCluster, secret
}

func newToolchainClusterService(t *testing.T, cl client.Client, withCA bool) cluster.ToolchainClusterService {

GolangCI Lint failure on line 142: `newToolchainClusterService` - `withCA` always receives `false` (unparam)
return cluster.NewToolchainClusterServiceWithClient(cl, logf.Log, "test-namespace", 3*time.Second, func(config *rest.Config, options client.Options) (client.Client, error) {
if withCA {
assert.False(t, config.Insecure)
assert.Equal(t, []byte("dummy"), config.CAData)
} else {
assert.True(t, config.Insecure)
}
// make sure that insecure is false to make Gock mocking working properly
config.Insecure = false
// reset the dummy certificate
config.CAData = []byte("")
return client.New(config, options)
})
}
func assertClusterStatus(t *testing.T, cl client.Client, clusterName string, clusterConds ...toolchainv1alpha1.ToolchainClusterCondition) {
tc := &toolchainv1alpha1.ToolchainCluster{}
err := cl.Get(context.TODO(), test.NamespacedName("test-namespace", clusterName), tc)
@@ -146,6 +173,19 @@ ExpConditions:
}
}

func prepareReconcile(toolchainCluster *toolchainv1alpha1.ToolchainCluster, cl *test.FakeClient, service cluster.ToolchainClusterService, requeAfter time.Duration) (Reconciler, reconcile.Request) {

GolangCI Lint failure on line 176: `prepareReconcile` - `requeAfter` always receives `10 * time.Second (10000000000)` (unparam)
controller := Reconciler{
client: cl,
scheme: scheme.Scheme,
clusterCacheService: service,
requeAfter: requeAfter,
}
req := reconcile.Request{
NamespacedName: test.NamespacedName(toolchainCluster.Namespace, toolchainCluster.Name),
}
return controller, req
}

func healthy() toolchainv1alpha1.ToolchainClusterCondition {
return toolchainv1alpha1.ToolchainClusterCondition{
Type: toolchainv1alpha1.ToolchainClusterReady,
