diff --git a/server/broadcast.go b/server/broadcast.go index e95da3af99..d432810bf2 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -218,7 +218,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] if len(session.SegsInFlight) == 0 { if session.LatencyScore > 0 && session.LatencyScore <= SELECTOR_LATENCY_SCORE_THRESHOLD { clog.PublicInfof(ctx, - "Selecting new orchestrator, reason=%v", + "Reusing Orchestrator, reason=%v", fmt.Sprintf( "performance: no segments in flight, latency score of %v < %v", session.LatencyScore, @@ -228,6 +228,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] return session } + clog.PublicInfof(ctx, + "Swapping Orchestrator, reason=%v", + fmt.Sprintf( + "performance: no segments in flight, latency score of %v < %v", + session.LatencyScore, + durMult, + ), + ) } // A session with segments in flight might be selectable under certain conditions @@ -247,7 +255,7 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] if timeInFlight < maxTimeInFlight { clog.PublicInfof(ctx, - "Selected orchestrator reason=%v", + "Reusing orchestrator reason=%v", fmt.Sprintf( "performance: segments in flight, latency score of %v < %v", session.LatencyScore, @@ -257,6 +265,14 @@ func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude [] return session } + clog.PublicInfof(ctx, + "Swapping Orchestrator, reason=%v", + fmt.Sprintf( + "performance: no segments in flight, latency score of %v < %v", + session.LatencyScore, + durMult, + ), + ) } } return nil diff --git a/server/broadcast_test.go b/server/broadcast_test.go index 2da5ba1119..669ff06313 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -735,7 +735,7 @@ func TestTranscodeSegment_CompleteSession(t *testing.T) { // Create stub server ts, mux := stubTLSServer() defer ts.Close() - transcodeDelay := 100 * time.Millisecond + transcodeDelay := 1500 * time.Millisecond mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) { time.Sleep(transcodeDelay) w.WriteHeader(http.StatusOK) diff --git a/server/segment_rpc.go b/server/segment_rpc.go index b35409d3e1..4b60320a29 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -629,10 +629,13 @@ func SubmitSegment(ctx context.Context, sess *BroadcastSession, seg *stream.HLSS clog.Infof(ctx, "Successfully transcoded segment segName=%s seqNo=%d orch=%s dur=%s", seg.Name, seg.SeqNo, ti.Transcoder, transcodeDur) + // Use 1.5s for segments that are shorter than 1.5s + // Otherwise, the latency score is too high which results in a high number session swaps + segDuration := math.Max(1.5, seg.Duration) return &ReceivedTranscodeResult{ TranscodeData: tdata, Info: tr.Info, - LatencyScore: tookAllDur.Seconds() / seg.Duration, + LatencyScore: tookAllDur.Seconds() / segDuration, }, nil }