Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent logging #133

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/channels/email/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (ch *Email) SendNotification(req *plugin.NotificationRequest) error {
}

if len(to) == 0 {
return fmt.Errorf("contact user %s doesn't have an e-mail address", req.Contact.FullName)
return fmt.Errorf("contact user %s does not have an e-mail address", req.Contact.FullName)
}

var msg bytes.Buffer
Expand Down
4 changes: 2 additions & 2 deletions cmd/channels/rocketchat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ch *RocketChat) SendNotification(req *plugin.NotificationRequest) error {
}

if roomId == "" {
return fmt.Errorf("contact user %s doesn't specify a rocketchat channel or username", req.Contact.FullName)
return fmt.Errorf("contact user %s does not specify a rocketchat channel or username", req.Contact.FullName)
}

message := struct {
Expand Down Expand Up @@ -64,7 +64,7 @@ func (ch *RocketChat) SendNotification(req *plugin.NotificationRequest) error {
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(request)
if err != nil {
return fmt.Errorf("error while sending http request to rocketchat server: %s", err)
return fmt.Errorf("error while sending http request to rocketchat server: %w", err)
}

defer resp.Body.Close()
Expand Down
4 changes: 2 additions & 2 deletions internal/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg

sourceIdRaw, sourceIdOk := strings.CutPrefix(user, "source-")
if !sourceIdOk {
logger.Debugw("Cannot extract source ID from HTTP basic auth username", zap.String("user-input", user))
logger.Debugw("Cannot extract source ID from HTTP basic auth username", zap.String("user_input", user))
return nil
}
sourceId, err := strconv.ParseInt(sourceIdRaw, 10, 64)
if err != nil {
logger.Debugw("Cannot convert extracted source Id to int", zap.String("user-input", user), zap.Error(err))
logger.Debugw("Cannot convert extracted source Id to int", zap.String("user_input", user), zap.Error(err))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/filter/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestFilter(t *testing.T) {
assert.Nil(t, err, "There should be no errors but got: %s", err)

expected := &Condition{op: Equal, column: "foo", value: "bar"}
assert.Equal(t, expected, rule, "Parser doesn't parse single condition correctly")
assert.Equal(t, expected, rule, "Parser does not parse single condition correctly")
})

t.Run("UrlEncodedFilterExpression", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/icinga2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (client *Client) startCatchupWorkers(delay time.Duration) (chan *catchupEve
if err != nil && !errors.Is(err, context.Canceled) {
client.Logger.Debugw("Catch-up-phase event worker failed",
zap.Stringer("worker", workerId),
zap.String("object type", objType),
zap.String("object_type", objType),
zap.Error(err))
}
return err
Expand Down Expand Up @@ -531,7 +531,7 @@ func (client *Client) worker() {
client.Logger.Debugw("Event to be replayed is not in cache", zap.Stringer("event", ev))
} else if ev.Time.Before(ts) {
client.Logger.Debugw("Skip replaying outdated Event Stream event", zap.Stringer("event", ev),
zap.Time("event timestamp", ev.Time), zap.Time("cache timestamp", ts))
zap.Time("event_timestamp", ev.Time), zap.Time("cache_timestamp", ts))
break
}

Expand Down
4 changes: 2 additions & 2 deletions internal/icinga2/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ca
var stateChangeEvents, muteEvents, unmuteEvents int
defer func() {
client.Logger.Debugw("Querying API emitted events",
zap.String("object type", objType),
zap.Int("state changes", stateChangeEvents),
zap.String("object_type", objType),
zap.Int("state_changes", stateChangeEvents),
zap.Int("mute_events", muteEvents),
zap.Int("unmute_events", unmuteEvents))
}()
Expand Down
6 changes: 3 additions & 3 deletions internal/icinga2/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (launcher *Launcher) Launch(src *config.Source) {

if !launcher.isReady {
launcher.Logs.GetChildLogger("icinga2").
With(zap.Int64("source-id", src.ID)).
With(zap.Int64("source_id", src.ID)).
Debug("Postponing Event Stream Client Launch as Launcher is not ready yet")
launcher.waitingSources = append(launcher.waitingSources, src)
return
Expand All @@ -57,7 +57,7 @@ func (launcher *Launcher) Ready() {
launcher.isReady = true
for _, src := range launcher.waitingSources {
launcher.Logs.GetChildLogger("icinga2").
With(zap.Int64("source-id", src.ID)).
With(zap.Int64("source_id", src.ID)).
Debug("Launching postponed Event Stream Client")
launcher.launch(src)
}
Expand All @@ -66,7 +66,7 @@ func (launcher *Launcher) Ready() {

// launch a new Icinga 2 Event Stream API Client based on the config.Source configuration.
func (launcher *Launcher) launch(src *config.Source) {
logger := launcher.Logs.GetChildLogger("icinga2").With(zap.Int64("source-id", src.ID))
logger := launcher.Logs.GetChildLogger("icinga2").With(zap.Int64("source_id", src.ID))

if src.Type != config.SourceTypeIcinga2 ||
!src.Icinga2BaseURL.Valid ||
Expand Down
78 changes: 27 additions & 51 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {

tx, err := i.db.BeginTxx(ctx, nil)
if err != nil {
i.logger.Errorw("Can't start a db transaction", zap.Error(err))

return errors.New("can't start a db transaction")
i.logger.Errorw("Cannot start a db transaction", zap.Error(err))
return err
}
defer func() { _ = tx.Rollback() }()

if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil {
i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err))

return errors.New("can't insert event and fetch its ID")
return err
}

isNew := i.StartedAt.Time().IsZero()
Expand All @@ -161,14 +159,13 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
}

if err = i.AddEvent(ctx, tx, ev); err != nil {
i.logger.Errorw("Can't insert incident event to the database", zap.Error(err))

return errors.New("can't insert incident event to the database")
i.logger.Errorw("Cannot insert incident event to the database", zap.Error(err))
return err
}

if err := i.handleMuteUnmute(ctx, tx, ev); err != nil {
i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err))
return errors.New("cannot insert incident muted history")
return err
}

switch ev.Type {
Expand Down Expand Up @@ -214,9 +211,8 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
}

if err = tx.Commit(); err != nil {
i.logger.Errorw("Can't commit db transaction", zap.Error(err))

return errors.New("can't commit db transaction")
i.logger.Errorw("Cannot commit db transaction", zap.Error(err))
return err
}

// We've just committed the DB transaction and can safely update the incident muted flag.
Expand Down Expand Up @@ -263,7 +259,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
}

if err = i.AddEvent(ctx, tx, ev); err != nil {
return fmt.Errorf("can't insert incident event to the database: %w", err)
return fmt.Errorf("cannot insert incident event to the database: %w", err)
}

if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil {
Expand All @@ -276,7 +272,6 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
}

notifications, err = i.generateNotifications(ctx, tx, ev, channels)

return err
})
if err != nil {
Expand Down Expand Up @@ -313,8 +308,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,

if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err))

