Skip to content

Commit

Permalink
Merge pull request #3646 from spidernet-io/robot/cherrypick/pr3584/re…
Browse files Browse the repository at this point in the history
…lease-v0.9

coordinator: Use ARP to detect the gateway rather than ICMP
  • Loading branch information
cyclinder authored Jun 24, 2024
2 parents 704f6f3 + 460531c commit ec72a75
Show file tree
Hide file tree
Showing 27 changed files with 186 additions and 1,672 deletions.
4 changes: 2 additions & 2 deletions cmd/coordinator/cmd/cni_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
if config == nil {
return &DetectOptions{
Interval: "1s",
TimeOut: "1s",
TimeOut: "3s",
Retry: 3,
}, nil
}
Expand All @@ -275,7 +275,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
}

if config.TimeOut == "" {
config.TimeOut = "1s"
config.TimeOut = "3s"
}

_, err := time.ParseDuration(config.Interval)
Expand Down
26 changes: 17 additions & 9 deletions cmd/coordinator/cmd/command_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/containernetworking/cni/pkg/skel"
Expand Down Expand Up @@ -80,7 +81,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) {
)
logger.Info(fmt.Sprintf("start to implement ADD command in %v mode", conf.Mode))
logger.Debug(fmt.Sprintf("api configuration: %+v", *coordinatorConfig))
logger.Debug(fmt.Sprintf("final configuration: %+v", *conf))
logger.Debug("final configuration", zap.Any("conf", conf))

