Skip to content

Commit

Permalink
health: Implement multistream status integrated with stream status (#7)
Browse files Browse the repository at this point in the history
* pkg/data: Store events time as millis, not nano

int64 is not JSON serializable, so we better avoid it

* Create new WebhookEvent type

* pkg/data: Create MultistreamWebhookPayload type

* pkd/data: Make webhook event payload optional

* health: Crete new multistream reducer for ms status

* health: Add multistream reducer to pipeline&health

* health: Use pointers of status to avoid racy copies

@darkdarkdragon pointed out on #6
that a copy is not atomic so could have issues when copying a value
without holding a lock. I don't want to increaase the usage of the lock
either, so decided to change the status to a pointer instead so we will
have a consistent view of the status, not one in between a copy.

Also, this is probably better for perf anyway since we were copying that
big object multiple times on every event processing. Better like this I
guess.

* health: Create root MultistreamHealthy condition

* WIP: Add TODO to fix Status recreation issue

* api: Accept timestamps only in unix milliseconds

(or RFC3339)

* health: Fix all comments from self-review

* pkg/event: Update bindings one existing streams

Now we'll make our first deploy already with
a change to the queue bindings. It already feels
overkill to have to change the stream name to do
that, so I'm  changing the logic on stream_consumer
to always set the bindings on the existing stream,
even if it already existed before. This works more
similarly to how other AMQP clients generally do
this.

* health: Add some helpers for Status immutability

Will reduce the boilerplate a bit. We should keep
iterating on this on the following code changes,
but I believe this is enough for this change.

* reducers: Namespace RealTime and NoErrors conditions

We might want to have other "no errors" and "real time"
flags in the future, so we better not start with too
generic names here.

* docker-compose: Create webhooks exchange as well

* health: Rename MultistreamHealthy to Multistreaming

Consistency with Transcoding condition.

* health: Fix webhook exchange name

* pkg/data: Change ManifestID field to StreamID

* api: Fix health status JSON response
  • Loading branch information
victorges authored Aug 26, 2021
1 parent cfda3ec commit 995038a
Show file tree
Hide file tree
Showing 17 changed files with 321 additions and 111 deletions.
15 changes: 4 additions & 11 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request, pa
respondError(rw, http.StatusInternalServerError, err)
return
}
rw.WriteHeader(http.StatusOK)
if err := json.NewEncoder(rw).Encode(status); err != nil {
glog.Errorf("Error writing stream health JSON response. err=%q", err)
}
respondJson(rw, http.StatusOK, status)
}

