Skip to content

Commit

Permalink
eventstream: group replay goroutines in errgroup
Browse files Browse the repository at this point in the history
By switching from a WaitGroup to an errgroup, a quicker exit was
possible with the groups' custom context.
  • Loading branch information
oxzi committed Nov 2, 2023
1 parent 2b10c44 commit 45f533a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 51 deletions.
26 changes: 15 additions & 11 deletions internal/eventstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"net/http"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -323,25 +323,29 @@ func (client *Client) enterReplayPhase() {
return
}

queryFns := []func(string){client.checkMissedAcknowledgements, client.checkMissedStateChanges}
queryFns := []func(string, context.Context) error{client.checkMissedAcknowledgements, client.checkMissedStateChanges}
objTypes := []string{"host", "service"}

var replayWg sync.WaitGroup
replayWg.Add(len(queryFns) * len(objTypes))

group, groupCtx := errgroup.WithContext(client.Ctx)
for _, fn := range queryFns {
for _, objType := range objTypes {
go func(fn func(string), objType string) {
fn(objType)
replayWg.Done()
}(fn, objType)
fn, objType := fn, objType // https://go.dev/doc/faq#closures_and_goroutines
group.Go(func() error {
return fn(objType, groupCtx)
})
}
}

go func() {
startTime := time.Now()
replayWg.Wait()
client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime)))

err := group.Wait()
if err != nil {
client.Logger.Errorw("Replaying the API resulted in errors", zap.Error(err), zap.Duration("duration", time.Since(startTime)))
} else {
client.Logger.Debugw("All replay phase workers have finished", zap.Duration("duration", time.Since(startTime)))
}

client.replayTrigger <- struct{}{}
}()
}
Expand Down
74 changes: 34 additions & 40 deletions internal/eventstream/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,42 +178,37 @@ func (client *Client) fetchAcknowledgementComment(host, service string, ackTime
// If a filterExpr is given (non-empty string), it will be used for the query. Otherwise, all objects will be requested.
//
// The callback function will be called f.e. object of the objType (i.e. "host" or "service") being retrieved from the
// Icinga 2 Objects API. The callback function or a later caller must decide if this object should be replayed.
func (client *Client) checkMissedChanges(objType, filterExpr string, attrsCallbackFn func(attrs HostServiceRuntimeAttributes, host, service string)) {
var (
logger = client.Logger.With(zap.String("object type", objType))

jsonRaw io.ReadCloser
err error
)
// Icinga 2 Objects API sequentially. The callback function or a later caller decides if this object should be replayed.
func (client *Client) checkMissedChanges(
objType, filterExpr string,
attrsCallbackFn func(attrs HostServiceRuntimeAttributes, host, service string) error,
) (err error) {
logger := client.Logger.With(zap.String("object type", objType), zap.String("filter expr", filterExpr))

defer func() {
if err != nil {
logger.Errorw("Querying API for replay failed", zap.Error(err))
}
}()

var jsonRaw io.ReadCloser
if filterExpr == "" {
jsonRaw, err = client.queryObjectsApiDirect(objType, "")
} else {
jsonRaw, err = client.queryObjectsApiQuery(objType, map[string]any{"filter": filterExpr})
}
if err != nil {
logger.Errorw("Querying API failed", zap.Error(err))
return
}

objQueriesResults, err := extractObjectQueriesResult[HostServiceRuntimeAttributes](jsonRaw)
if err != nil {
logger.Errorw("Parsing API response failed", zap.Error(err))
return
}

if len(objQueriesResults) == 0 {
return
}

logger.Debugw("Querying API resulted in state changes", zap.Int("changes", len(objQueriesResults)))

for _, objQueriesResult := range objQueriesResults {
if client.Ctx.Err() != nil {
logger.Warnw("Stopping API response processing as context is finished", zap.Error(client.Ctx.Err()))
return
}

var hostName, serviceName string
switch objQueriesResult.Type {
case "Host":
Expand All @@ -224,57 +219,56 @@ func (client *Client) checkMissedChanges(objType, filterExpr string, attrsCallba
serviceName = objQueriesResult.Attrs.Name

default:
logger.Errorw("Querying API delivered a wrong object type", zap.String("result type", objQueriesResult.Type))
continue
err = fmt.Errorf("querying API delivered a wrong object type %q", objQueriesResult.Type)
return
}

attrsCallbackFn(objQueriesResult.Attrs, hostName, serviceName)
err = attrsCallbackFn(objQueriesResult.Attrs, hostName, serviceName)
if err != nil {
return
}
}
return
}

// checkMissedStateChanges fetches all objects of the requested type and feeds them into the handler.
func (client *Client) checkMissedStateChanges(objType string) {
client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) {
logger := client.Logger.With(zap.String("object type", objType))

func (client *Client) checkMissedStateChanges(objType string, ctx context.Context) error {
return client.checkMissedChanges(objType, "", func(attrs HostServiceRuntimeAttributes, host, service string) error {
ev, err := client.buildHostServiceEvent(attrs.LastCheckResult, attrs.State, host, service)
if err != nil {
logger.Errorw("Failed to construct Event from API", zap.Error(err))
return
return fmt.Errorf("failed to construct Event from API, %w", err)
}

select {
case <-client.Ctx.Done():
logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err()))
case <-ctx.Done():
return ctx.Err()
case client.eventDispatcherReplay <- &eventMsg{ev, attrs.LastStateChange.Time}:
return nil
}
})
}

// checkMissedAcknowledgements fetches all Host or Service Acknowledgements and feeds them into the handler.
//
// Currently only active acknowledgements are being processed.
func (client *Client) checkMissedAcknowledgements(objType string) {
func (client *Client) checkMissedAcknowledgements(objType string, ctx context.Context) error {
filterExpr := fmt.Sprintf("%s.acknowledgement", objType)
client.checkMissedChanges(objType, filterExpr, func(attrs HostServiceRuntimeAttributes, host, service string) {
logger := client.Logger.With(zap.String("object type", objType))

return client.checkMissedChanges(objType, filterExpr, func(attrs HostServiceRuntimeAttributes, host, service string) error {
ackComment, err := client.fetchAcknowledgementComment(host, service, attrs.AcknowledgementLastChange.Time)
if err != nil {
logger.Errorw("Cannot fetch ACK Comment for Acknowledgement", zap.Error(err))
return
return fmt.Errorf("cannot fetch ACK Comment for Acknowledgement, %w", err)
}

ev, err := client.buildAcknowledgementEvent(host, service, ackComment.Author, ackComment.Text)
if err != nil {
logger.Errorw("Failed to construct Event from Acknowledgement API", zap.Error(err))
return
return fmt.Errorf("failed to construct Event from Acknowledgement API, %w", err)
}

select {
case <-client.Ctx.Done():
logger.Warnw("Cannot dispatch replayed event as context is finished", zap.Error(client.Ctx.Err()))
case <-ctx.Done():
return ctx.Err()
case client.eventDispatcherReplay <- &eventMsg{ev, attrs.AcknowledgementLastChange.Time}:
return nil
}
})
}
Expand Down

0 comments on commit 45f533a

Please sign in to comment.