Skip to content

Commit

Permalink
feat: update k8s discovery. (#171)
Browse files Browse the repository at this point in the history
* build: rename k.go

* fix: add error handle.

* feat: implement kubernetes Discovery.

* fix: revert getConn.

* update k8s discovery.
  • Loading branch information
mo3et authored Dec 7, 2024
1 parent e4a3bae commit 1fe4211
Showing 1 changed file with 116 additions and 59 deletions.
175 changes: 116 additions & 59 deletions discovery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
"context"
"fmt"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type KubernetesConnManager struct {
Expand All @@ -49,74 +39,94 @@ func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*Ku
return nil, fmt.Errorf("failed to create clientset: %v", err)
}

return &KubernetesConnManager{
k := &KubernetesConnManager{
clientset: clientset,
namespace: namespace,
dialOptions: options,
connMap: make(map[string][]*grpc.ClientConn),
}, nil
}
}

func (k *KubernetesConnManager) initializeConnMap() error {
k.mu.Lock()
defer k.mu.Unlock()
go k.watchEndpoints()

services, err := k.clientset.CoreV1().Services(k.namespace).List(context.Background(), metav1.ListOptions{})
return k, nil
}

func (k *KubernetesConnManager) initializeConns(serviceName string) error {
port, err := k.getServicePort(serviceName)
if err != nil {
return fmt.Errorf("failed to list services: %v", err)
return err
}

for _, service := range services.Items {
endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(context.Background(), service.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get endpoints for service %s: %v", service.Name, err)
}
endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err)
}

var conns []*grpc.ClientConn
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
conn, err := grpc.Dial(address.IP, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return fmt.Errorf("failed to dial endpoint %s: %v", address.IP, err)
}
conns = append(conns, conn)
var conns []*grpc.ClientConn
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
target := fmt.Sprintf("%s:%d", address.IP, port)
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to dial endpoint %s: %v", target, err)
}
conns = append(conns, conn)
}
k.connMap[service.Name] = conns
}

k.mu.Lock()
defer k.mu.Unlock()
k.connMap[serviceName] = conns

// go k.watchEndpoints(serviceName)

return nil
}

// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
// k.mu.RLock()
// defer k.mu.RUnlock()
// if len(k.connMap) == 0 {
// if err := k.initializeConnMap(); err != nil {
// return nil, err
// }
// }

// return k.connMap[serviceName], nil
return nil, nil
k.mu.RLock()
conns, exists := k.connMap[serviceName]
defer k.mu.RUnlock()

if exists {
return conns, nil
}

k.mu.Lock()
defer k.mu.Unlock()

// Check if another goroutine has already initialized the connections when we released the read lock
conns, exists = k.connMap[serviceName]
if exists {
return conns, nil
}

if err := k.initializeConns(serviceName); err != nil {
return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err)
}

return k.connMap[serviceName], nil
}

// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// k.mu.RLock()
// if len(k.connMap) == 0 {
// k.mu.RUnlock()
// if err := k.initializeConnMap(); err != nil {
// return nil, err
// }
// k.mu.RLock()
// }

// defer k.mu.RUnlock()

// return k.connMap[serviceName][0], nil
return grpc.DialContext(ctx, serviceName, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
port, err := k.getServicePort(serviceName)
if err != nil {
return nil, err
}

fmt.Println("SVC port:", port)

target := fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, port)

fmt.Println("SVC target:", target)

return grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, k.dialOptions...)...,
)
}

// GetSelfConnTarget returns the connection target for the current service.
Expand Down Expand Up @@ -158,3 +168,50 @@ func (k *KubernetesConnManager) UnRegister() error {
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}

func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
fmt.Print("namespace:", k.namespace)
return 0, fmt.Errorf("failed to get service %s: %v", serviceName, err)
}

if len(svc.Spec.Ports) == 0 {
return 0, fmt.Errorf("service %s has no ports defined", serviceName)
}

return svc.Spec.Ports[0].Port, nil
}

// watchEndpoints listens for changes in Pod resources.
func (k *KubernetesConnManager) watchEndpoints() {
informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute*10)
informer := informerFactory.Core().V1().Pods().Informer()

// Watch for Pod changes (add, update, delete)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
k.handleEndpointChange(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
k.handleEndpointChange(newObj)
},
DeleteFunc: func(obj interface{}) {
k.handleEndpointChange(obj)
},
})

informerFactory.Start(context.Background().Done())
<-context.Background().Done() // Block forever
}

func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
endpoint, ok := obj.(*v1.Endpoints)
if !ok {
return
}
serviceName := endpoint.Name
if err := k.initializeConns(serviceName); err != nil {
fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
}
}

0 comments on commit 1fe4211

Please sign in to comment.