diff --git a/components/accelerator/nvidia/hw-slowdown/component.go b/components/accelerator/nvidia/hw-slowdown/component.go index 5ab8c050..70dc094d 100644 --- a/components/accelerator/nvidia/hw-slowdown/component.go +++ b/components/accelerator/nvidia/hw-slowdown/component.go @@ -57,6 +57,8 @@ const ( ) func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { + // the default nvidia poller persists the events to the storage + // so we can just read from the storage events, err := nvidia_clock_events_state.ReadEvents( ctx, c.db, diff --git a/components/accelerator/nvidia/persistence-mode/component_output.go b/components/accelerator/nvidia/persistence-mode/component_output.go index b9d00ff9..d6b17c18 100644 --- a/components/accelerator/nvidia/persistence-mode/component_output.go +++ b/components/accelerator/nvidia/persistence-mode/component_output.go @@ -18,10 +18,7 @@ func ToOutput(i *nvidia_query.Output) *Output { return &Output{} } - o := &Output{ - PersistencedExists: i.PersistencedExists, - PersistencedRunning: i.PersistencedRunning, - } + o := &Output{} if i.NVML != nil { for _, device := range i.NVML.DeviceInfos { @@ -39,9 +36,6 @@ func ToOutput(i *nvidia_query.Output) *Output { } type Output struct { - PersistencedExists bool `json:"persistenced_exists"` - PersistencedRunning bool `json:"persistenced_running"` - PersistenceModesSMI []nvidia_query.SMIGPUPersistenceMode `json:"persistence_modes_smi"` PersistenceModesNVML []nvidia_query_nvml.PersistenceMode `json:"persistence_modes_nvml"` } @@ -94,10 +88,6 @@ func (o *Output) Evaluate() (string, bool, error) { enabled := true for _, p := range o.PersistenceModesSMI { - if o.PersistencedRunning { - continue - } - // legacy mode (https://docs.nvidia.com/deploy/driver-persistence/index.html#installation) // "The reason why we cannot immediately deprecate the legacy persistence mode and switch transparently to the NVIDIA Persistence Daemon is because at this time, // we cannot 
guarantee that the NVIDIA Persistence Daemon will be running. This would be a feature regression as persistence mode might not be available out-of- the-box." @@ -108,10 +98,6 @@ func (o *Output) Evaluate() (string, bool, error) { } for _, p := range o.PersistenceModesNVML { - if o.PersistencedRunning { - continue - } - // legacy mode (https://docs.nvidia.com/deploy/driver-persistence/index.html#installation) // "The reason why we cannot immediately deprecate the legacy persistence mode and switch transparently to the NVIDIA Persistence Daemon is because at this time, // we cannot guarantee that the NVIDIA Persistence Daemon will be running. This would be a feature regression as persistence mode might not be available out-of- the-box." @@ -121,15 +107,6 @@ func (o *Output) Evaluate() (string, bool, error) { } } - // does not make the component unhealthy, since persistence mode can still be enabled - // recommend installing nvidia-persistenced since it's the recommended way to enable persistence mode - if !o.PersistencedExists { - reasons = append(reasons, "nvidia-persistenced does not exist (install 'nvidia-persistenced' or run 'nvidia-smi -pm 1')") - } - if !o.PersistencedRunning { - reasons = append(reasons, "nvidia-persistenced exists but not running (start 'nvidia-persistenced' or run 'nvidia-smi -pm 1')") - } - return strings.Join(reasons, "; "), enabled, nil } diff --git a/components/accelerator/nvidia/query/nvidia_persistenced.go b/components/accelerator/nvidia/query/nvidia_persistenced.go deleted file mode 100644 index 0c51191b..00000000 --- a/components/accelerator/nvidia/query/nvidia_persistenced.go +++ /dev/null @@ -1,20 +0,0 @@ -package query - -import ( - "os/exec" - - "github.com/leptonai/gpud/pkg/file" -) - -// Returns true if the local machine has "nvidia-persistenced". -// ref. 
https://docs.nvidia.com/deploy/driver-persistence/index.html#usage -func PersistencedExists() bool { - _, err := file.LocateExecutable("nvidia-persistenced") - return err == nil -} - -// "pidof nvidia-persistenced" -func PersistencedRunning() bool { - err := exec.Command("pidof", "nvidia-persistenced").Run() - return err == nil -} diff --git a/components/accelerator/nvidia/query/nvidia_smi_query.go b/components/accelerator/nvidia/query/nvidia_smi_query.go index 79dd1938..48debdd5 100644 --- a/components/accelerator/nvidia/query/nvidia_smi_query.go +++ b/components/accelerator/nvidia/query/nvidia_smi_query.go @@ -11,6 +11,7 @@ import ( "strings" metrics_clock_events_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/clock-events-state" + "github.com/leptonai/gpud/log" "github.com/leptonai/gpud/pkg/file" "github.com/leptonai/gpud/pkg/process" @@ -25,6 +26,7 @@ func SMIExists() bool { } func RunSMI(ctx context.Context, args ...string) ([]byte, error) { + log.Logger.Debugw("finding nvidia-smi") nvidiaSMIPath, err := file.LocateExecutable("nvidia-smi") if err != nil { return nil, fmt.Errorf("nvidia-smi not found (%w)", err) @@ -38,6 +40,7 @@ func RunSMI(ctx context.Context, args ...string) ([]byte, error) { return nil, err } + log.Logger.Debugw("starting nvidia-smi", "args", args) if err := p.Start(ctx); err != nil { return nil, err } @@ -63,9 +66,8 @@ func RunSMI(ctx context.Context, args ...string) ([]byte, error) { // [Sat Oct 12 18:38:44 2024] _nv042330rm+0x10/0x40 [nvidia] // [Sat Oct 12 18:38:44 2024] ? 
_nv043429rm+0x23c/0x290 errc := make(chan error, 1) - var output []byte + lines := make([]string, 0) go func() { - lines := make([]string, 0) - err := process.Read( + errc <- process.Read( ctx, p, @@ -76,20 +78,18 @@ func RunSMI(ctx context.Context, args ...string) ([]byte, error) { }), process.WithWaitForCmd(), ) - errc <- err - output = []byte(strings.Join(lines, "\n")) }() select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, fmt.Errorf("nvidia-smi command timed out: %w\n\n(partial) output:\n%s", ctx.Err(), strings.Join(lines, "\n")) case err := <-errc: if err != nil { - return nil, fmt.Errorf("nvidia-smi command failed: %w\n\noutput:\n%s", err, string(output)) + return nil, fmt.Errorf("nvidia-smi command failed: %w\n\n(partial) output:\n%s", err, strings.Join(lines, "\n")) } - return output, nil + return []byte(strings.Join(lines, "\n")), nil } } @@ -101,6 +101,7 @@ func GetSMIOutput(ctx context.Context) (*SMIOutput, error) { if err != nil { return nil, err } + o, err := ParseSMIQueryOutput(qb) if err != nil { return nil, err diff --git a/components/accelerator/nvidia/query/nvml/nvml.go b/components/accelerator/nvidia/query/nvml/nvml.go index f6f21017..0f8f1cf8 100644 --- a/components/accelerator/nvidia/query/nvml/nvml.go +++ b/components/accelerator/nvidia/query/nvml/nvml.go @@ -175,9 +175,13 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { } nvmlLib := nvml.New() + + log.Logger.Debugw("initializing nvml library") if ret := nvmlLib.Init(); ret != nvml.SUCCESS { return nil, fmt.Errorf("failed to initialize NVML: %v", nvml.ErrorString(ret)) } + + log.Logger.Debugw("getting driver version from nvml library") driverVersion, err := GetDriverVersion() if err != nil { return nil, err } @@ -186,6 +190,8 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { if err != nil { return nil, err } + + log.Logger.Debugw("checking if clock events are supported") clockEventsSupported := ClockEventsSupportedVersion(major) if
!clockEventsSupported { log.Logger.Warnw("old nvidia driver -- skipping clock events, see https://github.com/NVIDIA/go-nvml/pull/123", "version", driverVersion) @@ -193,12 +199,16 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { log.Logger.Debugw("successfully initialized NVML", "driverVersion", driverVersion) + log.Logger.Debugw("creating device library") deviceLib := device.New(nvmlLib) + + log.Logger.Debugw("creating info library") infoLib := nvinfo.New( nvinfo.WithNvmlLib(nvmlLib), nvinfo.WithDeviceLib(deviceLib), ) + log.Logger.Debugw("checking if nvml exists from info library") nvmlExists, nvmlExistsMsg := infoLib.HasNvml() if !nvmlExists { log.Logger.Warnw("nvml not found", "message", nvmlExistsMsg) @@ -258,6 +268,7 @@ func (inst *instance) Start() error { inst.mu.Lock() defer inst.mu.Unlock() + log.Logger.Debugw("creating xid sxid event history table") ctx, cancel := context.WithTimeout(inst.rootCtx, 10*time.Second) defer cancel() if err := nvidia_xid_sxid_state.CreateTableXidSXidEventHistory(ctx, inst.db); err != nil { @@ -266,6 +277,7 @@ func (inst *instance) Start() error { // "NVIDIA Xid 79: GPU has fallen off the bus" may fail this syscall with: // "error getting device handle for index '6': Unknown Error" + log.Logger.Debugw("getting devices from device library") devices, err := inst.deviceLib.GetDevices() if err != nil { return err @@ -285,30 +297,38 @@ func (inst *instance) Start() error { } // TODO: this returns 0 for all GPUs... + log.Logger.Debugw("getting device minor number") minorNumber, ret := d.GetMinorNumber() if ret != nvml.SUCCESS { return fmt.Errorf("failed to get device minor number: %v", nvml.ErrorString(ret)) } // ref. 
https://docs.nvidia.com/deploy/nvml-api/group__nvmlDeviceQueries.html#group__nvmlDeviceQueries_1g8789a616b502a78a1013c45cbb86e1bd + log.Logger.Debugw("getting device pci info") pciInfo, ret := d.GetPciInfo() if ret != nvml.SUCCESS { return fmt.Errorf("failed to get device PCI info: %v", nvml.ErrorString(ret)) } + log.Logger.Debugw("getting device name") name, ret := d.GetName() if ret != nvml.SUCCESS { return fmt.Errorf("failed to get device name: %v", nvml.ErrorString(ret)) } + + log.Logger.Debugw("getting device cores") cores, ret := d.GetNumGpuCores() if ret != nvml.SUCCESS { return fmt.Errorf("failed to get device cores: %v", nvml.ErrorString(ret)) } + + log.Logger.Debugw("getting supported event types") supportedEvents, ret := d.GetSupportedEventTypes() if ret != nvml.SUCCESS { return fmt.Errorf("failed to get supported event types: %v", nvml.ErrorString(ret)) } + log.Logger.Debugw("registering events") ret = d.RegisterEvents(inst.xidEventMask&supportedEvents, inst.xidEventSet) if ret != nvml.SUCCESS { return fmt.Errorf("failed to register events: %v", nvml.ErrorString(ret)) @@ -318,6 +338,7 @@ func (inst *instance) Start() error { inst.xidErrorSupported = false } + log.Logger.Debugw("checking if gpm metrics are supported") gpmMetricsSpported, err := GPMSupportedByDevice(d) if err != nil { return err @@ -571,6 +592,8 @@ func StartDefaultInstance(rootCtx context.Context, opts ...OpOption) error { return nil } + log.Logger.Debugw("creating a new default nvml instance") + var err error defaultInstance, err = NewInstance(rootCtx, opts...) 
if err != nil { diff --git a/components/accelerator/nvidia/query/query.go b/components/accelerator/nvidia/query/query.go index 1534a404..5ebd0f38 100644 --- a/components/accelerator/nvidia/query/query.go +++ b/components/accelerator/nvidia/query/query.go @@ -96,18 +96,17 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { go_nvml.GPM_METRIC_FP16_UTIL, ), ); err != nil { - return nil, err + return nil, fmt.Errorf("failed to start nvml instance: %w", err) } o := &Output{ SMIExists: SMIExists(), - PersistencedExists: PersistencedExists(), - PersistencedRunning: PersistencedRunning(), FabricManagerExists: FabricManagerExists(), InfinibandClassExists: infiniband.CountInfinibandClass() > 0, IbstatExists: infiniband.IbstatExists(), } + log.Logger.Debugw("counting gpu devices") o.GPUDeviceCount, err = CountAllDevicesFromDevDir() if err != nil { log.Logger.Warnw("failed to count gpu devices", "error", err) @@ -129,6 +128,7 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { } if o.FabricManagerExists { + log.Logger.Debugw("checking fabric manager version") cctx, ccancel := context.WithTimeout(ctx, 30*time.Second) ver, err := CheckFabricManagerVersion(cctx) ccancel() @@ -136,6 +136,7 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { o.FabricManagerErrors = append(o.FabricManagerErrors, fmt.Sprintf("failed to check fabric manager version: %v", err)) } + log.Logger.Debugw("connecting to dbus") if err := systemd.ConnectDbus(); err != nil { log.Logger.Warnw("failed to connect to dbus", "error", err) @@ -172,6 +173,7 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { } if o.InfinibandClassExists && o.IbstatExists { + log.Logger.Debugw("running ibstat") cctx, ccancel := context.WithTimeout(ctx, 30*time.Second) o.Ibstat, err = infiniband.RunIbstat(cctx) ccancel() @@ -183,6 +185,7 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { } } + log.Logger.Debugw("checking lsmod peermem") cctx, 
ccancel := context.WithTimeout(ctx, 30*time.Second) o.LsmodPeermem, err = peermem.CheckLsmodPeermemModule(cctx) ccancel() @@ -190,9 +193,10 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { o.LsmodPeermemErrors = append(o.LsmodPeermemErrors, err.Error()) } + log.Logger.Debugw("waiting for default nvml instance") select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, fmt.Errorf("context canceled waiting for nvml instance: %w", ctx.Err()) case <-nvml.DefaultInstanceReady(): log.Logger.Debugw("default nvml instance ready") } @@ -221,125 +225,8 @@ func Get(ctx context.Context, db *sql.DB) (output any, err error) { metrics_remapped_rows.SetLastUpdateUnixSeconds(nowUnix) for _, dev := range o.NVML.DeviceInfos { - log.Logger.Debugw("setting metrics for device", "uuid", dev.UUID, "bus", dev.BusID, "device", dev.DeviceID, "minorNumber", dev.MinorNumberID) - - if dev.ClockEvents != nil { - if err := metrics_clock.SetHWSlowdown(ctx, dev.UUID, dev.ClockEvents.HWSlowdown, now); err != nil { - return nil, err - } - if err := metrics_clock.SetHWSlowdownThermal(ctx, dev.UUID, dev.ClockEvents.HWSlowdownThermal, now); err != nil { - return nil, err - } - if err := metrics_clock.SetHWSlowdownPowerBrake(ctx, dev.UUID, dev.ClockEvents.HWSlowdownPowerBrake, now); err != nil { - return nil, err - } - } - - if err := metrics_clockspeed.SetGraphicsMHz(ctx, dev.UUID, dev.ClockSpeed.GraphicsMHz, now); err != nil { - return nil, err - } - if err := metrics_clockspeed.SetMemoryMHz(ctx, dev.UUID, dev.ClockSpeed.MemoryMHz, now); err != nil { - return nil, err - } - - if err := metrics_ecc.SetAggregateTotalCorrected(ctx, dev.UUID, float64(dev.ECCErrors.Aggregate.Total.Corrected), now); err != nil { - return nil, err - } - if err := metrics_ecc.SetAggregateTotalUncorrected(ctx, dev.UUID, float64(dev.ECCErrors.Aggregate.Total.Uncorrected), now); err != nil { - return nil, err - } - if err := metrics_ecc.SetVolatileTotalCorrected(ctx, dev.UUID, 
float64(dev.ECCErrors.Volatile.Total.Corrected), now); err != nil { - return nil, err - } - if err := metrics_ecc.SetVolatileTotalUncorrected(ctx, dev.UUID, float64(dev.ECCErrors.Volatile.Total.Uncorrected), now); err != nil { - return nil, err - } - - if err := metrics_memory.SetTotalBytes(ctx, dev.UUID, float64(dev.Memory.TotalBytes), now); err != nil { - return nil, err - } - metrics_memory.SetReservedBytes(dev.UUID, float64(dev.Memory.ReservedBytes)) - if err := metrics_memory.SetUsedBytes(ctx, dev.UUID, float64(dev.Memory.UsedBytes), now); err != nil { - return nil, err - } - metrics_memory.SetFreeBytes(dev.UUID, float64(dev.Memory.FreeBytes)) - usedPercent, err := dev.Memory.GetUsedPercent() - if err != nil { - o.NVMLErrors = append(o.NVMLErrors, err.Error()) - } else { - if err := metrics_memory.SetUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { - return nil, err - } - } - - if err := metrics_nvlink.SetFeatureEnabled(ctx, dev.UUID, dev.NVLink.States.AllFeatureEnabled(), now); err != nil { - return nil, err - } - if err := metrics_nvlink.SetReplayErrors(ctx, dev.UUID, dev.NVLink.States.TotalRelayErrors(), now); err != nil { - return nil, err - } - if err := metrics_nvlink.SetRecoveryErrors(ctx, dev.UUID, dev.NVLink.States.TotalRecoveryErrors(), now); err != nil { - return nil, err - } - if err := metrics_nvlink.SetCRCErrors(ctx, dev.UUID, dev.NVLink.States.TotalCRCErrors(), now); err != nil { - return nil, err - } - if err := metrics_nvlink.SetRxBytes(ctx, dev.UUID, float64(dev.NVLink.States.TotalThroughputRawRxBytes()), now); err != nil { - return nil, err - } - if err := metrics_nvlink.SetTxBytes(ctx, dev.UUID, float64(dev.NVLink.States.TotalThroughputRawTxBytes()), now); err != nil { - return nil, err - } - - if err := metrics_power.SetUsageMilliWatts(ctx, dev.UUID, float64(dev.Power.UsageMilliWatts), now); err != nil { - return nil, err - } - if err := metrics_power.SetEnforcedLimitMilliWatts(ctx, dev.UUID, 
float64(dev.Power.EnforcedLimitMilliWatts), now); err != nil { - return nil, err - } - usedPercent, err = dev.Power.GetUsedPercent() - if err != nil { - o.NVMLErrors = append(o.NVMLErrors, err.Error()) - } else { - if err := metrics_power.SetUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { - return nil, err - } - } - - if err := metrics_temperature.SetCurrentCelsius(ctx, dev.UUID, float64(dev.Temperature.CurrentCelsiusGPUCore), now); err != nil { - return nil, err - } - if err := metrics_temperature.SetThresholdSlowdownCelsius(ctx, dev.UUID, float64(dev.Temperature.ThresholdCelsiusSlowdown), now); err != nil { - return nil, err - } - usedPercent, err = dev.Temperature.GetUsedPercentSlowdown() - if err != nil { - o.NVMLErrors = append(o.NVMLErrors, err.Error()) - } else { - if err := metrics_temperature.SetSlowdownUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { - return nil, err - } - } - - if err := metrics_utilization.SetGPUUtilPercent(ctx, dev.UUID, dev.Utilization.GPUUsedPercent, now); err != nil { - return nil, err - } - if err := metrics_utilization.SetMemoryUtilPercent(ctx, dev.UUID, dev.Utilization.MemoryUsedPercent, now); err != nil { - return nil, err - } - - if err := metrics_processes.SetRunningProcessesTotal(ctx, dev.UUID, len(dev.Processes.RunningProcesses), now); err != nil { - return nil, err - } - - if err := metrics_remapped_rows.SetRemappedDueToUncorrectableErrors(ctx, dev.UUID, uint32(dev.RemappedRows.RemappedDueToCorrectableErrors), now); err != nil { - return nil, err - } - if err := metrics_remapped_rows.SetRemappingPending(ctx, dev.UUID, dev.RemappedRows.RemappingPending, now); err != nil { - return nil, err - } - if err := metrics_remapped_rows.SetRemappingFailed(ctx, dev.UUID, dev.RemappedRows.RemappingFailed, now); err != nil { - return nil, err + if err := setMetricsForDevice(ctx, dev, now, o); err != nil { + return nil, fmt.Errorf("failed to set metrics for device %s: %w", dev.UUID, err) } } } @@ -358,7 +245,7 @@ 
func Get(ctx context.Context, db *sql.DB) (output any, err error) { // as the NVML API provides all the data we need if o.SMIExists { // call this with a timeout, as a broken GPU may block the command. - cctx, ccancel := context.WithTimeout(ctx, time.Minute) + cctx, ccancel := context.WithTimeout(ctx, 2*time.Minute) o.SMI, err = GetSMIOutput(cctx) ccancel() if err != nil { @@ -414,9 +301,6 @@ type Output struct { // This implements "DCGM_FR_BAD_CUDA_ENV" logic in DCGM. BadEnvVarsForCUDA map[string]string `json:"bad_env_vars_for_cuda,omitempty"` - PersistencedExists bool `json:"persistenced_exists"` - PersistencedRunning bool `json:"persistenced_running"` - FabricManagerExists bool `json:"fabric_manager_exists"` FabricManager *FabricManagerOutput `json:"fabric_manager,omitempty"` FabricManagerErrors []string `json:"fabric_manager_errors,omitempty"` @@ -584,9 +468,9 @@ func (o *Output) PrintInfo(debug bool) { // ref. https://docs.nvidia.com/deploy/driver-persistence/index.html if dev.PersistenceMode.Enabled { - fmt.Printf("%s NVML persistence mode is enabled (nvidia-persistenced running %v)\n", checkMark, o.PersistencedRunning) + fmt.Printf("%s NVML persistence mode is enabled\n", checkMark) } else { - fmt.Printf("%s NVML persistence mode is disabled (nvidia-persistenced running %v)\n", warningSign, o.PersistencedRunning) + fmt.Printf("%s NVML persistence mode is disabled\n", warningSign) } if dev.ClockEvents != nil { @@ -668,3 +552,199 @@ func (o *Output) PrintInfo(debug bool) { } } } + +// setMetricsForDevice sets all metrics for a single device +func setMetricsForDevice(ctx context.Context, dev *nvml.DeviceInfo, now time.Time, o *Output) error { + log.Logger.Debugw("setting metrics for device", "uuid", dev.UUID, "bus", dev.BusID, "device", dev.DeviceID, "minorNumber", dev.MinorNumberID) + + if dev.ClockEvents != nil { + if err := setClockMetrics(ctx, dev, now); err != nil { + return err + } + } + + if err := setClockSpeedMetrics(ctx, dev, now); err != nil { + 
return err + } + + if err := setECCMetrics(ctx, dev, now); err != nil { + return err + } + + if err := setMemoryMetrics(ctx, dev, now, o); err != nil { + return err + } + + if err := setNVLinkMetrics(ctx, dev, now); err != nil { + return err + } + + if err := setPowerMetrics(ctx, dev, now, o); err != nil { + return err + } + + if err := setTemperatureMetrics(ctx, dev, now, o); err != nil { + return err + } + + if err := setUtilizationMetrics(ctx, dev, now); err != nil { + return err + } + + if err := setProcessMetrics(ctx, dev, now); err != nil { + return err + } + + if err := setRemappedRowsMetrics(ctx, dev, now); err != nil { + return err + } + + return nil +} + +func setClockMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_clock.SetHWSlowdown(ctx, dev.UUID, dev.ClockEvents.HWSlowdown, now); err != nil { + return err + } + if err := metrics_clock.SetHWSlowdownThermal(ctx, dev.UUID, dev.ClockEvents.HWSlowdownThermal, now); err != nil { + return err + } + if err := metrics_clock.SetHWSlowdownPowerBrake(ctx, dev.UUID, dev.ClockEvents.HWSlowdownPowerBrake, now); err != nil { + return err + } + return nil +} + +func setClockSpeedMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_clockspeed.SetGraphicsMHz(ctx, dev.UUID, dev.ClockSpeed.GraphicsMHz, now); err != nil { + return err + } + if err := metrics_clockspeed.SetMemoryMHz(ctx, dev.UUID, dev.ClockSpeed.MemoryMHz, now); err != nil { + return err + } + return nil +} + +func setECCMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_ecc.SetAggregateTotalCorrected(ctx, dev.UUID, float64(dev.ECCErrors.Aggregate.Total.Corrected), now); err != nil { + return err + } + if err := metrics_ecc.SetAggregateTotalUncorrected(ctx, dev.UUID, float64(dev.ECCErrors.Aggregate.Total.Uncorrected), now); err != nil { + return err + } + if err := metrics_ecc.SetVolatileTotalCorrected(ctx, dev.UUID, 
float64(dev.ECCErrors.Volatile.Total.Corrected), now); err != nil { + return err + } + if err := metrics_ecc.SetVolatileTotalUncorrected(ctx, dev.UUID, float64(dev.ECCErrors.Volatile.Total.Uncorrected), now); err != nil { + return err + } + return nil +} + +func setMemoryMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time, o *Output) error { + if err := metrics_memory.SetTotalBytes(ctx, dev.UUID, float64(dev.Memory.TotalBytes), now); err != nil { + return err + } + metrics_memory.SetReservedBytes(dev.UUID, float64(dev.Memory.ReservedBytes)) + if err := metrics_memory.SetUsedBytes(ctx, dev.UUID, float64(dev.Memory.UsedBytes), now); err != nil { + return err + } + metrics_memory.SetFreeBytes(dev.UUID, float64(dev.Memory.FreeBytes)) + usedPercent, err := dev.Memory.GetUsedPercent() + if err != nil { + o.NVMLErrors = append(o.NVMLErrors, err.Error()) + } else { + if err := metrics_memory.SetUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { + return err + } + } + return nil +} + +func setNVLinkMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_nvlink.SetFeatureEnabled(ctx, dev.UUID, dev.NVLink.States.AllFeatureEnabled(), now); err != nil { + return err + } + if err := metrics_nvlink.SetReplayErrors(ctx, dev.UUID, dev.NVLink.States.TotalRelayErrors(), now); err != nil { + return err + } + if err := metrics_nvlink.SetRecoveryErrors(ctx, dev.UUID, dev.NVLink.States.TotalRecoveryErrors(), now); err != nil { + return err + } + if err := metrics_nvlink.SetCRCErrors(ctx, dev.UUID, dev.NVLink.States.TotalCRCErrors(), now); err != nil { + return err + } + if err := metrics_nvlink.SetRxBytes(ctx, dev.UUID, float64(dev.NVLink.States.TotalThroughputRawRxBytes()), now); err != nil { + return err + } + if err := metrics_nvlink.SetTxBytes(ctx, dev.UUID, float64(dev.NVLink.States.TotalThroughputRawTxBytes()), now); err != nil { + return err + } + return nil +} + +func setPowerMetrics(ctx context.Context, dev 
*nvml.DeviceInfo, now time.Time, o *Output) error { + if err := metrics_power.SetUsageMilliWatts(ctx, dev.UUID, float64(dev.Power.UsageMilliWatts), now); err != nil { + return err + } + if err := metrics_power.SetEnforcedLimitMilliWatts(ctx, dev.UUID, float64(dev.Power.EnforcedLimitMilliWatts), now); err != nil { + return err + } + usedPercent, err := dev.Power.GetUsedPercent() + if err != nil { + o.NVMLErrors = append(o.NVMLErrors, err.Error()) + } else { + if err := metrics_power.SetUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { + return err + } + } + return nil +} + +func setTemperatureMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time, o *Output) error { + if err := metrics_temperature.SetCurrentCelsius(ctx, dev.UUID, float64(dev.Temperature.CurrentCelsiusGPUCore), now); err != nil { + return err + } + if err := metrics_temperature.SetThresholdSlowdownCelsius(ctx, dev.UUID, float64(dev.Temperature.ThresholdCelsiusSlowdown), now); err != nil { + return err + } + usedPercent, err := dev.Temperature.GetUsedPercentSlowdown() + if err != nil { + o.NVMLErrors = append(o.NVMLErrors, err.Error()) + } else { + if err := metrics_temperature.SetSlowdownUsedPercent(ctx, dev.UUID, usedPercent, now); err != nil { + return err + } + } + return nil +} + +func setUtilizationMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_utilization.SetGPUUtilPercent(ctx, dev.UUID, dev.Utilization.GPUUsedPercent, now); err != nil { + return err + } + if err := metrics_utilization.SetMemoryUtilPercent(ctx, dev.UUID, dev.Utilization.MemoryUsedPercent, now); err != nil { + return err + } + return nil +} + +func setProcessMetrics(ctx context.Context, dev *nvml.DeviceInfo, now time.Time) error { + if err := metrics_processes.SetRunningProcessesTotal(ctx, dev.UUID, len(dev.Processes.RunningProcesses), now); err != nil { + return err + } + return nil +} + +func setRemappedRowsMetrics(ctx context.Context, dev *nvml.DeviceInfo, now 
time.Time) error { + if err := metrics_remapped_rows.SetRemappedDueToUncorrectableErrors(ctx, dev.UUID, uint32(dev.RemappedRows.RemappedDueToCorrectableErrors), now); err != nil { + return err + } + if err := metrics_remapped_rows.SetRemappingPending(ctx, dev.UUID, dev.RemappedRows.RemappingPending, now); err != nil { + return err + } + if err := metrics_remapped_rows.SetRemappingFailed(ctx, dev.UUID, dev.RemappedRows.RemappingFailed, now); err != nil { + return err + } + return nil +} diff --git a/components/docker/container/component_output.go b/components/docker/container/component_output.go index c09ff14d..7df947b6 100644 --- a/components/docker/container/component_output.go +++ b/components/docker/container/component_output.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os/exec" "strings" "sync" "time" @@ -13,7 +12,7 @@ import ( docker_container_id "github.com/leptonai/gpud/components/docker/container/id" components_metrics "github.com/leptonai/gpud/components/metrics" "github.com/leptonai/gpud/components/query" - "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/pkg/process" docker_types "github.com/docker/docker/api/types" docker_container "github.com/docker/docker/api/types/container" @@ -149,17 +148,13 @@ func CreateGet(cfg Config) query.GetFunc { } }() - // check if a process named "docker" is running - dockerRunning := false - if err := exec.Command("pidof", "docker").Run(); err == nil { - dockerRunning = true - } else { - log.Logger.Warnw("docker process not found, assuming docker is not running", "error", err) - } + cctx, ccancel := context.WithTimeout(ctx, 15*time.Second) + dockerRunning := process.CheckRunningByPid(cctx, "docker") + ccancel() // "ctx" here is the root level, create one with shorter timeouts // to not block on this checks - cctx, ccancel := context.WithTimeout(ctx, 30*time.Second) + cctx, ccancel = context.WithTimeout(ctx, 30*time.Second) dockerContainers, err := ListContainers(cctx) ccancel() if err != nil { 
diff --git a/components/k8s/pod/component_output.go b/components/k8s/pod/component_output.go index c4c59c86..f6a9e7ad 100644 --- a/components/k8s/pod/component_output.go +++ b/components/k8s/pod/component_output.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "os/exec" "strings" "sync" "time" @@ -16,6 +15,7 @@ import ( components_metrics "github.com/leptonai/gpud/components/metrics" "github.com/leptonai/gpud/components/query" "github.com/leptonai/gpud/log" + "github.com/leptonai/gpud/pkg/process" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -138,17 +138,13 @@ func CreateGet(cfg Config) query.GetFunc { } }() - // check if a process named "kubelet" is running - kubeletRunning := false - if err := exec.Command("pidof", "kubelet").Run(); err == nil { - kubeletRunning = true - } else { - log.Logger.Warnw("kubelet process not found, assuming kubelet is not running", "error", err) - } + cctx, ccancel := context.WithTimeout(ctx, 15*time.Second) + kubeletRunning := process.CheckRunningByPid(cctx, "kubelet") + ccancel() // "ctx" here is the root level, create one with shorter timeouts // to not block on this checks - cctx, ccancel := context.WithTimeout(ctx, 30*time.Second) + cctx, ccancel = context.WithTimeout(ctx, 30*time.Second) pods, err := ListFromKubeletReadOnlyPort(cctx, cfg.Port) ccancel() if err != nil { diff --git a/components/query/config/config.go b/components/query/config/config.go index 49c788ae..2a32a556 100644 --- a/components/query/config/config.go +++ b/components/query/config/config.go @@ -11,7 +11,7 @@ import ( const ( DefaultPollInterval = time.Minute - DefaultGetTimeout = 3 * time.Minute + DefaultGetTimeout = 7 * time.Minute DefaultQueueSize = 60 DefaultStateRetention = 30 * time.Minute ) diff --git a/pkg/process/pids.go b/pkg/process/pids.go index dbfb9d3e..3126929b 100644 --- a/pkg/process/pids.go +++ b/pkg/process/pids.go @@ -2,6 +2,7 @@ package process import ( "context" + "os/exec" "strings" 
"github.com/leptonai/gpud/log" @@ -9,6 +10,15 @@ import ( procs "github.com/shirou/gopsutil/v4/process" ) +func CheckRunningByPid(ctx context.Context, processName string) bool { + log.Logger.Debugw("checking if process is running", "processName", processName) + err := exec.CommandContext(ctx, "pidof", processName).Run() + if err != nil { + log.Logger.Debugw("failed to check -- assuming process is not running", "error", err) + } + return err == nil +} + // CountRunningPids returns the number of running pids. func CountRunningPids() (uint64, error) { pids, err := procs.Pids() diff --git a/pkg/process/utils.go b/pkg/process/utils.go index 2f970d09..7953fbdb 100644 --- a/pkg/process/utils.go +++ b/pkg/process/utils.go @@ -48,6 +48,8 @@ func WithReadStderr() ReadOpOption { } } +// Sets a function to process each line of the command output. +// Helps with debugging if command times out in the middle of reading. func WithProcessLine(fn func(line string)) ReadOpOption { return func(op *ReadOp) { op.processLine = fn @@ -79,6 +81,7 @@ func Read(ctx context.Context, p Process, opts ...ReadOpOption) error { scanner := bufio.NewScanner(combinedReader) for scanner.Scan() { + // helps with debugging if command times out in the middle of reading op.processLine(scanner.Text()) select {