// parse prevResult
prevResult, err := current.GetResult(conf.PrevResult)
Expand Down Expand Up @@ -186,21 +187,28 @@ func CmdAdd(args *skel.CmdArgs) (err error) {
if conf.DetectGateway != nil && *conf.DetectGateway {
logger.Debug("Try to detect gateway")

var gws []string
var gws []net.IP
err = c.netns.Do(func(netNS ns.NetNS) error {
gws, err = networking.GetDefaultGatewayByName(c.currentInterface, c.ipFamily)
if err != nil {
logger.Error("failed to GetDefaultGatewayByName", zap.Error(err))
return fmt.Errorf("failed to GetDefaultGatewayByName: %v", err)
}
logger.Debug("Get GetDefaultGatewayByName", zap.Any("Gws", gws))

logger.Debug("Get GetDefaultGatewayByName", zap.Strings("Gws", gws))
p, err := gwconnection.New(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, c.currentInterface, logger)
if err != nil {
return fmt.Errorf("failed to init the gateway client: %v", err)
}
p.ParseAddrFromPreresult(prevResult.IPs)
for _, gw := range gws {
p, err := gwconnection.NewPinger(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, gw, logger)
if err != nil {
return fmt.Errorf("failed to run NewPinger: %v", err)
if gw.To4() != nil {
p.V4Gw = gw
errg.Go(c.hostNs, c.netns, p.ArpingOverIface)
} else {
p.V6Gw = gw
errg.Go(c.hostNs, c.netns, p.NDPingOverIface)
}
errg.Go(c.hostNs, c.netns, p.DetectGateway)
}
return nil
})
Expand All @@ -225,7 +233,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) {

if err = errg.Wait(); err != nil {
logger.Error("failed to detect gateway and ip checking", zap.Error(err))
if errors.Is(err, constant.ErrIPConflict) {
if errors.Is(err, constant.ErrIPConflict) || errors.Is(err, constant.ErrGatewayUnreachable) {
_, innerErr := client.Daemonset.DeleteIpamIps(daemonset.NewDeleteIpamIpsParams().WithContext(context.TODO()).WithIpamBatchDelArgs(
&models.IpamBatchDelArgs{
ContainerID: &args.ContainerID,
Expand All @@ -235,7 +243,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) {
PodUID: (*string)(&k8sArgs.K8S_POD_UID),
},
))
if nil != innerErr {
if innerErr != nil {
logger.Sugar().Errorf("failed to clean up conflict IPs, error: %v", innerErr)
return multierr.Append(err, innerErr)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator-zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ spec:
## 支持检测 Pod 的网关是否可达(alpha)

在 Underlay 网络下,Pod 访问外部需要通过网关转发。如果网关不可达,那么在外界看来,这个 Pod 实际是失联的。有时候我们希望创建 Pod 时,其网关是可达的。 我们可借助 `coordinator` 检测 Pod 的网关是否可达,
支持检测 IPv4 和 IPv6 的网关地址。我们通过发送 ICMP 报文,探测网关地址是否可达。如果网关不可达,将会阻止 Pod 创建:
支持检测 IPv4 和 IPv6 的网关地址。我们通过发送 ARP 请求(IPv4)或 NDP 邻居请求报文(IPv6),探测网关地址是否可达。如果网关不可达,将会阻止 Pod 创建:

我们可以通过 Spidermultusconfig 配置它:

Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ spec:
## Detect Pod gateway reachability(alpha)

Under the underlay network, pod access to the outside needs to be forwarded through the gateway. If the gateway is unreachable, then the pod is actually lost. Sometimes we want to create a pod with a gateway reachable. We can use the 'coordinator' to check if the pod's gateway is reachable.
Gateway addresses for IPv4 and IPv6 can be detected. We send an ICMP packet to check whether the gateway address is reachable. If the gateway is unreachable, pods will be prevented from creating:
Gateway addresses for IPv4 and IPv6 can be detected. We send an ARP request (IPv4) or an NDP neighbor solicitation (IPv6) to check whether the gateway address is reachable. If the gateway is unreachable, the Pod is prevented from being created:

We can configure it via Spidermultusconfig:

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ require (
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.29.0
github.com/openkruise/kruise-api v1.3.0
github.com/prometheus-community/pro-bing v0.2.0
github.com/prometheus/client_golang v1.17.0
github.com/sasha-s/go-deadlock v0.3.1
github.com/spf13/cobra v1.7.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,6 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/projectcalico/api v0.0.0-20220722155641-439a754a988b h1:dW+UhJMzusDO6hqVGuCYeDxXWAzc7HnA9CsPN+uHPnA=
github.com/projectcalico/api v0.0.0-20220722155641-439a754a988b/go.mod h1:Avoy1rTN1GfeisnHGf3WhQNqR+BuGOcwfNFsdWX6OHE=
github.com/prometheus-community/pro-bing v0.2.0 h1:hyK7yPFndU3LCDwEQJwPQUCjNkp1DGP/VxyzrWfXZUU=
github.com/prometheus-community/pro-bing v0.2.0/go.mod h1:20arNb2S8rNG3EtmjHyZZU92cfbhQx7oCHZ9sulAV+I=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
ErrRetriesExhausted = errors.New("exhaust all retries")
ErrIPUsedOut = errors.New("all IP addresses used out")
ErrIPConflict = errors.New("ip conflict")
ErrGatewayUnreachable = errors.New("unreachable")
ErrForbidReleasingStatefulWorkload = errors.New("forbid releasing IPs for stateful workload ")
ErrForbidReleasingStatelessWorkload = errors.New("forbid releasing IPs for stateless workload")
)
Expand Down
180 changes: 159 additions & 21 deletions pkg/networking/gwconnection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,186 @@ package gwconnection

import (
"fmt"
"net"
"net/netip"
"time"

"go.uber.org/zap"

ping "github.com/prometheus-community/pro-bing"
types100 "github.com/containernetworking/cni/pkg/types/100"
"github.com/mdlayher/arp"
_ "github.com/mdlayher/ethernet"
"github.com/mdlayher/ndp"
"github.com/spidernet-io/spiderpool/pkg/constant"
)

type Pinger struct {
logger *zap.Logger
pinger *ping.Pinger
// DetectGateway carries the probe settings and the addresses involved in
// checking whether a gateway answers on a given interface.
type DetectGateway struct {
	// retries is the maximum number of probe attempts.
	retries int
	// iface is the name of the interface the probes are sent over.
	iface string
	// interval is the pause between two consecutive attempts.
	interval time.Duration
	// timeout is the read deadline applied to each attempt.
	timeout time.Duration
	// v4Addr and v6Addr are filled in by ParseAddrFromPreresult.
	v4Addr net.IP
	v6Addr net.IP
	// V4Gw and V6Gw are the gateways to probe; they are set by the caller
	// before ArpingOverIface / NDPingOverIface is invoked.
	V4Gw net.IP
	V6Gw net.IP
	// logger receives progress and failure messages of the probes.
	logger *zap.Logger
}

func NewPinger(count int, interval, timeout, gw string, logger *zap.Logger) (*Pinger, error) {
pinger := ping.New(gw)
pinger.Count = count
// New builds a DetectGateway that probes over the interface named iface.
//
// retries is stored as the number of probe attempts. interval and timeout are
// duration strings handed to time.ParseDuration (for example "1s"); if either
// of them cannot be parsed, the parse error is returned and no DetectGateway
// is created. logger is stored for use by the probe methods.
func New(retries int, interval, timeout, iface string, logger *zap.Logger) (*DetectGateway, error) {
var err error
dg := &DetectGateway{
retries: retries,
iface: iface,
}

intervalDuration, err := time.ParseDuration(interval)
dg.interval, err = time.ParseDuration(interval)
if err != nil {
return nil, err
}
pinger.Interval = intervalDuration

timeoutDuration, err := time.ParseDuration(timeout)
dg.timeout, err = time.ParseDuration(timeout)
if err != nil {
return nil, err
}
pinger.Timeout = timeoutDuration
pinger.SetPrivileged(true)
dg.logger = logger

return dg, nil
}

return &Pinger{logger, pinger}, nil
// ParseAddrFromPreresult walks the IP configurations of a previous CNI result
// and remembers their addresses on dg: an address that has a 4-byte form is
// stored as v4Addr, every other address as v6Addr. When several entries of the
// same family are present, the last one wins.
func (dg *DetectGateway) ParseAddrFromPreresult(ipconfigs []*types100.IPConfig) {
	for idx := range ipconfigs {
		addr := ipconfigs[idx].Address.IP
		if addr.To4() == nil {
			dg.v6Addr = addr
			continue
		}
		dg.v4Addr = addr
	}
}

func (p *Pinger) DetectGateway() error {
if err := p.pinger.Run(); err != nil {
return fmt.Errorf("failed to run DetectGateway: %v", err)
// PingOverIface sends an arp ping over interface 'iface' to 'dstIP'
func (dg *DetectGateway) ArpingOverIface() error {
ifi, err := net.InterfaceByName(dg.iface)
if err != nil {
return err
}

stats := p.pinger.Statistics()
if stats.PacketLoss > 0 {
return fmt.Errorf("gateway %s is unreachable", p.pinger.Addr())
client, err := arp.Dial(ifi)
if err != nil {
return err
}
defer client.Close()

gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
var gwHwAddr net.HardwareAddr
for i := 0; i < dg.retries; i++ {

err = client.SetReadDeadline(time.Now().Add(dg.timeout))
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
time.Sleep(dg.interval)
continue
}

dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
time.Sleep(dg.interval)
continue
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
}
time.Sleep(dg.interval)
}

if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), err)
return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
}

return fmt.Errorf("failed to checking the gateway %s if is reachable: %w", dg.V4Gw.String(), err)
}

// NDPingOverIface checks whether the IPv6 gateway dg.V6Gw answers neighbor
// solicitations sent over the interface dg.iface.
//
// Up to dg.retries attempts are made, one per tick of a dg.interval ticker.
// It returns nil as soon as sendReceive reports the gateway's link-layer
// address. If the attempts are exhausted because the last one timed out, or
// because no matching reply was seen at all, the returned error wraps
// constant.ErrGatewayUnreachable so that callers can match it with errors.Is.
// Any other failure is returned wrapped with context.
func (dg *DetectGateway) NDPingOverIface() error {
	// Parse rather than MustParse: a nil or malformed gateway has to surface
	// as an error instead of a panic.
	target, err := netip.ParseAddr(dg.V6Gw.String())
	if err != nil {
		return fmt.Errorf("invalid IPv6 gateway %q: %w", dg.V6Gw.String(), err)
	}

	ifi, err := net.InterfaceByName(dg.iface)
	if err != nil {
		return err
	}

	client, _, err := ndp.Listen(ifi, ndp.LinkLocal)
	if err != nil {
		return err
	}
	defer client.Close()

	msg := &ndp.NeighborSolicitation{
		TargetAddress: target,
		Options: []ndp.Option{
			&ndp.LinkLayerAddress{
				Direction: ndp.Source,
				Addr:      ifi.HardwareAddr,
			},
		},
	}

	ticker := time.NewTicker(dg.interval)
	defer ticker.Stop()

	// lastErr is the outcome of the most recent attempt; it decides how the
	// overall failure is classified once the retries are exhausted.
	var lastErr error
	for i := 0; i < dg.retries; i++ {
		<-ticker.C
		gwHwAddr, err := dg.sendReceive(client, msg)
		lastErr = err
		if err != nil {
			dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err)
			continue
		}
		if gwHwAddr != "" {
			dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr)
			return nil
		}
	}

	timedOut := false
	if neterr, ok := lastErr.(net.Error); ok {
		timedOut = neterr.Timeout()
	}
	// Attempts were made and completed without error, but none of them
	// produced a matching advertisement: the gateway did not answer.
	noReply := lastErr == nil && dg.retries > 0

	if timedOut || noReply {
		var reason any = lastErr
		if noReply {
			reason = "no matching neighbor advertisement received"
		}
		dg.logger.Sugar().Errorf("gateway %s is unreachable, reason: %v", dg.V6Gw.String(), reason)
		return fmt.Errorf("gateway %s is %w", dg.V6Gw.String(), constant.ErrGatewayUnreachable)
	}

	if lastErr == nil {
		// retries is not positive, so nothing was probed.
		return fmt.Errorf("error detect the gateway %s if is reachable: no detection attempt was made", dg.V6Gw.String())
	}
	return fmt.Errorf("error detect the gateway %s if is reachable: %w", dg.V6Gw.String(), lastErr)
}

// sendReceive multicasts the message m to the solicited-node multicast group
// of the IPv6 gateway dg.V6Gw and waits, for at most dg.timeout, for a
// neighbor advertisement whose target address is that gateway.
//
// On success it returns the link-layer address carried by the advertisement,
// formatted as a string. Messages that are not such an advertisement are
// skipped and reading continues until the read deadline expires; the read
// error is then returned unwrapped so the caller can inspect it as a
// net.Error.
func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, error) {
	// Parse rather than MustParse: a nil or malformed gateway has to surface
	// as an error instead of a panic.
	gwNetIP, err := netip.ParseAddr(dg.V6Gw.String())
	if err != nil {
		dg.logger.Error("[NDP]invalid gateway address", zap.Error(err))
		return "", fmt.Errorf("invalid IPv6 gateway %q: %w", dg.V6Gw.String(), err)
	}

	// Always multicast the message to the target's solicited-node multicast
	// group as if we have no knowledge of its MAC address.
	snm, err := ndp.SolicitedNodeMulticast(gwNetIP)
	if err != nil {
		dg.logger.Error("[NDP]failed to determine solicited-node multicast address", zap.Error(err))
		return "", fmt.Errorf("failed to determine solicited-node multicast address: %w", err)
	}

	// Send the solicitation that asks the gateway to announce itself.
	if err = client.WriteTo(m, nil, snm); err != nil {
		dg.logger.Error("[NDP]failed to send message", zap.Error(err))
		return "", fmt.Errorf("failed to send message: %w", err)
	}

	if err = client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil {
		dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
		return "", fmt.Errorf("failed to set deadline: %w", err)
	}

	// Other NDP traffic may arrive on the same connection before the
	// gateway's reply, so keep reading until a matching advertisement shows
	// up or the deadline set above makes ReadFrom fail.
	for {
		msg, _, _, err := client.ReadFrom()
		if err != nil {
			return "", err
		}

		na, ok := msg.(*ndp.NeighborAdvertisement)
		if !ok || na.TargetAddress.Compare(gwNetIP) != 0 {
			continue
		}
		dg.logger.Debug("Detect gateway: found the response", zap.String("TargetAddress", na.TargetAddress.String()))

		// The advertisement may carry several options; pick the link-layer
		// address wherever it appears.
		for _, opt := range na.Options {
			if lla, ok := opt.(*ndp.LinkLayerAddress); ok {
				return lla.Addr.String(), nil
			}
		}
	}
}
10 changes: 5 additions & 5 deletions pkg/networking/networking/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func GetRoutesByName(iface string, ipfamily int) (routes []netlink.Route, err er
return netlink.RouteList(link, ipfamily)
}

func GetDefaultGatewayByName(iface string, ipfamily int) ([]string, error) {
func GetDefaultGatewayByName(iface string, ipfamily int) ([]net.IP, error) {
routes, err := GetRoutesByName("", ipfamily)
if err != nil {
return nil, err
Expand All @@ -42,17 +42,17 @@ func GetDefaultGatewayByName(iface string, ipfamily int) ([]string, error) {
return nil, err
}

gws := make([]string, 0)
gws := make([]net.IP, 0)
for _, route := range routes {
if route.LinkIndex == link.Attrs().Index {
if route.Dst == nil || route.Dst.IP.Equal(net.IPv4zero) {
gws = append(gws, route.Gw.String())
if route.Dst == nil || route.Dst.IP.Equal(net.IPv4zero) || route.Dst.IP.Equal(net.IPv6zero) {
gws = append(gws, route.Gw)
}
} else {
if len(route.MultiPath) > 0 {
for _, r := range route.MultiPath {
if r.LinkIndex == link.Attrs().Index {
gws = append(gws, r.Gw.String())
gws = append(gws, r.Gw)
break
}
}
Expand Down
16 changes: 0 additions & 16 deletions vendor/github.com/prometheus-community/pro-bing/.editorconfig

This file was deleted.

2 changes: 0 additions & 2 deletions vendor/github.com/prometheus-community/pro-bing/.gitignore

This file was deleted.

Loading

0 comments on commit ec72a75

Please sign in to comment.