From cdc6ce8b1116679c8bb2c0566d91b486e1c73cf2 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 16 Jul 2024 17:29:44 +0200 Subject: [PATCH 1/2] Don't set `server#WriteTimeout` Setting `10s` as a write timeout might not cause any problems when the request is processed successfully, but when we e.g. have to retry any database errors, which is `5m` by default, this will just in an i/o timeout and the listener won't event write any hints about that, but will silently abort the connections. --- internal/listener/listener.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/listener/listener.go b/internal/listener/listener.go index c7d26ae6..785060b4 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -55,11 +55,10 @@ func (l *Listener) Run(ctx context.Context) error { listenAddr := daemon.Config().Listen l.logger.Infof("Starting listener on http://%s", listenAddr) server := &http.Server{ - Addr: listenAddr, - Handler: l, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, + Addr: listenAddr, + Handler: l, + ReadTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, } serverErr := make(chan error) From 37376db8dd4d03cee7758e8f1a567e5d2c8bf3b3 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 12 Jul 2024 11:03:50 +0200 Subject: [PATCH 2/2] Make logging more consistent --- cmd/channels/email/main.go | 2 +- cmd/channels/rocketchat/main.go | 4 +- internal/config/runtime.go | 4 +- internal/filter/parser_test.go | 2 +- internal/icinga2/client.go | 4 +- internal/icinga2/client_api.go | 4 +- internal/icinga2/launcher.go | 6 +-- internal/incident/incident.go | 78 ++++++++++++--------------------- internal/incident/sync.go | 12 ++--- internal/listener/listener.go | 5 ++- internal/object/object.go | 2 +- internal/object/objects.go | 2 +- internal/rule/condition.go | 2 +- internal/utils/utils.go | 6 +-- 14 files changed, 53 insertions(+), 80 deletions(-) diff --git a/cmd/channels/email/main.go b/cmd/channels/email/main.go index 87580f5b..bc1a206c 100644 --- a/cmd/channels/email/main.go +++ b/cmd/channels/email/main.go @@ -45,7 +45,7 @@ func (ch *Email) SendNotification(req *plugin.NotificationRequest) error { } if len(to) == 0 { - return fmt.Errorf("contact user %s doesn't have an e-mail address", req.Contact.FullName) + return fmt.Errorf("contact user %s does not have an e-mail address", req.Contact.FullName) } var msg bytes.Buffer diff --git a/cmd/channels/rocketchat/main.go b/cmd/channels/rocketchat/main.go index b441234d..49afb0b2 100644 --- a/cmd/channels/rocketchat/main.go +++ b/cmd/channels/rocketchat/main.go @@ -36,7 +36,7 @@ func (ch *RocketChat) SendNotification(req *plugin.NotificationRequest) error { } if roomId == "" { - return fmt.Errorf("contact user %s doesn't specify a rocketchat channel or username", req.Contact.FullName) + return fmt.Errorf("contact user %s does not specify a rocketchat channel or username", req.Contact.FullName) } message := struct { @@ -64,7 +64,7 @@ func (ch *RocketChat) SendNotification(req *plugin.NotificationRequest) error { client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(request) if err != nil { - return fmt.Errorf("error while sending http request to rocketchat server: %s", err) + return fmt.Errorf("error while sending http request to rocketchat server: %w", err) } defer resp.Body.Close() diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 4c47d843..a3d26781 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -191,12 +191,12 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg sourceIdRaw, sourceIdOk := strings.CutPrefix(user, "source-") if !sourceIdOk { - logger.Debugw("Cannot extract source ID from HTTP basic auth username", zap.String("user-input", user)) + logger.Debugw("Cannot extract source ID from HTTP basic auth username", zap.String("user_input", user)) return nil } sourceId, err := strconv.ParseInt(sourceIdRaw, 10, 64) if err != nil { - logger.Debugw("Cannot convert extracted source Id to int", zap.String("user-input", user), zap.Error(err)) + logger.Debugw("Cannot convert extracted source Id to int", zap.String("user_input", user), zap.Error(err)) return nil } diff --git a/internal/filter/parser_test.go b/internal/filter/parser_test.go index b23a7955..ca386f67 100644 --- a/internal/filter/parser_test.go +++ b/internal/filter/parser_test.go @@ -141,7 +141,7 @@ func TestFilter(t *testing.T) { assert.Nil(t, err, "There should be no errors but got: %s", err) expected := &Condition{op: Equal, column: "foo", value: "bar"} - assert.Equal(t, expected, rule, "Parser doesn't parse single condition correctly") + assert.Equal(t, expected, rule, "Parser does not parse single condition correctly") }) t.Run("UrlEncodedFilterExpression", func(t *testing.T) { diff --git a/internal/icinga2/client.go b/internal/icinga2/client.go index eff1fafc..5eb92f63 100644 --- a/internal/icinga2/client.go +++ b/internal/icinga2/client.go @@ -397,7 +397,7 @@ func (client *Client) startCatchupWorkers(delay time.Duration) (chan *catchupEve if err != nil && !errors.Is(err, context.Canceled) { client.Logger.Debugw("Catch-up-phase event worker failed", zap.Stringer("worker", workerId), - zap.String("object type", objType), + zap.String("object_type", objType), zap.Error(err)) } return err @@ -531,7 +531,7 @@ func (client *Client) worker() { client.Logger.Debugw("Event to be replayed is not in cache", zap.Stringer("event", ev)) } else if ev.Time.Before(ts) { client.Logger.Debugw("Skip replaying outdated Event Stream event", zap.Stringer("event", ev), - zap.Time("event timestamp", ev.Time), zap.Time("cache timestamp", ts)) + zap.Time("event_timestamp", ev.Time), zap.Time("cache_timestamp", ts)) break } diff --git a/internal/icinga2/client_api.go b/internal/icinga2/client_api.go index 315d5a09..a4ac9093 100644 --- a/internal/icinga2/client_api.go +++ b/internal/icinga2/client_api.go @@ -231,8 +231,8 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ca var stateChangeEvents, muteEvents, unmuteEvents int defer func() { client.Logger.Debugw("Querying API emitted events", - zap.String("object type", objType), - zap.Int("state changes", stateChangeEvents), + zap.String("object_type", objType), + zap.Int("state_changes", stateChangeEvents), zap.Int("mute_events", muteEvents), zap.Int("unmute_events", unmuteEvents)) }() diff --git a/internal/icinga2/launcher.go b/internal/icinga2/launcher.go index 58d8491d..becc9226 100644 --- a/internal/icinga2/launcher.go +++ b/internal/icinga2/launcher.go @@ -40,7 +40,7 @@ func (launcher *Launcher) Launch(src *config.Source) { if !launcher.isReady { launcher.Logs.GetChildLogger("icinga2"). - With(zap.Int64("source-id", src.ID)). + With(zap.Int64("source_id", src.ID)). Debug("Postponing Event Stream Client Launch as Launcher is not ready yet") launcher.waitingSources = append(launcher.waitingSources, src) return @@ -57,7 +57,7 @@ func (launcher *Launcher) Ready() { launcher.isReady = true for _, src := range launcher.waitingSources { launcher.Logs.GetChildLogger("icinga2"). - With(zap.Int64("source-id", src.ID)). + With(zap.Int64("source_id", src.ID)). Debug("Launching postponed Event Stream Client") launcher.launch(src) } @@ -66,7 +66,7 @@ func (launcher *Launcher) Ready() { // launch a new Icinga 2 Event Stream API Client based on the config.Source configuration. func (launcher *Launcher) launch(src *config.Source) { - logger := launcher.Logs.GetChildLogger("icinga2").With(zap.Int64("source-id", src.ID)) + logger := launcher.Logs.GetChildLogger("icinga2").With(zap.Int64("source_id", src.ID)) if src.Type != config.SourceTypeIcinga2 || !src.Icinga2BaseURL.Valid || diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 18b8b0a4..e8692b00 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -138,16 +138,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { tx, err := i.db.BeginTxx(ctx, nil) if err != nil { - i.logger.Errorw("Can't start a db transaction", zap.Error(err)) - - return errors.New("can't start a db transaction") + i.logger.Errorw("Cannot start a db transaction", zap.Error(err)) + return err } defer func() { _ = tx.Rollback() }() if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) - - return errors.New("can't insert event and fetch its ID") + return err } isNew := i.StartedAt.Time().IsZero() @@ -161,14 +159,13 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } if err = i.AddEvent(ctx, tx, ev); err != nil { - i.logger.Errorw("Can't insert incident event to the database", zap.Error(err)) - - return errors.New("can't insert incident event to the database") + i.logger.Errorw("Cannot insert incident event to the database", zap.Error(err)) + return err } if err := i.handleMuteUnmute(ctx, tx, ev); err != nil { i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) - return errors.New("cannot insert incident muted history") + return err } switch ev.Type { @@ -214,9 +211,8 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } if err = tx.Commit(); err != nil { - i.logger.Errorw("Can't commit db transaction", zap.Error(err)) - - return errors.New("can't commit db transaction") + i.logger.Errorw("Cannot commit db transaction", zap.Error(err)) + return err } // We've just committed the DB transaction and can safely update the incident muted flag. @@ -263,7 +259,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { } if err = i.AddEvent(ctx, tx, ev); err != nil { - return fmt.Errorf("can't insert incident event to the database: %w", err) + return fmt.Errorf("cannot insert incident event to the database: %w", err) } if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil { @@ -276,7 +272,6 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { } notifications, err = i.generateNotifications(ctx, tx, ev, channels) - return err }) if err != nil { @@ -313,8 +308,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, if err := hr.Sync(ctx, i.db, tx); err != nil { i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) - - return errors.New("failed to insert incident severity changed history") + return err } if newSeverity == event.SeverityOK { @@ -331,9 +325,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, } if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err)) - - return errors.New("can't insert incident closed history to the database") + i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) + return err } if i.timer != nil { @@ -344,8 +337,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, i.Severity = newSeverity if err := i.Sync(ctx, tx); err != nil { i.logger.Errorw("Failed to update incident severity", zap.Error(err)) - - return errors.New("failed to update incident severity") + return err } return nil @@ -355,9 +347,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, i.StartedAt = types.UnixMilli(ev.Time) i.Severity = ev.Severity if err := i.Sync(ctx, tx); err != nil { - i.logger.Errorw("Can't insert incident to the database", zap.Error(err)) - - return errors.New("can't insert incident to the database") + i.logger.Errorw("Cannot insert incident to the database", zap.Error(err)) + return err } i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) @@ -372,9 +363,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, } if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Can't insert incident opened history event", zap.Error(err)) - - return errors.New("can't insert incident opened history event") + i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) + return err } return nil @@ -430,8 +420,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 err = i.AddRuleMatched(ctx, tx, r) if err != nil { i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) - - return errors.New("failed to insert incident rule") + return err } hr := &HistoryRow{ @@ -443,8 +432,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 } if err := hr.Sync(ctx, i.db, tx); err != nil { i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) - - return errors.New("failed to insert rule matched incident history") + return err } } } @@ -540,8 +528,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even "Failed to upsert escalation state", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) - - return errors.New("failed to upsert escalation state") + return err } hr := &HistoryRow{ @@ -558,8 +545,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even "Failed to insert escalation triggered incident history", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) - - return errors.New("failed to insert escalation triggered incident history") + return err } if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { @@ -672,11 +658,8 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, } if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( - "Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err), - ) - - return errors.New("failed to add recipient role changed history") + i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) + return err } cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} @@ -684,11 +667,8 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, stmt, _ := i.db.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { - i.logger.Errorw( - "Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err), - ) - - return errors.New("failed to upsert incident contact") + i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) + return err } return nil @@ -747,12 +727,8 @@ func (i *Incident) restoreRecipients(ctx context.Context) error { var contacts []*ContactRow err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) if err != nil { - i.logger.Errorw( - "Failed to restore incident recipients from the database", zap.String("object", i.IncidentObject().DisplayName()), - zap.String("incident", i.String()), zap.Error(err), - ) - - return errors.New("failed to restore incident recipients") + i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) + return err } recipients := make(map[recipient.Key]*RecipientState) diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 2645a7a7..c54fe61d 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -2,7 +2,6 @@ package incident import ( "context" - "errors" "fmt" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" @@ -30,7 +29,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { stmt, _ := i.db.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { - return fmt.Errorf("failed to upsert incident: %s", err) + return fmt.Errorf("failed to upsert incident: %w", err) } } else { stmt := utils.BuildInsertStmtWithout(i.db, i, "id") @@ -103,8 +102,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru "Failed to insert recipient role changed incident history", zap.Object("escalation", escalation), zap.String("recipients", r.String()), zap.Error(err), ) - - return errors.New("failed to insert recipient role changed incident history") + return err } } cr.Role = state.Role @@ -117,8 +115,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru "Failed to upsert incident recipient", zap.Object("escalation", escalation), zap.String("recipient", r.String()), zap.Error(err), ) - - return errors.New("failed to upsert incident recipient") + return err } } @@ -165,8 +162,7 @@ func (i *Incident) generateNotifications( i.logger.Errorw("Failed to insert incident notification history", zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.Object.IsMuted()), zap.Error(err)) - - return nil, errors.New("cannot insert incident notification history") + return nil, err } if !suppress { diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 785060b4..cac62448 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -85,7 +85,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { msg = fmt.Sprintf(format, a...) } - logger := l.logger.With(zap.Int("status-code", statusCode), zap.String("msg", msg)) + logger := l.logger.With(zap.Int("status_code", statusCode), zap.String("message", msg)) if ev != nil { logger = logger.With(zap.Stringer("event", ev)) } @@ -134,7 +134,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { l.logger.Infow("Processing event", zap.String("event", ev.String())) err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) if err != nil { - abort(http.StatusInternalServerError, &ev, err.Error()) + l.logger.Errorw("Failed to successfully process event", zap.Stringer("event", &ev), zap.Error(err)) + abort(http.StatusInternalServerError, &ev, "event could not be processed successfully, see server logs for details") return } diff --git a/internal/object/object.go b/internal/object/object.go index fc15c26d..9e542535 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -129,7 +129,7 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, } if err = tx.Commit(); err != nil { - return nil, fmt.Errorf("can't commit object database transaction: %w", err) + return nil, fmt.Errorf("cannot commit object database transaction: %w", err) } if !objectExists { diff --git a/internal/object/objects.go b/internal/object/objects.go index b7cce642..fbfa82c0 100644 --- a/internal/object/objects.go +++ b/internal/object/objects.go @@ -69,7 +69,7 @@ func restoreObjectsFromQuery(ctx context.Context, db *database.DB, query string, } }) - return errors.Wrap(err, "cannot restore muted objects without an active incident") + return errors.Wrap(err, "cannot restore objects") }) g.Go(func() error { diff --git a/internal/rule/condition.go b/internal/rule/condition.go index 9bf4db3b..97368e44 100644 --- a/internal/rule/condition.go +++ b/internal/rule/condition.go @@ -84,7 +84,7 @@ func (e *EscalationFilter) EvalLess(key string, value string) (bool, error) { } func (e *EscalationFilter) EvalLike(key string, value string) (bool, error) { - return false, fmt.Errorf("escalation filter doesn't support wildcard matches") + return false, fmt.Errorf("escalation filter does not support wildcard matches") } func (e *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, error) { diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 86eac7d6..40f1898a 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -63,17 +63,17 @@ func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) ( err = preparedStmt.Get(&lastInsertId, args) if err != nil { - return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) + return 0, fmt.Errorf("failed to insert entry for type %T: %w", args, err) } } else { result, err := tx.NamedExecContext(ctx, stmt, args) if err != nil { - return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) + return 0, fmt.Errorf("failed to insert entry for type %T: %w", args, err) } lastInsertId, err = result.LastInsertId() if err != nil { - return 0, fmt.Errorf("failed to fetch last insert id for type %T: %s", args, err) + return 0, fmt.Errorf("failed to fetch last insert id for type %T: %w", args, err) } }