Skip to content

Commit

Permalink
use metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Dec 31, 2024
1 parent fca05c0 commit 2b6002c
Show file tree
Hide file tree
Showing 8 changed files with 1,100 additions and 28 deletions.
156 changes: 129 additions & 27 deletions components/fuse/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,77 +3,179 @@ package fuse

import (
"context"
"encoding/json"
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/leptonai/gpud/components"
fuse_id "github.com/leptonai/gpud/components/fuse/id"
"github.com/leptonai/gpud/components/fuse/metrics"
"github.com/leptonai/gpud/components/fuse/state"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/fuse"

"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New(ctx context.Context) components.Component {
func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()
setDefaultPoller(cfg)

cctx, ccancel := context.WithCancel(ctx)
getDefaultPoller().Start(cctx, cfg.Query, fuse_id.Name)

return &component{
ctx: ctx,
cfg: cfg,
rootCtx: ctx,
cancel: ccancel,
poller: getDefaultPoller(),
db: cfg.Query.State.DB,
}
}

var _ components.Component = (*component)(nil)

type component struct {
ctx context.Context
cfg Config
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
gatherer prometheus.Gatherer
db *sql.DB
}

func (c *component) Name() string { return fuse_id.Name }

const (
StateNameFUSEConnectionInfos = "fuse_connection_infos"

StateKeyData = "data"
StateKeyEncoding = "encoding"
StateValueEncodingJSON = "json"
)

func (c *component) States(ctx context.Context) ([]components.State, error) {
infos, err := fuse.ListConnections()
last, err := c.poller.Last()
if err == query.ErrNoData { // no data
log.Logger.Debugw("nothing found in last state (no data collected yet)", "component", fuse_id.Name)
return []components.State{
{
Name: fuse_id.Name,
Healthy: true,
Reason: query.ErrNoData.Error(),
},
}, nil
}
if err != nil {
return nil, err
}

jsonData, err := json.Marshal(infos)
if err != nil {
return nil, err
if last.Error != nil {
return []components.State{
{
Name: fuse_id.Name,
Healthy: false,
Error: last.Error.Error(),
Reason: "last query failed",
},
}, nil
}

return []components.State{
{
Name: StateNameFUSEConnectionInfos,
Name: fuse_id.Name,
Healthy: true,
Reason: fmt.Sprintf("found %d FUSE connections", len(infos)),
ExtraInfo: map[string]string{
StateKeyData: string(jsonData),
StateKeyEncoding: StateValueEncodingJSON,
},
},
}, nil
}

const (
EventNameFuseConnections = "fuse_connections"

EventKeyUnixSeconds = "unix_seconds"
EventKeyData = "data"
EventKeyEncoding = "encoding"
EventValueEncodingJSON = "json"
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
// TODO: poll fuse connection tracker
// TODO: emit events if it's exceeding thresholds
events, err := state.ReadEvents(ctx, c.db, state.WithSince(since))
if err != nil {
return nil, err
}

return nil, nil
if len(events) == 0 {
log.Logger.Debugw("no event found", "component", c.Name(), "since", humanize.Time(since))
return nil, nil
}

log.Logger.Debugw("found events", "component", c.Name(), "since", humanize.Time(since), "count", len(events))
convertedEvents := make([]components.Event, 0, len(events))
for _, event := range events {
msgs := []string{}
if event.CongestedPercentAgainstThreshold > c.cfg.CongestedPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("congested percent against threshold %.2f exceeds threshold %.2f", event.CongestedPercentAgainstThreshold, c.cfg.CongestedPercentAgainstThreshold))
}
if event.MaxBackgroundPercentAgainstThreshold > c.cfg.MaxBackgroundPercentAgainstThreshold {
msgs = append(msgs, fmt.Sprintf("max background percent against threshold %.2f exceeds threshold %.2f", event.MaxBackgroundPercentAgainstThreshold, c.cfg.MaxBackgroundPercentAgainstThreshold))
}
if len(msgs) == 0 {
continue
}

eb, err := event.JSON()
if err != nil {
continue
}

convertedEvents = append(convertedEvents, components.Event{
Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0).UTC()},
Name: EventNameFuseConnections,
Type: components.EventTypeCritical,
Message: strings.Join(msgs, ", "),
ExtraInfo: map[string]string{
EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10),
EventKeyData: string(eb),
EventKeyEncoding: EventValueEncodingJSON,
},
})
}
if len(convertedEvents) == 0 {
return nil, nil
}
return convertedEvents, nil
}