func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request, params httprouter.Params) {
Expand Down Expand Up @@ -144,7 +141,7 @@ func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription
func sendEvent(ctx context.Context, dest chan<- jsse.Event, evt data.Event) bool {
sseEvt, err := toSSEEvent(evt)
if err != nil {
glog.Errorf("Skipping bad event due to error converting to SSE. evtID=%q, manifestID=%q, err=%q", evt.ID(), evt.ManifestID(), err)
glog.Errorf("Skipping bad event due to error converting to SSE. evtID=%q, streamID=%q, err=%q", evt.ID(), evt.StreamID(), err)
return true
}
select {
Expand Down Expand Up @@ -177,13 +174,9 @@ func parseInputTimestamp(str string) (*time.Time, error) {
}
ts, unixErr := strconv.ParseInt(str, 10, 64)
if unixErr != nil {
return nil, fmt.Errorf("bad time %q. must be in RFC3339 or Unix Timestamp (sec) formats. rfcErr: %s; unixErr: %s", str, rfcErr, unixErr)
}
if ts > 1e13 {
t = time.Unix(0, ts)
} else {
t = time.Unix(ts, 0)
return nil, fmt.Errorf("bad time %q. must be in RFC3339 or Unix Timestamp (millisecond) formats. rfcErr: %s; unixErr: %s", str, rfcErr, unixErr)
}
t = data.NewUnixMillisTime(ts).Time
return &t, nil
}

Expand Down
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ services:
loopback_users = none
EOF
rabbitmq-plugins enable --offline rabbitmq_stream
bash -c "until rabbitmqadmin --vhost=livepeer declare exchange name=lp_golivepeer_metadata type=topic; do sleep 1; done;" &
bash <<EOF &
until rabbitmqadmin --vhost=livepeer declare exchange name=lp_golivepeer_metadata type=topic && \
rabbitmqadmin --vhost=livepeer declare exchange name=webhook_default_exchange type=topic; do
sleep 1;
done
EOF
docker-entrypoint.sh rabbitmq-server'
healthcheck:
test: rabbitmq-diagnostics check_running
Expand Down
10 changes: 5 additions & 5 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (c *Core) HandleMessage(msg event.StreamMessage) {
}

func (c *Core) handleSingleEvent(evt data.Event) {
mid, ts := evt.ManifestID(), evt.Timestamp()
record := c.storage.GetOrCreate(mid, c.conditionTypes)
streamID, ts := evt.StreamID(), evt.Timestamp()
record := c.storage.GetOrCreate(streamID, c.conditionTypes)

status := record.LastStatus
for i, reducer := range c.reducers {
Expand All @@ -123,15 +123,15 @@ func (c *Core) handleSingleEvent(evt data.Event) {
select {
case subs <- evt:
default:
glog.Warningf("Buffer full for health event subscription, skipping message. manifestId=%q, eventTs=%q", mid, ts)
glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts)
}
}
}

func (c *Core) GetStatus(manifestID string) (Status, error) {
func (c *Core) GetStatus(manifestID string) (*Status, error) {
record, ok := c.storage.Get(manifestID)
if !ok {
return Status{}, ErrStreamNotFound
return nil, ErrStreamNotFound
}
return record.LastStatus, nil
}
Expand Down
6 changes: 3 additions & 3 deletions health/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Record struct {
EventSubs []chan<- data.Event

ReducersState map[int]interface{}
LastStatus Status
LastStatus *Status
}

func NewRecord(id string, conditions []ConditionType) *Record {
Expand All @@ -31,9 +31,9 @@ func NewRecord(id string, conditions []ConditionType) *Record {
disposed: make(chan struct{}),
EventsByID: map[uuid.UUID]data.Event{},
ReducersState: map[int]interface{}{},
LastStatus: Status{
LastStatus: &Status{
ID: id,
Healthy: *NewCondition("", time.Time{}, nil, nil, nil),
Healthy: NewCondition("", time.Time{}, nil, nil, nil),
Conditions: make([]*Condition, len(conditions)),
},
}
Expand Down
6 changes: 3 additions & 3 deletions health/reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
type Reducer interface {
Bindings() []event.BindingArgs
Conditions() []ConditionType
Reduce(current Status, state interface{}, evt data.Event) (Status, interface{})
Reduce(current *Status, state interface{}, evt data.Event) (*Status, interface{})
}

type ReducerFunc func(Status, interface{}, data.Event) (Status, interface{})
type ReducerFunc func(*Status, interface{}, data.Event) (*Status, interface{})

func (f ReducerFunc) Bindings() []event.BindingArgs { return nil }
func (f ReducerFunc) Conditions() []ConditionType { return nil }
func (f ReducerFunc) Reduce(current Status, state interface{}, evt data.Event) (Status, interface{}) {
func (f ReducerFunc) Reduce(current *Status, state interface{}, evt data.Event) (*Status, interface{}) {
return f(current, state, evt)
}
32 changes: 18 additions & 14 deletions health/reducers/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,30 @@ import (
"github.com/livepeer/livepeer-data/pkg/data"
)

var healthyMustHaves = map[health.ConditionType]bool{
ConditionTranscoding: true,
ConditionRealTime: true,
var healthyRequirementDefaults = map[health.ConditionType]bool{
ConditionTranscoding: false,
ConditionTranscodeRealTime: false,
ConditionMultistreaming: true,
}

var HealthReducer = health.ReducerFunc(reduceHealth)

func reduceHealth(current health.Status, _ interface{}, evt data.Event) (health.Status, interface{}) {
healthyMustsCount := 0
func reduceHealth(current *health.Status, _ interface{}, evt data.Event) (*health.Status, interface{}) {
isHealthy := true
for _, cond := range current.Conditions {
if healthyMustHaves[cond.Type] && cond.Status != nil && *cond.Status {
healthyMustsCount++
status, isRequired := healthyRequirementDefaults[cond.Type]
if !isRequired {
continue
}
if cond.Status != nil {
status = *cond.Status
}
if !status {
isHealthy = false
break
}
}
isHealthy := healthyMustsCount == len(healthyMustHaves)
healthyCond := health.NewCondition("", evt.Timestamp(), &isHealthy, nil, &current.Healthy)
healthyCond := health.NewCondition("", evt.Timestamp(), &isHealthy, nil, current.Healthy)

return health.Status{
ID: current.ID,
Healthy: *healthyCond,
Conditions: current.Conditions,
}, nil
return health.NewMergedStatus(current, health.Status{Healthy: healthyCond}), nil
}
111 changes: 111 additions & 0 deletions health/reducers/multistream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package reducers

import (
"encoding/json"
"strings"
"time"

"github.com/golang/glog"
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/event"
)

const (
ConditionMultistreaming health.ConditionType = "Multistreaming"

webhooksExchange = "webhook_default_exchange"
multistreamBindingKey = "events.multistream.#"
)

type MultistreamReducer struct{}

func (t MultistreamReducer) Bindings() []event.BindingArgs {
return []event.BindingArgs{{Exchange: webhooksExchange, Key: multistreamBindingKey}}
}

func (t MultistreamReducer) Conditions() []health.ConditionType {
return []health.ConditionType{ConditionMultistreaming}
}

func (t MultistreamReducer) Reduce(current *health.Status, _ interface{}, evtIface data.Event) (*health.Status, interface{}) {
evt, ok := evtIface.(*data.WebhookEvent)
if !ok {
return current, nil
}
if !strings.HasPrefix(evt.Event, "multistream.") {
return current, nil
}

ts := evt.Timestamp()
var payload data.MultistreamWebhookPayload
if err := json.Unmarshal(evt.Payload, &payload); err != nil {
glog.Errorf("Error parsing multistream webhook payload. err=%q", err)
return current, nil
}
target := payload.Target

multistream := current.MultistreamCopy()
multistream, idx := findOrCreateMultistreamStatus(multistream, target)
if status := connectedStatusFromEvent(evt); status != nil {
currConnected := multistream[idx].Connected
multistream[idx] = &health.MultistreamStatus{
Target: target,
Connected: health.NewCondition("", ts, status, nil, currConnected),
}
}

conditions := current.ConditionsCopy()
for i, cond := range conditions {
if cond.Type == ConditionMultistreaming {
status := allTargetsConnected(multistream)
conditions[i] = health.NewCondition(cond.Type, ts, &status, nil, cond)
}
}

return health.NewMergedStatus(current, health.Status{
Conditions: conditions,
Multistream: multistream,
}), nil
}

func allTargetsConnected(multistream []*health.MultistreamStatus) bool {
for _, ms := range multistream {
if ms.Connected.Status == nil || !*ms.Connected.Status {
return false
}
}
return true
}

func connectedStatusFromEvent(evt *data.WebhookEvent) *bool {
var connected bool
switch evt.Event {
case "multistream.connected":
connected = true
case "multistream.disconnected", "multistream.error":
connected = false
default:
glog.Errorf("Unknown multistream webhook event. event=%q", evt.Event)
return nil
}
return &connected
}

func findOrCreateMultistreamStatus(multistream []*health.MultistreamStatus, target data.MultistreamTargetInfo) ([]*health.MultistreamStatus, int) {
for idx, ms := range multistream {
if targetsEq(ms.Target, target) {
return multistream, idx
}
}

multistream = append(multistream, &health.MultistreamStatus{
Target: target,
Connected: health.NewCondition("", time.Time{}, nil, nil, nil),
})
return multistream, len(multistream) - 1
}

func targetsEq(t1, t2 data.MultistreamTargetInfo) bool {
return t1.Profile == t2.Profile && t1.ID == t2.ID
}
1 change: 1 addition & 0 deletions health/reducers/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
func DefaultPipeline(golpExchange string, shardPrefixes []string) (reducers []health.Reducer, starTimeOffset time.Duration) {
return []health.Reducer{
TranscodeReducer{golpExchange, shardPrefixes},
MultistreamReducer{},
HealthReducer,
StatsReducer(statsWindows),
}, maxStatsWindow
Expand Down
20 changes: 10 additions & 10 deletions health/reducers/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type statsAggrs struct {
}

func StatsReducer(statsWindows []time.Duration) health.ReducerFunc {
return func(current health.Status, stateIface interface{}, evt data.Event) (health.Status, interface{}) {
return func(current *health.Status, stateIface interface{}, evt data.Event) (*health.Status, interface{}) {
var state *statsAggrs
if stateIface != nil {
state = stateIface.(*statsAggrs)
Expand All @@ -26,30 +26,30 @@ func StatsReducer(statsWindows []time.Duration) health.ReducerFunc {
}

ts := evt.Timestamp()
conditions := make([]*health.Condition, len(current.Conditions))
for i, cond := range current.Conditions {
conditions := current.ConditionsCopy()
for i, cond := range conditions {
statsAggr, ok := state.ConditionStats[cond.Type]
if !ok {
statsAggr = stats.WindowAggregators{}
state.ConditionStats[cond.Type] = statsAggr
}
conditions[i] = reduceCondStats(cond, ts, statsAggr, statsWindows)
}
return health.Status{
ID: current.ID,
Healthy: *reduceCondStats(&current.Healthy, ts, state.HealthStats, statsWindows),
healthy := reduceCondStats(current.Healthy, ts, state.HealthStats, statsWindows)

return health.NewMergedStatus(current, health.Status{
Healthy: healthy,
Conditions: conditions,
}, state
}), state
}
}

func reduceCondStats(cond *health.Condition, ts time.Time, statsAggr stats.WindowAggregators, statsWindows []time.Duration) *health.Condition {
if cond.LastProbeTime == nil || *cond.LastProbeTime != ts {
return cond
}
newCond := *cond
newCond.Frequency = statsAggr.Averages(statsWindows, ts, ptrBoolToFloat(cond.Status))
return &newCond
frequency := statsAggr.Averages(statsWindows, ts, ptrBoolToFloat(cond.Status))
return health.NewCondition(cond.Type, ts, cond.Status, frequency, cond)
}

func ptrBoolToFloat(b *bool) *float64 {
Expand Down
30 changes: 12 additions & 18 deletions health/reducers/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

const (
ConditionTranscoding health.ConditionType = "Transcoding"
ConditionRealTime health.ConditionType = "RealTime"
ConditionNoErrors health.ConditionType = "NoErrors"
ConditionTranscoding health.ConditionType = "Transcoding"
ConditionTranscodeRealTime health.ConditionType = "TranscodeRealTime"
ConditionTranscodeNoErrors health.ConditionType = "TranscodeNoErrors"

transcodeBindingKeyFormat = "broadcaster.stream_health.transcode.%s.#"
)

var transcodeConditions = []health.ConditionType{ConditionTranscoding, ConditionRealTime, ConditionNoErrors}
var transcodeConditions = []health.ConditionType{ConditionTranscoding, ConditionTranscodeRealTime, ConditionTranscodeNoErrors}

type TranscodeReducer struct {
GolpExchange string
Expand Down Expand Up @@ -44,38 +44,32 @@ func (t TranscodeReducer) Conditions() []health.ConditionType {
return transcodeConditions
}

func (t TranscodeReducer) Reduce(current health.Status, _ interface{}, evtIface data.Event) (health.Status, interface{}) {
func (t TranscodeReducer) Reduce(current *health.Status, _ interface{}, evtIface data.Event) (*health.Status, interface{}) {
evt, ok := evtIface.(*data.TranscodeEvent)
if !ok {
return current, nil
}

ts := evt.Timestamp()
conditions := make([]*health.Condition, len(current.Conditions))
for i, cond := range current.Conditions {
status := conditionStatus(evt, cond.Type)
if status == nil {
conditions[i] = cond
continue
copy(conditions, current.Conditions)
for i, cond := range conditions {
if status := conditionStatus(evt, cond.Type); status != nil {
conditions[i] = health.NewCondition(cond.Type, ts, status, nil, cond)
}
conditions[i] = health.NewCondition(cond.Type, ts, status, nil, cond)
}

return health.Status{
ID: current.ID,
Healthy: current.Healthy,
Conditions: conditions,
}, nil
return health.NewMergedStatus(current, health.Status{Conditions: conditions}), nil
}

func conditionStatus(evt *data.TranscodeEvent, condType health.ConditionType) *bool {
switch condType {
case ConditionTranscoding:
return &evt.Success
case ConditionRealTime:
case ConditionTranscodeRealTime:
isRealTime := evt.LatencyMs < int64(evt.Segment.Duration*1000)
return &isRealTime
case ConditionNoErrors:
case ConditionTranscodeNoErrors:
noErrors := true
for _, attempt := range evt.Attempts {
noErrors = noErrors && attempt.Error == nil
Expand Down
Loading

0 comments on commit 995038a

Please sign in to comment.