Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load active incidents gracefully #129

Merged
merged 9 commits into from
Apr 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/compliance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:
# and this very icinga-notifications are licensed under GPL-2.0.
# https://github.com/Icinga/icingadb/blob/v1.1.1/.github/workflows/compliance/check-licenses.sh
go-licenses check github.com/icinga/icinga-notifications/... \
--allowed_licenses BSD-2-Clause,BSD-3-Clause,GPL-2.0,MIT,MPL-2.0
--allowed_licenses BSD-2-Clause,BSD-3-Clause,GPL-2.0,ISC,MIT,MPL-2.0
62 changes: 62 additions & 0 deletions .github/workflows/tests_with_database.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
name: Tests with database

on:
push:
branches:
- main
pull_request: {}

jobs:
postgresql:
name: PostgreSQL ${{ matrix.version }}
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
version: ["9.6", "10", "11", "12", "13", "14", "15", "latest"]

env:
NOTIFICATIONS_TESTS_DB_TYPE: pgsql
NOTIFICATIONS_TESTS_DB: notifications
NOTIFICATIONS_TESTS_DB_USER: postgres
NOTIFICATIONS_TESTS_DB_PASSWORD: notifications
NOTIFICATIONS_TESTS_DB_HOST: 127.0.0.1
NOTIFICATIONS_TESTS_DB_PORT: 5432

services:
postgres:
image: postgres:${{ matrix.version }}
env:
POSTGRES_PASSWORD: ${{ env.NOTIFICATIONS_TESTS_DB_PASSWORD }}
POSTGRES_DB: ${{ env.NOTIFICATIONS_TESTS_DB }}
# Wait until postgres becomes ready
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: stable

- name: Checkout code
uses: actions/checkout@v4

- name: Importing Schema
env:
PGPASSWORD: ${{ env.NOTIFICATIONS_TESTS_DB_PASSWORD }}
run: |
psql -U postgres -w -h 127.0.0.1 -d ${{ env.NOTIFICATIONS_TESTS_DB }} < ${{ github.workspace }}/schema/pgsql/schema.sql

- name: Download dependencies
run: go get -v -t -d ./...

- name: Run tests
timeout-minutes: 10
run: go test -v -timeout 5m ./...
48 changes: 0 additions & 48 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

type IncidentRow struct {
ID int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`
}

// TableName implements the contracts.TableNamer interface.
func (i *IncidentRow) TableName() string {
return "incident"
}

// Upsert implements the contracts.Upserter interface.
func (i *IncidentRow) Upsert() interface{} {
return &struct {
Severity event.Severity `db:"severity"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
}{Severity: i.Severity, RecoveredAt: i.RecoveredAt}
}

// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}

i.ID = incidentId
}

return nil
}

// EventRow represents a single incident event database entry.
type EventRow struct {
IncidentID int64 `db:"incident_id"`
Expand Down
71 changes: 23 additions & 48 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ type ruleID = int64
type escalationID = int64

type Incident struct {
Object *object.Object
StartedAt time.Time
RecoveredAt time.Time
Severity event.Severity
Id int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`

Object *object.Object `db:"-"`

EscalationState map[escalationID]*EscalationState
Rules map[ruleID]struct{}
Recipients map[recipient.Key]*RecipientState

incidentRowID int64

// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
//
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
Expand All @@ -53,7 +54,7 @@ type Incident struct {
func NewIncident(
db *icingadb.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger,
) *Incident {
return &Incident{
i := &Incident{
db: db,
Object: obj,
logger: logger,
Expand All @@ -62,6 +63,12 @@ func NewIncident(
Rules: map[ruleID]struct{}{},
Recipients: map[recipient.Key]*RecipientState{},
}

if obj != nil {
i.ObjectID = obj.ID
}

return i
}

func (i *Incident) IncidentObject() *object.Object {
Expand All @@ -73,11 +80,11 @@ func (i *Incident) SeverityString() string {
}

func (i *Incident) String() string {
return fmt.Sprintf("#%d", i.incidentRowID)
return fmt.Sprintf("#%d", i.Id)
}

func (i *Incident) ID() int64 {
return i.incidentRowID
return i.Id
}

func (i *Incident) HasManager() bool {
Expand Down Expand Up @@ -200,7 +207,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

if !i.RecoveredAt.IsZero() {
if !i.RecoveredAt.Time().IsZero() {
// Incident is recovered in the meantime.
return
}
Expand Down Expand Up @@ -288,14 +295,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
causedByHistoryId = historyId

if newSeverity == event.SeverityOK {
i.RecoveredAt = time.Now()
i.RecoveredAt = types.UnixMilli(time.Now())
i.logger.Info("All sources recovered, closing incident")

RemoveCurrent(i.Object)

history := &HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(i.RecoveredAt),
Time: i.RecoveredAt,
Type: Closed,
}

Expand All @@ -322,7 +329,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
}

func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
i.StartedAt = ev.Time
i.StartedAt = types.UnixMilli(ev.Time)
i.Severity = ev.Severity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))
Expand Down Expand Up @@ -422,7 +429,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.timer = nil
}

filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity}
filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity}

var escalations []*rule.Escalation
retryAfter := rule.RetryNever
Expand Down Expand Up @@ -478,7 +485,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.RetriggerEscalations(&event.Event{
Type: event.TypeInternal,
Time: nextEvalAt,
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)),
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())),
})
})
}
Expand Down Expand Up @@ -646,17 +653,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
return nil
}

// RestoreEscalationStateRules restores this incident's rules based on the given escalation states.
func (i *Incident) RestoreEscalationStateRules(states []*EscalationState) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

for _, state := range states {
escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID)
i.Rules[escalation.RuleID] = struct{}{}
}
}

// getRecipientsChannel returns all the configured channels of the current incident and escalation recipients.
func (i *Incident) getRecipientsChannel(t time.Time) contactChannels {
contactChs := make(contactChannels)
Expand Down Expand Up @@ -697,7 +693,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) contactChannels {
func (i *Incident) restoreRecipients(ctx context.Context) error {
contact := &ContactRow{}
var contacts []*ContactRow
err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id)
if err != nil {
i.logger.Errorw(
"Failed to restore incident recipients from the database", zap.String("object", i.IncidentObject().DisplayName()),
Expand All @@ -717,27 +713,6 @@ func (i *Incident) restoreRecipients(ctx context.Context) error {
return nil
}

// restoreEscalationsState restores all escalation states matching the current incident id from the database.
// Returns error on database failure.
func (i *Incident) restoreEscalationsState(ctx context.Context) error {
state := &EscalationState{}
var states []*EscalationState
err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID())
if err != nil {
i.logger.Errorw("Failed to restore incident rule escalation states", zap.Error(err))

return errors.New("failed to restore incident rule escalation states")
}

for _, state := range states {
i.EscalationState[state.RuleEscalationID] = state
}

i.RestoreEscalationStateRules(states)

return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
Loading
Loading