func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) {
log.Logger.Debugw("querying metrics", "since", since)

return nil, nil
congestedPercents, err := metrics.ReadConnectionsCongestedPercents(ctx, since)
if err != nil {
return nil, fmt.Errorf("failed to read congested percents: %w", err)
}
maxBackgroundPercents, err := metrics.ReadConnectionsMaxBackgroundPercents(ctx, since)
if err != nil {
return nil, fmt.Errorf("failed to read max background percents: %w", err)
}
ms := make([]components.Metric, 0, len(congestedPercents)+len(maxBackgroundPercents))
for _, m := range congestedPercents {
ms = append(ms, components.Metric{Metric: m})
}
for _, m := range maxBackgroundPercents {
ms = append(ms, components.Metric{Metric: m})
}

return ms, nil
}

func (c *component) Close() error {
log.Logger.Debugw("closing component")

// safe to call stop multiple times
c.poller.Stop(fuse_id.Name)

return nil
}

var _ components.PromRegisterer = (*component)(nil)

func (c *component) RegisterCollectors(reg *prometheus.Registry, db *sql.DB, tableName string) error {
c.gatherer = reg
return metrics.Register(reg, db, tableName)
}
98 changes: 98 additions & 0 deletions components/fuse/component_output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package fuse

import (
"context"
"sync"
"time"

fuse_id "github.com/leptonai/gpud/components/fuse/id"
"github.com/leptonai/gpud/components/fuse/metrics"
"github.com/leptonai/gpud/components/fuse/state"
components_metrics "github.com/leptonai/gpud/components/metrics"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/pkg/fuse"
)

type Output struct {
ConnectionInfos []fuse.ConnectionInfo `json:"connection_infos"`
}

var (
defaultPollerOnce sync.Once
defaultPoller query.Poller
)

// only set once since it relies on the kube client and specific port
func setDefaultPoller(cfg Config) {
defaultPollerOnce.Do(func() {
defaultPoller = query.New(
fuse_id.Name,
cfg.Query,
CreateGet(cfg),
nil,
)
})
}

func getDefaultPoller() query.Poller {
return defaultPoller
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(fuse_id.Name)
} else {
components_metrics.SetGetSuccess(fuse_id.Name)
}
}()

infos, err := fuse.ListConnections()
if err != nil {
return nil, err
}

now := time.Now().UTC()
nowUTC := float64(now.Unix())
metrics.SetLastUpdateUnixSeconds(nowUTC)

foundDev := make(map[string]fuse.ConnectionInfo)
for _, info := range infos {
// to dedup fuse connection stats by device name
if _, ok := foundDev[info.DeviceName]; ok {
continue
}

prev, err := state.FindEvent(ctx, cfg.Query.State.DB, now.Unix(), info.DeviceName)
if err != nil {
return nil, err
}
if prev == nil {
continue
}

if err := state.InsertEvent(ctx, cfg.Query.State.DB, state.Event{
UnixSeconds: now.Unix(),
DeviceName: info.DeviceName,
CongestedPercentAgainstThreshold: info.CongestedPercent,
MaxBackgroundPercentAgainstThreshold: info.MaxBackgroundPercent,
}); err != nil {
return nil, err
}

if err := metrics.SetConnectionsCongestedPercent(ctx, info.DeviceName, info.CongestedPercent, now); err != nil {
return nil, err
}
if err := metrics.SetConnectionsMaxBackgroundPercent(ctx, info.DeviceName, info.MaxBackgroundPercent, now); err != nil {
return nil, err
}

foundDev[info.DeviceName] = info
}

return &Output{
ConnectionInfos: infos,
}, nil
}
}
51 changes: 51 additions & 0 deletions components/fuse/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fuse

import (
"database/sql"
"encoding/json"

query_config "github.com/leptonai/gpud/components/query/config"
)

type Config struct {
Query query_config.Config `json:"query"`

// CongestedPercentAgainstThreshold is the percentage of the FUSE connections waiting
// at which we consider the system to be congested.
CongestedPercentAgainstThreshold float64 `json:"congested_percent_against_threshold"`

// MaxBackgroundPercentAgainstThreshold is the percentage of the FUSE connections waiting
// at which we consider the system to be congested.
MaxBackgroundPercentAgainstThreshold float64 `json:"max_background_percent_against_threshold"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
}
cfg := new(Config)
err = json.Unmarshal(raw, cfg)
if err != nil {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
}
return cfg, nil
}

const (
DefaultCongestedPercentAgainstThreshold = float64(90)
DefaultMaxBackgroundPercentAgainstThreshold = float64(80)
)

func (cfg *Config) Validate() error {
if cfg.CongestedPercentAgainstThreshold == 0 {
cfg.CongestedPercentAgainstThreshold = DefaultCongestedPercentAgainstThreshold
}
if cfg.MaxBackgroundPercentAgainstThreshold == 0 {
cfg.MaxBackgroundPercentAgainstThreshold = DefaultMaxBackgroundPercentAgainstThreshold
}
return nil
}
Loading

0 comments on commit 2b6002c

Please sign in to comment.