Skip to content

Commit

Permalink
Health monitoring: Start method should block
Browse files Browse the repository at this point in the history
  • Loading branch information
Merteg committed Oct 4, 2024
1 parent fd98585 commit e6919bd
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions health/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health
defer cancel()

m.Start(ctx, measurementsCh, healthCh, targetCh)
<-m.Done()
}()

doneWg.Add(1)
Expand Down Expand Up @@ -98,6 +99,8 @@ type Manager interface {
IsStarted() bool
// AddTargets provides a simple interface to register monitored targets
AddTargets(targets []*target.Target)

Done() <-chan struct{}
}

// NewManager creates a new Manager instance with internal target registry.
Expand Down Expand Up @@ -131,10 +134,8 @@ type healthManager struct {
}

func (hm *healthManager) Start(
ctx context.Context,
measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health,
targetIn chan<- *target.Target,
ctx context.Context, measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health, targetIn chan<- *target.Target,
) {
hm.mu.Lock()
hm.targetIn = targetIn
Expand All @@ -155,13 +156,14 @@ func (hm *healthManager) Start(
}()

hm.started.Store(true)

go func() {
hm.wg.Wait()
hm.started.Store(false)
hm.stopWait <- struct{}{}
close(hm.stopWait)
}()

hm.sendTargetsInfo() // if some targets where added before start we need to register them
hm.sendTargetsInfo() // if some targets were added before start we need to register them
}

func (hm *healthManager) Shutdown() {
Expand All @@ -172,6 +174,10 @@ func (hm *healthManager) Shutdown() {
close(hm.targetIn)
}

func (hm *healthManager) Done() <-chan struct{} {
return hm.stopWait
}

func (hm *healthManager) IsStarted() bool {
return hm.started.Load()
}
Expand Down Expand Up @@ -239,7 +245,7 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro
}
targetHealth, err = hm.buildTargetFromMeasure(measure)
if err != nil {
return fmt.Errorf("Unable to register target automatically: %w", err)
return fmt.Errorf("unable to register target automatically: %w", err)
}
hm.registry.setRawHealthForTarget(targetHealth)
}
Expand All @@ -250,12 +256,12 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro
case Metric:
err := hm.updateTargetsMetric(targetHealth, measure)
if err != nil {
return fmt.Errorf("Unable to update target metric: %w", err)
return fmt.Errorf("unable to update target metric: %w", err)
}
case CounterChange:
err := hm.updateTargetsCounter(targetHealth, measure)
if err != nil {
return fmt.Errorf("Unable to update target counter: %w", err)
return fmt.Errorf("unable to update target counter: %w", err)
}
case Message:
if measure.Message.AffectHealth {
Expand Down

0 comments on commit e6919bd

Please sign in to comment.