return errors.New("failed to insert incident severity changed history")
return err
}

if newSeverity == event.SeverityOK {
Expand All @@ -331,9 +325,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err))

return errors.New("can't insert incident closed history to the database")
i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err))
return err
}

if i.timer != nil {
Expand All @@ -344,8 +337,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
i.Severity = newSeverity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Failed to update incident severity", zap.Error(err))

return errors.New("failed to update incident severity")
return err
}

return nil
Expand All @@ -355,9 +347,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,
i.StartedAt = types.UnixMilli(ev.Time)
i.Severity = ev.Severity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))

return errors.New("can't insert incident to the database")
i.logger.Errorw("Cannot insert incident to the database", zap.Error(err))
return err
}

i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message))
Expand All @@ -372,9 +363,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Can't insert incident opened history event", zap.Error(err))

return errors.New("can't insert incident opened history event")
i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err))
return err
}

return nil
Expand Down Expand Up @@ -430,8 +420,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
err = i.AddRuleMatched(ctx, tx, r)
if err != nil {
i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err))

return errors.New("failed to insert incident rule")
return err
}

hr := &HistoryRow{
Expand All @@ -443,8 +432,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64
}
if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err))

return errors.New("failed to insert rule matched incident history")
return err
}
}
}
Expand Down Expand Up @@ -540,8 +528,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even
"Failed to upsert escalation state", zap.Object("rule", r),
zap.Object("escalation", escalation), zap.Error(err),
)

