From ccae84b2d587ed4bfcaf48b342868ac7d5dc1b83 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 11 Oct 2023 13:47:55 +0400 Subject: [PATCH] *: Collect height and state data from chain Closes #95. Signed-off-by: Evgenii Baidakov --- cmd/neofs-net-monitor/monitor.go | 8 ++ pkg/monitor/metrics.go | 22 ++++++ pkg/monitor/monitor.go | 59 ++++++++++++++ pkg/multinodepool/pool.go | 127 +++++++++++++++++++++++++++++++ 4 files changed, 216 insertions(+) create mode 100644 pkg/multinodepool/pool.go diff --git a/cmd/neofs-net-monitor/monitor.go b/cmd/neofs-net-monitor/monitor.go index a179b41..e2fe876 100644 --- a/cmd/neofs-net-monitor/monitor.go +++ b/cmd/neofs-net-monitor/monitor.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor" "github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain" "github.com/nspcc-dev/neofs-net-monitor/pkg/morphchain/contracts" + "github.com/nspcc-dev/neofs-net-monitor/pkg/multinodepool" "github.com/nspcc-dev/neofs-net-monitor/pkg/pool" "github.com/spf13/viper" "go.uber.org/zap" @@ -130,6 +131,11 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { }, ) + mnPool := multinodepool.NewPool(sideChainEndpoints, cfg.GetDuration(cfgMetricsInterval)) + if err = mnPool.Dial(ctx); err != nil { + return nil, fmt.Errorf("mnPool: %w", err) + } + return monitor.New(monitor.Args{ Balance: balance, Proxy: proxy, @@ -144,5 +150,7 @@ func New(ctx context.Context, cfg *viper.Viper) (*monitor.Monitor, error) { SideBlFetcher: sideBalanceFetcher, MainBlFetcher: mainBalanceFetcher, CnrFetcher: cnrFetcher, + HeightFetcher: mnPool, + StateFetcher: mnPool, }), nil } diff --git a/pkg/monitor/metrics.go b/pkg/monitor/metrics.go index ee3ddda..a6b73ac 100644 --- a/pkg/monitor/metrics.go +++ b/pkg/monitor/metrics.go @@ -167,4 +167,26 @@ var ( Help: "Number of available containers", }, ) + + chainHeight = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: "neofs_net_monitor", + Name: "chain_height", + Help: "Chain height in blocks", + }, + []string{ + "host", + }, + ) + + chainState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: "neofs_net_monitor", + Name: "chain_state", + Help: "Chain state hash in specific height", + }, + []string{ + "host", "hash", + }, + ) ) diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 62678f6..d85d15e 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -44,6 +44,24 @@ type ( Total() (int64, error) } + HeightFetcher interface { + FetchHeight() []HeightData + } + + StateFetcher interface { + FetchState(height uint32) []StateData + } + + HeightData struct { + Host string + Value uint32 + } + + StateData struct { + Host string + Value string + } + Node struct { ID uint64 Address string @@ -76,6 +94,8 @@ type ( sideBlFetcher BalanceFetcher mainBlFetcher BalanceFetcher cnrFetcher ContainerFetcher + heightFetcher HeightFetcher + stateFetcher StateFetcher } Args struct { @@ -92,6 +112,8 @@ type ( SideBlFetcher BalanceFetcher MainBlFetcher BalanceFetcher CnrFetcher ContainerFetcher + HeightFetcher HeightFetcher + StateFetcher StateFetcher } ) @@ -113,6 +135,8 @@ func New(p Args) *Monitor { sideBlFetcher: p.SideBlFetcher, mainBlFetcher: p.MainBlFetcher, cnrFetcher: p.CnrFetcher, + heightFetcher: p.HeightFetcher, + stateFetcher: p.StateFetcher, } } @@ -133,6 +157,8 @@ func (m *Monitor) Start(ctx context.Context) { prometheus.MustRegister(alphabetMainDivergence) prometheus.MustRegister(alphabetSideDivergence) prometheus.MustRegister(containersNumber) + prometheus.MustRegister(chainHeight) + prometheus.MustRegister(chainState) if err := m.geoFetcher.Open(); err != nil { m.logger.Warn("geoposition fetching disabled", zap.Error(err)) @@ -204,6 +230,9 @@ func (m *Monitor) Job(ctx context.Context) { m.processContainersNumber() + minHeight := m.processChainHeight() + m.processChainState(minHeight) + select { case <-time.After(m.sleep): // sleep for some time before next prometheus update @@ -507,3 +536,33 @@ func (m *Monitor) processContainersNumber() { containersNumber.Set(float64(total)) } + +func (m *Monitor) processChainHeight() uint32 { + var minHeight uint32 + heightData := m.heightFetcher.FetchHeight() + + for _, d := range heightData { + chainHeight.WithLabelValues(d.Host).Set(float64(d.Value)) + + if minHeight == 0 || d.Value < minHeight { + minHeight = d.Value + } + } + + return minHeight +} + +func (m *Monitor) processChainState(height uint32) { + if height == 0 { + return + } + + stateData := m.stateFetcher.FetchState(height) + chainState.Reset() + + h := float64(height) + + for _, d := range stateData { + chainState.WithLabelValues(d.Host, d.Value).Set(h) + } +} diff --git a/pkg/multinodepool/pool.go b/pkg/multinodepool/pool.go new file mode 100644 index 0000000..94719ee --- /dev/null +++ b/pkg/multinodepool/pool.go @@ -0,0 +1,127 @@ +package multinodepool + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neofs-net-monitor/pkg/monitor" +) + +// Pool collects data from each node. +type Pool struct { + endpoints []string + dialTimeout time.Duration + clients []*rpcclient.Client +} + +func NewPool(endpoints []string, dialTimeout time.Duration) *Pool { + return &Pool{ + endpoints: endpoints, + dialTimeout: dialTimeout, + clients: make([]*rpcclient.Client, len(endpoints)), + } +} + +func (c *Pool) Dial(ctx context.Context) error { + opts := rpcclient.Options{DialTimeout: c.dialTimeout} + + for i, ep := range c.endpoints { + neoClient, err := neoGoClient(ctx, ep, opts) + if err != nil { + return fmt.Errorf("neoGoClient: %w", err) + } + + c.clients[i] = neoClient + } + + return nil +} + +func (c *Pool) FetchHeight() []monitor.HeightData { + var ( + heights []monitor.HeightData + wg sync.WaitGroup + heightChan = make(chan monitor.HeightData, len(c.clients)) + ) + + for _, cl := range c.clients { + wg.Add(1) + + go func(cl *rpcclient.Client) { + defer wg.Done() + + stHeight, err := cl.GetStateHeight() + if err != nil { + log.Printf("GetStateHeight for %s: %v", cl.Endpoint(), err) + return + } + + heightChan <- monitor.HeightData{ + Host: cl.Endpoint(), + Value: stHeight.Local, + } + }(cl) + } + + go func() { + wg.Wait() + close(heightChan) + }() + + for height := range heightChan { + heights = append(heights, height) + } + + return heights +} + +func (c *Pool) FetchState(height uint32) []monitor.StateData { + var ( + states []monitor.StateData + wg sync.WaitGroup + stateChan = make(chan monitor.StateData, len(c.clients)) + ) + + for _, cl := range c.clients { + wg.Add(1) + + go func(cl *rpcclient.Client) { + defer wg.Done() + + stHeight, err := cl.GetStateRootByHeight(height) + if err != nil { + log.Printf("GetStateRootByHeight for %s: %v", cl.Endpoint(), err) + return + } + + stateChan <- monitor.StateData{ + Host: cl.Endpoint(), + Value: stHeight.Hash().String(), + } + }(cl) + } + + go func() { + wg.Wait() + close(stateChan) + }() + + for state := range stateChan { + states = append(states, state) + } + + return states +} + +func neoGoClient(ctx context.Context, endpoint string, opts rpcclient.Options) (*rpcclient.Client, error) { + cli, err := rpcclient.New(ctx, endpoint, opts) + if err != nil { + return nil, fmt.Errorf("can't create neo-go client: %w", err) + } + + return cli, nil +}