Skip to content

Commit

Permalink
Use service IP as IP of observer to ensure more robust recovery (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI authored Mar 5, 2024
1 parent 41bafb6 commit 1e8af13
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 49 deletions.
12 changes: 10 additions & 2 deletions api/types/observer_replica_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ See the Mulan PSL v2 for more details.
package types

type OBServerReplicaStatus struct {
Server string `json:"server"`
Status string `json:"status"`
Server string `json:"server"` // PodIP
Status string `json:"status"`
ServiceIP string `json:"serviceIP,omitempty"`
}

func (srs OBServerReplicaStatus) GetConnectAddr() string {
if srs.ServiceIP != "" {
return srs.ServiceIP
}
return srs.Server
}
14 changes: 11 additions & 3 deletions api/v1alpha1/observer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package v1alpha1

import (
apitypes "github.com/oceanbase/ob-operator/api/types"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apitypes "github.com/oceanbase/ob-operator/api/types"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -55,6 +55,7 @@ type OBServerStatus struct {
PodPhase corev1.PodPhase `json:"podPhase"`
Ready bool `json:"ready"`
PodIp string `json:"podIp"`
ServiceIp string `json:"serviceIp,omitempty"`
NodeIp string `json:"nodeIp"`
OBStatus string `json:"obStatus,omitempty"`
StartServiceTime int64 `json:"startServiceTime,omitempty"`
Expand Down Expand Up @@ -91,3 +92,10 @@ type OBServerList struct {
func init() {
SchemeBuilder.Register(&OBServer{}, &OBServerList{})
}

func (ss OBServerStatus) GetConnectAddr() string {
if ss.ServiceIp != "" {
return ss.ServiceIp
}
return ss.PodIp
}
4 changes: 4 additions & 0 deletions charts/ob-operator/templates/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5731,6 +5731,8 @@ spec:
type: string
ready:
type: boolean
serviceIp:
type: string
startServiceTime:
format: int64
type: integer
Expand Down Expand Up @@ -11882,6 +11884,8 @@ spec:
properties:
server:
type: string
serviceIP:
type: string
status:
type: string
required:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/oceanbase.oceanbase.com_observers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2693,6 +2693,8 @@ spec:
type: string
ready:
type: boolean
serviceIp:
type: string
startServiceTime:
format: int64
type: integer
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/oceanbase.oceanbase.com_obzones.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2685,6 +2685,8 @@ spec:
properties:
server:
type: string
serviceIP:
type: string
status:
type: string
required:
Expand Down
4 changes: 4 additions & 0 deletions deploy/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5744,6 +5744,8 @@ spec:
type: string
ready:
type: boolean
serviceIp:
type: string
startServiceTime:
format: int64
type: integer
Expand Down Expand Up @@ -11895,6 +11897,8 @@ spec:
properties:
server:
type: string
serviceIP:
type: string
status:
type: string
required:
Expand Down
5 changes: 4 additions & 1 deletion distribution/oceanbase/oceanbase-helper/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ func startOBServerWithParam() error {
return errors.Wrap(err, "Failed to parse current version")
}
var cmd string
svcIP := os.Getenv("SVC_IP")
if standalone != "" && obv.Cmp(MinStandaloneVersion) >= 0 {
cmd = fmt.Sprintf("cd %s && %s/bin/observer --nodaemon --appname %s --cluster_id %s --zone %s --devname lo -p %d -P %d -d %s/store -l info -o config_additional_dir=%s/store/etc,%s", DefaultHomePath, DefaultHomePath, clusterName, clusterId, zoneName, DefaultSqlPort, DefaultRpcPort, DefaultHomePath, DefaultHomePath, optStr)
} else if svcIP != "" {
cmd = fmt.Sprintf("cd %s && %s/bin/observer --nodaemon --appname %s --cluster_id %s --zone %s -I %s -p %d -P %d -d %s/store -l info -o config_additional_dir=%s/store/etc,%s", DefaultHomePath, DefaultHomePath, clusterName, clusterId, zoneName, svcIP, DefaultSqlPort, DefaultRpcPort, DefaultHomePath, DefaultHomePath, optStr)
} else {
cmd = fmt.Sprintf("cd %s && %s/bin/observer --nodaemon --appname %s --cluster_id %s --zone %s --devname %s -p %d -P %d -d %s/store -l info -o config_additional_dir=%s/store/etc,%s", DefaultHomePath, DefaultHomePath, clusterName, clusterId, zoneName, DefaultDevName, DefaultSqlPort, DefaultRpcPort, DefaultHomePath, DefaultHomePath, optStr)
cmd = fmt.Sprintf("cd %s && %s/bin/observer --nodaemon --appname %s --cluster_id %s --zone %s -i %s -p %d -P %d -d %s/store -l info -o config_additional_dir=%s/store/etc,%s", DefaultHomePath, DefaultHomePath, clusterName, clusterId, zoneName, DefaultDevName, DefaultSqlPort, DefaultRpcPort, DefaultHomePath, DefaultHomePath, optStr)
}
fmt.Println("Start commands: ", cmd)
return exec.Command("bash", "-c", cmd).Run()
Expand Down
19 changes: 10 additions & 9 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ import (
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

apitypes "github.com/oceanbase/ob-operator/api/types"
v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
obagentconst "github.com/oceanbase/ob-operator/internal/const/obagent"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
zonestatus "github.com/oceanbase/ob-operator/internal/const/status/obzone"

apitypes "github.com/oceanbase/ob-operator/api/types"
v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/model"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
Expand Down Expand Up @@ -308,10 +307,11 @@ func (m *OBClusterManager) Bootstrap() tasktypes.TaskError {
} else {
connectAddress := manager.Connector.DataSource().GetAddress()
for _, zone := range obzoneList.Items {
serverIp := zone.Status.OBServerStatus[0].Server
serverIp := zone.Status.OBServerStatus[0].GetConnectAddr()
// Notes: If the addr of the db connector is in this obzone, use it as the bootstrap server instead of the first one
for _, serverInfo := range zone.Status.OBServerStatus {
if serverInfo.Server == connectAddress {
serverIp = connectAddress
if serverInfo.Server == connectAddress || serverInfo.ServiceIP == connectAddress {
serverIp = serverInfo.GetConnectAddr()
}
}
serverInfo := &model.ServerInfo{
Expand Down Expand Up @@ -1007,9 +1007,10 @@ func (m *OBClusterManager) CheckImageReady() tasktypes.TaskError {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "helper-check-image-pull-ready",
Image: m.OBCluster.Spec.OBServerTemplate.Image,
Command: []string{"bash", "-c", "/home/admin/oceanbase/bin/oceanbase-helper help"},
Name: "helper-check-image-pull-ready",
ImagePullPolicy: corev1.PullIfNotPresent,
Image: m.OBCluster.Spec.OBServerTemplate.Image,
Command: []string{"bash", "-c", "/home/admin/oceanbase/bin/oceanbase-helper help"},
}},
RestartPolicy: corev1.RestartPolicyNever,
},
Expand Down
1 change: 1 addition & 0 deletions internal/resource/observer/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
// observer tasks
const (
tWaitOBClusterBootstrapped ttypes.TaskName = "wait obcluster bootstrapped"
tCreateOBServerSvc ttypes.TaskName = "create observer svc"
tCreateOBPVC ttypes.TaskName = "create observer pvc"
tCreateOBPod ttypes.TaskName = "create observer pod"
tAnnotateOBServerPod ttypes.TaskName = "annotate observer pod"
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/observer/observer_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func PrepareOBServerForBootstrap() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fPrepareOBServerForBootstrap,
Tasks: []tasktypes.TaskName{tCreateOBPVC, tCreateOBPod, tWaitOBServerReady},
Tasks: []tasktypes.TaskName{tCreateOBServerSvc, tCreateOBPVC, tCreateOBPod, tWaitOBServerReady},
TargetStatus: serverstatus.BootstrapReady,
},
}
Expand Down
46 changes: 33 additions & 13 deletions internal/resource/observer/observer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ import (
"context"
"strings"

"github.com/oceanbase/ob-operator/internal/telemetry"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/model"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -29,17 +26,17 @@ import (
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"sigs.k8s.io/controller-runtime/pkg/client"

oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"

"github.com/go-logr/logr"
"github.com/pkg/errors"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
serverstatus "github.com/oceanbase/ob-operator/internal/const/status/observer"
resourceutils "github.com/oceanbase/ob-operator/internal/resource/utils"
"github.com/oceanbase/ob-operator/internal/telemetry"
opresource "github.com/oceanbase/ob-operator/pkg/coordinator"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/model"
"github.com/oceanbase/ob-operator/pkg/task"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/status"
"github.com/oceanbase/ob-operator/pkg/task/const/strategy"
tasktypes "github.com/oceanbase/ob-operator/pkg/task/types"
)

Expand All @@ -54,6 +51,8 @@ type OBServerManager struct {

func (m *OBServerManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tCreateOBServerSvc:
return m.CreateOBServerSvc, nil
case tCreateOBPVC:
return m.CreateOBPVC, nil
case tCreateOBPod:
Expand Down Expand Up @@ -115,7 +114,7 @@ func (m *OBServerManager) SupportStaticIp() bool {
case oceanbaseconst.CNICalico:
return true
default:
return false
return m.OBServer.Status.ServiceIp != ""
}
}

Expand All @@ -126,7 +125,7 @@ func (m *OBServerManager) getCurrentOBServerFromOB() (*model.OBServer, error) {
return nil, err
}
observerInfo := &model.ServerInfo{
Ip: m.OBServer.Status.PodIp,
Ip: m.OBServer.Status.GetConnectAddr(),
Port: oceanbaseconst.RpcPort,
}
mode, modeExist := resourceutils.GetAnnotationField(m.OBServer, oceanbaseconst.AnnotationsMode)
Expand Down Expand Up @@ -169,7 +168,6 @@ func (m *OBServerManager) UpdateStatus() error {
} else if m.OBServer.Status.Status == "Failed" {
return nil
} else {
// get Pod status and update
pod, err := m.getPod()
if err != nil {
if kubeerrors.IsNotFound(err) {
Expand All @@ -188,6 +186,19 @@ func (m *OBServerManager) UpdateStatus() error {
m.OBServer.Status.NodeIp = pod.Status.HostIP
// TODO update from obcluster
m.OBServer.Status.CNI = resourceutils.GetCNIFromAnnotation(pod)

if m.OBServer.Status.ServiceIp == "" {
mode, modeAnnoExist := resourceutils.GetAnnotationField(m.OBServer, oceanbaseconst.AnnotationsMode)
if modeAnnoExist && mode == oceanbaseconst.ModeService {
svc := &corev1.Service{}
err := m.Client.Get(m.Ctx, m.generateNamespacedName(m.OBServer.Name), svc)
if err != nil {
m.Logger.V(oceanbaseconst.LogLevelDebug).Info("get svc failed")
} else {
m.OBServer.Status.ServiceIp = svc.Spec.ClusterIP
}
}
}
}
pvcs, err := m.getPVCs()
if err != nil {
Expand Down Expand Up @@ -400,6 +411,15 @@ func (m *OBServerManager) getPod() (*corev1.Pod, error) {
return pod, nil
}

func (m *OBServerManager) getSvc() (*corev1.Service, error) {
svc := &corev1.Service{}
err := m.Client.Get(m.Ctx, m.generateNamespacedName(m.OBServer.Name), svc)
if err != nil {
return nil, errors.Wrap(err, "get svc")
}
return svc, nil
}

func (m *OBServerManager) getOBCluster() (*v1alpha1.OBCluster, error) {
// this label always exists
clusterName, _ := m.OBServer.Labels[oceanbaseconst.LabelRefOBCluster]
Expand Down
Loading

0 comments on commit 1e8af13

Please sign in to comment.