Skip to content

Commit

Permalink
Merge pull request #22 from CentaurusInfra/poc-migrate
Browse files Browse the repository at this point in the history
poc migration
  • Loading branch information
chenqianfzh authored Jul 31, 2021
2 parents f8291a4 + 831db1d commit a126b0d
Show file tree
Hide file tree
Showing 56 changed files with 2,740 additions and 120 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ all: clean
else
all: verify-golang
KUBEEDGE_OUTPUT_SUBPATH=$(OUT_DIR) hack/make-rules/build.sh $(WHAT)
yes | cp -r binaries/kubectl $(OUT_DIR)/bin/
yes | cp build/crds/edgecluster/*.yaml $(OUT_DIR)/bin/
endif


Expand Down
Binary file added binaries/kubectl/arktos/kubectl
Binary file not shown.
Binary file added binaries/kubectl/vanilla/kubectl
Binary file not shown.
2 changes: 2 additions & 0 deletions cloud/cmd/cloudcore/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/dynamiccontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/edgecontroller"
"github.com/kubeedge/kubeedge/cloud/pkg/missionstatepruner"
"github.com/kubeedge/kubeedge/cloud/pkg/router"
"github.com/kubeedge/kubeedge/cloud/pkg/synccontroller"
"github.com/kubeedge/kubeedge/common/constants"
Expand Down Expand Up @@ -126,6 +127,7 @@ func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudstream.Register(c.Modules.CloudStream)
router.Register(c.Modules.Router)
dynamiccontroller.Register(c.Modules.DynamicController)
missionstatepruner.Register(c.Modules.MissionStatePruner)
}

func NegotiateTunnelPort() (*int, error) {
Expand Down
1 change: 1 addition & 0 deletions cloud/pkg/cloudhub/channelq/channelq.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func getListMsgKey(obj interface{}) (string, error) {
func isListResource(msg *beehiveModel.Message) bool {
msgResource := msg.GetResource()
if strings.Contains(msgResource, beehiveModel.ResourceTypePodlist) ||
strings.Contains(msgResource, beehiveModel.ResourceTypeMissionList) ||
strings.Contains(msgResource, "membership") ||
strings.Contains(msgResource, "twin/cloud_updated") {
return true
Expand Down
3 changes: 3 additions & 0 deletions cloud/pkg/common/modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ const (
TunnelPort string = "tunnelport"

TunnelPortRecordAnnotationKey string = "tunnelportrecord.kubeedge.io"

MissionStatePrunerModuleName = "missionstatepruner"
MissionStatePrunerGroupName = "missionstatepruner"
)
4 changes: 2 additions & 2 deletions cloud/pkg/dynamiccontroller/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,10 @@ func (c *Center) Response(app *Application, parentID string, status applicationS
FillBody(app)

if err := c.messageLayer.Response(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
klog.Warningf("Failed to send message, error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
return
}
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
klog.V(4).Infof("Message sent successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}

func (c *Center) GC() {
Expand Down
4 changes: 2 additions & 2 deletions cloud/pkg/dynamiccontroller/application/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (l *SelectorListener) sendObj(event watch.Event, messageLayer messagelayer.
FillBody(event.Object)

if err := messageLayer.Send(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
klog.Warningf("Failed to send message, error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
klog.V(4).Infof("Message sent successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}
}
204 changes: 172 additions & 32 deletions cloud/pkg/edgecontroller/controller/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"reflect"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -14,8 +15,11 @@ import (

beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/beehive/pkg/core/model"
edgeclustersv1 "github.com/kubeedge/kubeedge/cloud/pkg/apis/edgeclusters/v1"
routerv1 "github.com/kubeedge/kubeedge/cloud/pkg/apis/rules/v1"
crdClientset "github.com/kubeedge/kubeedge/cloud/pkg/client/clientset/versioned"
crdinformers "github.com/kubeedge/kubeedge/cloud/pkg/client/informers/externalversions"
crdlister "github.com/kubeedge/kubeedge/cloud/pkg/client/listers/edgeclusters/v1"
"github.com/kubeedge/kubeedge/cloud/pkg/common/client"
"github.com/kubeedge/kubeedge/cloud/pkg/common/informers"
"github.com/kubeedge/kubeedge/cloud/pkg/common/modules"
Expand All @@ -28,6 +32,8 @@ import (
type DownstreamController struct {
kubeClient kubernetes.Interface

crdClient crdClientset.Interface

messageLayer messagelayer.MessageLayer

podManager *manager.PodManager
Expand All @@ -46,11 +52,17 @@ type DownstreamController struct {

ruleEndpointsManager *manager.RuleEndpointManager

missionsManager *manager.MissionManager

edgeClusterManager *manager.EdgeClusterManager

lc *manager.LocationCache

svcLister clientgov1.ServiceLister

podLister clientgov1.PodLister

missionLister crdlister.MissionLister
}

func (dc *DownstreamController) syncPod() {
Expand Down Expand Up @@ -89,11 +101,8 @@ func (dc *DownstreamController) syncPod() {
klog.Warningf("pod event type: %s unsupported", e.Type)
continue
}
if err := dc.messageLayer.Send(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}

dc.SendMessage(msg)
}
}
}
Expand Down Expand Up @@ -139,12 +148,8 @@ func (dc *DownstreamController) syncConfigMap() {
SetResourceVersion(configMap.ResourceVersion).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation).
FillBody(configMap)
err = dc.messageLayer.Send(*msg)
if err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}

dc.SendMessage(msg)
}
}
}
Expand Down Expand Up @@ -192,12 +197,8 @@ func (dc *DownstreamController) syncSecret() {
SetResourceVersion(secret.ResourceVersion).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation).
FillBody(secret)
err = dc.messageLayer.Send(*msg)
if err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}

dc.SendMessage(msg)
}
}
}
Expand Down Expand Up @@ -241,12 +242,7 @@ func (dc *DownstreamController) syncEdgeNodes() {
}
msg := model.NewMessage("").
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
err = dc.messageLayer.Send(*msg)
if err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}
dc.SendMessage(msg)
default:
// unsupported operation, no need to send to any node
klog.Warningf("Node event type: %s unsupported", e.Type)
Expand Down Expand Up @@ -290,11 +286,7 @@ func (dc *DownstreamController) syncRule() {
klog.Warningf("rule event type: %s unsupported", e.Type)
continue
}
if err := dc.messageLayer.Send(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s. Reason: %v", err, msg.GetOperation(), msg.GetResource(), err)
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}
dc.SendMessage(msg)
}
}
}
Expand Down Expand Up @@ -334,10 +326,116 @@ func (dc *DownstreamController) syncRuleEndpoint() {
klog.Warningf("ruleEndpoint event type: %s unsupported", e.Type)
continue
}
if err := dc.messageLayer.Send(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())

dc.SendMessage(msg)
}
}
}

func (dc *DownstreamController) syncMissions() {
var operation string
for {
select {
case <-beehiveContext.Done():
klog.Warning("Stop edgecontroller downstream syncMission loop")
return
case e := <-dc.missionsManager.Events():
klog.V(4).Infof("Get mission events: event type: %s.", e.Type)
mission, ok := e.Object.(*edgeclustersv1.Mission)
if !ok {
klog.Warningf("object type: %T unsupported", mission)
continue
}
klog.V(4).Infof("Get mission events: mission object: %+v.", mission)
switch e.Type {
case watch.Added:
operation = model.InsertOperation
case watch.Modified:
operation = model.UpdateOperation
case watch.Deleted:
operation = model.DeleteOperation
default:
// unsupported operation, no need to send to any node
klog.Warningf("Mission event type: %s unsupported", e.Type)
continue
}

// send to all nodes
dc.lc.EdgeClusters.Range(func(key interface{}, value interface{}) bool {
clusterName, ok := key.(string)
if !ok {
klog.Warning("Failed to assert key to sting")
return true
}
msg := model.NewMessage("")
msg.SetResourceVersion(mission.ResourceVersion)
resource, err := messagelayer.BuildResource(clusterName, "default", model.ResourceTypeMission, mission.Name)
if err != nil {
klog.Warningf("Built message resource failed with error: %v", err)
return true
}
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation)
msg.Content = mission

dc.SendMessage(msg)
return true
})
}
}
}

func (dc *DownstreamController) syncEdgeClusters() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Stop edgecontroller downstream syncEdgeCluster loop")
return
case e := <-dc.edgeClusterManager.Events():
klog.V(4).Infof("Get edgeCluster events: event type: %s.", e.Type)
edgeCluster, ok := e.Object.(*edgeclustersv1.EdgeCluster)
if !ok {
klog.Warningf("object type: %T unsupported", edgeCluster)
continue
}
klog.V(4).Infof("Get edgeCluster events: edgeCluster object: %+v.", edgeCluster)
switch e.Type {
case watch.Added:
fallthrough
case watch.Modified:
missionsInEdge := edgeCluster.Status.ReceivedMissions
missionsInEdgeSet := map[string]bool{}
for _, m := range missionsInEdge {
missionsInEdgeSet[m] = true
}

missionsInCloudSet := map[string]bool{}
missionList, err := dc.missionLister.List(labels.Everything())
if err != nil {
klog.Warningf("Built message resource failed with error: %s", err)
break
}
for _, m := range missionList {
missionsInCloudSet[m.Name] = true
}

if reflect.DeepEqual(missionsInEdgeSet, missionsInCloudSet) {
break
}

msg := model.NewMessage("")
resource, err := messagelayer.BuildResource(edgeCluster.Name, "default", model.ResourceTypeMissionList, "")
msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
msg.Content = missionList

dc.SendMessage(msg)

case watch.Deleted:
dc.lc.DeleteEdgeCluster(edgeCluster.ObjectMeta.Name)

default:
// unsupported operation, no need to send to any node
klog.Warningf("EdgeCluster event type: %s unsupported", e.Type)
continue
}
}
}
Expand All @@ -364,6 +462,12 @@ func (dc *DownstreamController) Start() error {
// ruleendpoint
go dc.syncRuleEndpoint()

// mission
go dc.syncMissions()

// edgecluster
go dc.syncEdgeClusters()

return nil
}

Expand Down Expand Up @@ -396,6 +500,16 @@ func (dc *DownstreamController) initLocating() error {
}
}

edgeclusters, err := dc.crdClient.EdgeclustersV1().EdgeClusters().List(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}

for _, ec := range edgeclusters.Items {
// add logic to get edgecluster status
dc.lc.UpdateEdgeCluster(ec.ObjectMeta.Name, true)
}

return nil
}

Expand Down Expand Up @@ -459,8 +573,23 @@ func NewDownstreamController(k8sInformerFactory k8sinformers.SharedInformerFacto
return nil, err
}

missionsInformer := crdInformerFactory.Edgeclusters().V1().Missions()
missionsManager, err := manager.NewMissionManager(missionsInformer.Informer())
if err != nil {
klog.Warningf("Create missionsManager failed with error: %s", err)
return nil, err
}

edgeClustersInformer := crdInformerFactory.Edgeclusters().V1().EdgeClusters()
edgeClusterManager, err := manager.NewEdgeClusterManager(edgeClustersInformer.Informer())
if err != nil {
klog.Warningf("Create edgeClusterManager failed with error: %s", err)
return nil, err
}

dc := &DownstreamController{
kubeClient: client.GetKubeClient(),
crdClient: client.GetCRDClient(),
podManager: podManager,
configmapManager: configMapManager,
secretManager: secretManager,
Expand All @@ -473,10 +602,21 @@ func NewDownstreamController(k8sInformerFactory k8sinformers.SharedInformerFacto
podLister: podInformer.Lister(),
rulesManager: rulesManager,
ruleEndpointsManager: ruleEndpointsManager,
missionsManager: missionsManager,
edgeClusterManager: edgeClusterManager,
missionLister: missionsInformer.Lister(),
}
if err := dc.initLocating(); err != nil {
return nil, err
}

return dc, nil
}

func (dc *DownstreamController) SendMessage(msg *model.Message) {
if err := dc.messageLayer.Send(*msg); err != nil {
klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
} else {
klog.V(4).Infof("message sent successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
}
}
Loading

0 comments on commit a126b0d

Please sign in to comment.