return errors.New("failed to upsert escalation state")
return err
}

hr := &HistoryRow{
Expand All @@ -558,8 +545,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even
"Failed to insert escalation triggered incident history", zap.Object("rule", r),
zap.Object("escalation", escalation), zap.Error(err),
)

return errors.New("failed to insert escalation triggered incident history")
return err
}

if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil {
Expand Down Expand Up @@ -672,23 +658,17 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
i.logger.Errorw(
"Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err),
)

return errors.New("failed to add recipient role changed history")
i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err))
return err
}

cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err := tx.NamedExecContext(ctx, stmt, cr)
if err != nil {
i.logger.Errorw(
"Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err),
)

return errors.New("failed to upsert incident contact")
i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err))
return err
}

return nil
Expand Down Expand Up @@ -747,12 +727,8 @@ func (i *Incident) restoreRecipients(ctx context.Context) error {
var contacts []*ContactRow
err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id)
if err != nil {
i.logger.Errorw(
"Failed to restore incident recipients from the database", zap.String("object", i.IncidentObject().DisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)

return errors.New("failed to restore incident recipients")
i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err))
return err
}

recipients := make(map[recipient.Key]*RecipientState)
Expand Down
12 changes: 4 additions & 8 deletions internal/incident/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package incident

import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/event"
Expand Down Expand Up @@ -30,7 +29,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error {
stmt, _ := i.db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
return fmt.Errorf("failed to upsert incident: %w", err)
}
} else {
stmt := utils.BuildInsertStmtWithout(i.db, i, "id")
Expand Down Expand Up @@ -103,8 +102,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru
"Failed to insert recipient role changed incident history", zap.Object("escalation", escalation),
zap.String("recipients", r.String()), zap.Error(err),
)

return errors.New("failed to insert recipient role changed incident history")
return err
}
}
cr.Role = state.Role
Expand All @@ -117,8 +115,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru
"Failed to upsert incident recipient", zap.Object("escalation", escalation),
zap.String("recipient", r.String()), zap.Error(err),
)

return errors.New("failed to upsert incident recipient")
return err
}
}

Expand Down Expand Up @@ -165,8 +162,7 @@ func (i *Incident) generateNotifications(
i.logger.Errorw("Failed to insert incident notification history",
zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.Object.IsMuted()),
zap.Error(err))

return nil, errors.New("cannot insert incident notification history")
return nil, err
}

if !suppress {
Expand Down
14 changes: 7 additions & 7 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ func (l *Listener) Run(ctx context.Context) error {
listenAddr := daemon.Config().Listen
l.logger.Infof("Starting listener on http://%s", listenAddr)
server := &http.Server{
Addr: listenAddr,
Handler: l,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 30 * time.Second,
Addr: listenAddr,
Handler: l,
ReadTimeout: 10 * time.Second,
IdleTimeout: 30 * time.Second,
}

serverErr := make(chan error)
Expand All @@ -86,7 +85,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
msg = fmt.Sprintf(format, a...)
}

logger := l.logger.With(zap.Int("status-code", statusCode), zap.String("msg", msg))
logger := l.logger.With(zap.Int("status_code", statusCode), zap.String("message", msg))
if ev != nil {
logger = logger.With(zap.Stringer("event", ev))
}
Expand Down Expand Up @@ -135,7 +134,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
l.logger.Infow("Processing event", zap.String("event", ev.String()))
err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev)
if err != nil {
abort(http.StatusInternalServerError, &ev, err.Error())
l.logger.Errorw("Failed to successfully process event", zap.Stringer("event", &ev), zap.Error(err))
abort(http.StatusInternalServerError, &ev, "event could not be processed successfully, see server logs for details")
return
}

Expand Down
Loading
Loading