Skip to content

Commit

Permalink
feat(*): separate read-only sqlite instance for better concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jan 6, 2025
1 parent c3bb2f7 commit 904a051
Show file tree
Hide file tree
Showing 103 changed files with 580 additions and 460 deletions.
2 changes: 1 addition & 1 deletion components/accelerator/nvidia/bad-envs/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, bad_envs_id.Name)

return &component{
Expand Down
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/bad-envs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
6 changes: 3 additions & 3 deletions components/accelerator/nvidia/clock-speed/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_clock_speed_id.Name)

return &component{
Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *component) Close() error {

var _ components.PromRegisterer = (*component)(nil)

func (c *component) RegisterCollectors(reg *prometheus.Registry, db *sql.DB, tableName string) error {
func (c *component) RegisterCollectors(reg *prometheus.Registry, dbRW *sql.DB, dbRO *sql.DB, tableName string) error {
c.gatherer = reg
return nvidia_query_metrics_clockspeed.Register(reg, db, tableName)
return nvidia_query_metrics_clockspeed.Register(reg, dbRW, dbRO, tableName)
}
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/clock-speed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
6 changes: 3 additions & 3 deletions components/accelerator/nvidia/ecc/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_ecc_id.Name)

return &component{
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *component) Close() error {

var _ components.PromRegisterer = (*component)(nil)

func (c *component) RegisterCollectors(reg *prometheus.Registry, db *sql.DB, tableName string) error {
func (c *component) RegisterCollectors(reg *prometheus.Registry, dbRW *sql.DB, dbRO *sql.DB, tableName string) error {
c.gatherer = reg
return nvidia_query_metrics_ecc.Register(reg, db, tableName)
return nvidia_query_metrics_ecc.Register(reg, dbRW, dbRO, tableName)
}
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/ecc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
9 changes: 4 additions & 5 deletions components/accelerator/nvidia/error-xid-sxid/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package errorxidsxid

import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
Expand All @@ -24,24 +23,24 @@ func New(ctx context.Context, cfg Config) components.Component {

// this starts the Xid poller via "nvml.StartDefaultInstance"
cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_error_xid_sxid_id.Name)

return &component{
cfg: cfg,
rootCtx: ctx,
cancel: ccancel,
poller: nvidia_query.GetDefaultPoller(),
db: cfg.Query.State.DB,
}
}

var _ components.Component = (*component)(nil)

type component struct {
cfg Config
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
db *sql.DB
}

func (c *component) Name() string { return nvidia_error_xid_sxid_id.Name }
Expand All @@ -61,7 +60,7 @@ const (
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
events, err := nvidia_xid_sxid_state.ReadEvents(ctx, c.db, nvidia_xid_sxid_state.WithSince(since))
events, err := nvidia_xid_sxid_state.ReadEvents(ctx, c.cfg.Query.State.DBRO, nvidia_xid_sxid_state.WithSince(since))
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/error-xid-sxid/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
2 changes: 1 addition & 1 deletion components/accelerator/nvidia/error/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name)

return &component{
Expand Down
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/error/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/error/xid/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
2 changes: 1 addition & 1 deletion components/accelerator/nvidia/fabric-manager/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func New(ctx context.Context, cfg Config) (components.Component, error) {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Log.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Log.Query.State.DBRW, cfg.Log.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name)

if err := cfg.Log.Validate(); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions components/accelerator/nvidia/fabric-manager/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@ func TestComponentLog(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := sqlite.Open(":memory:")
dbRW, err := sqlite.Open(":memory:")
if err != nil {
t.Fatalf("failed to open database: %v", err)
}
defer db.Close()
defer dbRW.Close()

dbRO, err := sqlite.Open(":memory:", sqlite.WithReadOnly(true))
if err != nil {
t.Fatalf("failed to open database: %v", err)
}
defer dbRO.Close()

pollInterval := 3 * time.Second
component, err := New(
Expand All @@ -43,7 +49,8 @@ func TestComponentLog(t *testing.T) {
Query: query_config.Config{
Interval: metav1.Duration{Duration: pollInterval},
State: &query_config.State{
DB: db,
DBRW: dbRW,
DBRO: dbRO,
},
},
BufferSize: query_log_config.DefaultBufferSize,
Expand Down
9 changes: 7 additions & 2 deletions components/accelerator/nvidia/fabric-manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Config struct {
Log query_log_config.Config `json:"log"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -28,7 +28,12 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
if cfg.Log.Query.State != nil {
cfg.Log.Query.State.DBRW = dbRW
cfg.Log.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
4 changes: 2 additions & 2 deletions components/accelerator/nvidia/gpm/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *component) Close() error {

var _ components.PromRegisterer = (*component)(nil)

func (c *component) RegisterCollectors(reg *prometheus.Registry, db *sql.DB, tableName string) error {
func (c *component) RegisterCollectors(reg *prometheus.Registry, dbRW *sql.DB, dbRO *sql.DB, tableName string) error {
c.gatherer = reg
return nvidia_query_metrics_gpm.Register(reg, db, tableName)
return nvidia_query_metrics_gpm.Register(reg, dbRW, dbRO, tableName)
}
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/gpm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_gsp_firmware_mode_id.Name)

return &component{
Expand Down
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/gsp-firmware-mode/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
12 changes: 6 additions & 6 deletions components/accelerator/nvidia/hw-slowdown/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

cctx, ccancel := context.WithCancel(ctx)
nvidia_query.SetDefaultPoller(cfg.Query.State.DB)
nvidia_query.SetDefaultPoller(cfg.Query.State.DBRW, cfg.Query.State.DBRO)
nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_hw_slowdown_id.Name)

return &component{
rootCtx: ctx,
cancel: ccancel,
poller: nvidia_query.GetDefaultPoller(),
db: cfg.Query.State.DB,
dbRO: cfg.Query.State.DBRO,
}
}

Expand All @@ -42,7 +42,7 @@ type component struct {
cancel context.CancelFunc
poller query.Poller
gatherer prometheus.Gatherer
db *sql.DB
dbRO *sql.DB
}

func (c *component) Name() string { return nvidia_hw_slowdown_id.Name }
Expand All @@ -61,7 +61,7 @@ func (c *component) Events(ctx context.Context, since time.Time) ([]components.E
// so we can just read from the storage
events, err := nvidia_clock_events_state.ReadEvents(
ctx,
c.db,
c.dbRO,
nvidia_clock_events_state.WithSince(since),

// in order to dedup nvidia-smi events and prioritize nvml events
Expand Down Expand Up @@ -148,7 +148,7 @@ func (c *component) Close() error {

var _ components.PromRegisterer = (*component)(nil)

func (c *component) RegisterCollectors(reg *prometheus.Registry, db *sql.DB, tableName string) error {
func (c *component) RegisterCollectors(reg *prometheus.Registry, dbRW *sql.DB, dbRO *sql.DB, tableName string) error {
c.gatherer = reg
return nvidia_query_metrics_clock.Register(reg, db, tableName)
return nvidia_query_metrics_clock.Register(reg, dbRW, dbRO, tableName)
}
5 changes: 3 additions & 2 deletions components/accelerator/nvidia/hw-slowdown/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
func ParseConfig(b any, dbRW *sql.DB, dbRO *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
Expand All @@ -22,7 +22,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
cfg.Query.State.DBRW = dbRW
cfg.Query.State.DBRO = dbRO
}
return cfg, nil
}
Expand Down
Loading

0 comments on commit 904a051

Please sign in to comment.