From 5d28982d6c3a8381eb47b4ce1296916e6202e3bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 10 Jan 2025 10:35:05 +0100 Subject: [PATCH 1/4] Add debug logs to investigate crashing fra-ai --- docker/Dockerfile | 2 ++ media/mediamtx.go | 4 ++++ media/rtmp2segment.go | 1 + 3 files changed, 7 insertions(+) diff --git a/docker/Dockerfile b/docker/Dockerfile index 120dc74761..4212746a35 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -68,4 +68,6 @@ COPY --from=build /usr/bin/grpc_health_probe /usr/local/bin/grpc_health_probe COPY --from=build /src/tasmodel.pb /tasmodel.pb COPY --from=build /usr/share/misc/pci.ids /usr/share/misc/pci.ids +RUN apt update && apt install net-tools lsof -y + ENTRYPOINT ["/usr/local/bin/livepeer"] diff --git a/media/mediamtx.go b/media/mediamtx.go index 3588519e2a..6c1b2f95d8 100644 --- a/media/mediamtx.go +++ b/media/mediamtx.go @@ -64,12 +64,14 @@ func (mc *MediaMTXClient) KickInputConnection(ctx context.Context) error { return err } + clog.Infof(ctx, "Sending req to MediaMTX") req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) if err != nil { return fmt.Errorf("failed to create kick request: %w", err) } req.SetBasicAuth(mediaMTXControlUser, mc.apiPassword) resp, err := http.DefaultClient.Do(req) + clog.Infof(ctx, "Done Sending req to MediaMTX") if err != nil { return fmt.Errorf("failed to kick connection: %w", err) } @@ -85,12 +87,14 @@ func (mc *MediaMTXClient) StreamExists() (bool, error) { if err != nil { return false, err } + clog.Infof(context.Background(), "StreamExist: Sending request to MediaMTX, sourceID=%v", mc.sourceID) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) if err != nil { return false, fmt.Errorf("failed to create get stream request: %w", err) } req.SetBasicAuth(mediaMTXControlUser, mc.apiPassword) resp, err := http.DefaultClient.Do(req) + clog.Infof(context.Background(), "StreamExist: Done Sending request to MediaMTX, sourceID=%v", mc.sourceID) if err != nil { return false, fmt.Errorf("failed to get stream: %w", err) } diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index db1d5a8eb0..3467bca749 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -41,6 +41,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen retryCount := 0 for { + clog.Infof(ctx, "Checking if stream exists, sourceID=%v, retryCount=%v", ms.MediaMTXClient.sourceID, retryCount) streamExists, err := ms.MediaMTXClient.StreamExists() if err != nil { clog.Errorf(ctx, "StreamExists check failed. err=%s", err) From 53a8529831b8ed364550251e27b5dfb455ea597d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 10 Jan 2025 10:52:10 +0100 Subject: [PATCH 2/4] Add timeout to MediaMTX calls --- media/mediamtx.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/media/mediamtx.go b/media/mediamtx.go index 6c1b2f95d8..90863b82ce 100644 --- a/media/mediamtx.go +++ b/media/mediamtx.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/livepeer/go-livepeer/clog" ) @@ -64,8 +65,11 @@ func (mc *MediaMTXClient) KickInputConnection(ctx context.Context) error { return err } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + clog.Infof(ctx, "Sending req to MediaMTX") - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) if err != nil { return fmt.Errorf("failed to create kick request: %w", err) } @@ -88,7 +92,9 @@ func (mc *MediaMTXClient) StreamExists() (bool, error) { return false, err } clog.Infof(context.Background(), "StreamExist: Sending request to MediaMTX, sourceID=%v", mc.sourceID) - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil) if err != nil { return false, fmt.Errorf("failed to create get stream request: %w", err) } From d6ee59a7a07e21d0e9cf982bb6dc429c624981d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 10 Jan 2025 10:53:41 +0100 Subject: [PATCH 3/4] Add recover() --- media/rtmp2segment.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index 3467bca749..6285df4d7d 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -30,6 +30,12 @@ type MediaSegmenter struct { } func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) { + defer func() { + if r := recover(); r != nil { + clog.Warningf(ctx, "Recovered from panic:") + } + }() + outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts") completionSignal := make(chan bool, 1) wg := &sync.WaitGroup{} From 2df074b048e0351134d7f13ad8e300da84a991cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 10 Jan 2025 16:37:39 +0100 Subject: [PATCH 4/4] Extract ffmpeg.Transcode3() to a separate process --- Makefile | 4 +++- cmd/livepeer_ffmpeg/livepeer_ffmpeg.go | 32 ++++++++++++++++++++++++++ docker/Dockerfile | 2 +- media/rtmp2segment.go | 20 ++++++++-------- 4 files changed, 45 insertions(+), 13 deletions(-) create mode 100644 cmd/livepeer_ffmpeg/livepeer_ffmpeg.go diff --git a/Makefile b/Makefile index daf0cd94e1..083fb9a710 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ GO_BUILD_DIR?="./" MOCKGEN=go run github.com/golang/mock/mockgen ABIGEN=go run github.com/ethereum/go-ethereum/cmd/abigen -all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench +all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench livepeer_ffmpeg net/lp_rpc.pb.go: net/lp_rpc.proto protoc -I=. --go_out=. --go-grpc_out=. $^ @@ -108,6 +108,8 @@ livepeer_bench: livepeer_router: GO111MODULE=on CGO_ENABLED=1 CC="$(cc)" CGO_CFLAGS="$(cgo_cflags)" CGO_LDFLAGS="$(cgo_ldflags) ${CGO_LDFLAGS}" go build -o $(GO_BUILD_DIR) -ldflags="$(ldflags)" cmd/livepeer_router/*.go +livepeer_ffmpeg: + GO111MODULE=on CGO_ENABLED=1 CC="$(cc)" CGO_CFLAGS="$(cgo_cflags)" CGO_LDFLAGS="$(cgo_ldflags) ${CGO_LDFLAGS}" go build -o $(GO_BUILD_DIR) -ldflags="$(ldflags)" cmd/livepeer_ffmpeg/*.go docker: docker buildx build --build-arg='BUILD_TAGS=mainnet,experimental' -f docker/Dockerfile . diff --git a/cmd/livepeer_ffmpeg/livepeer_ffmpeg.go b/cmd/livepeer_ffmpeg/livepeer_ffmpeg.go new file mode 100644 index 0000000000..4a4463e4e3 --- /dev/null +++ b/cmd/livepeer_ffmpeg/livepeer_ffmpeg.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + "os" + + "github.com/livepeer/lpms/ffmpeg" +) + +func main() { + if len(os.Args) < 3 { + fmt.Println("Usage: livepeer_ffmpeg ") + os.Exit(1) + } + + in := os.Args[1] + outFilePattern := os.Args[2] + + ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning) + _, err := ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{ + Fname: in, + }, []ffmpeg.TranscodeOptions{{ + Oname: outFilePattern, + AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"}, + VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"}, + Muxer: ffmpeg.ComponentOptions{Name: "segment"}, + }}) + if err != nil { + fmt.Printf("Failed to run segmentation. in=%s err=%s\n", in, err) + os.Exit(1) + } +} diff --git a/docker/Dockerfile b/docker/Dockerfile index 4212746a35..4c112c6a7e 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -53,7 +53,7 @@ RUN ./install_ffmpeg.sh \ COPY . . -RUN make livepeer livepeer_cli livepeer_bench livepeer_router +RUN make livepeer livepeer_cli livepeer_bench livepeer_router livepeer_ffmpeg FROM --platform=$TARGETPLATFORM nvidia/cuda:12.0.0-cudnn8-runtime-ubuntu20.04 AS livepeer-amd64-base diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go index 6285df4d7d..8d8ac7ee05 100644 --- a/media/rtmp2segment.go +++ b/media/rtmp2segment.go @@ -11,6 +11,7 @@ import ( "log/slog" "math/rand" "os" + "os/exec" "path/filepath" "strings" "sync" @@ -18,7 +19,6 @@ import ( "time" "github.com/livepeer/go-livepeer/clog" - "github.com/livepeer/lpms/ffmpeg" "golang.org/x/sys/unix" ) @@ -56,18 +56,16 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err) break } - ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning) - _, err = ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{ - Fname: in, - }, []ffmpeg.TranscodeOptions{{ - Oname: outFilePattern, - AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"}, - VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"}, - Muxer: ffmpeg.ComponentOptions{Name: "segment"}, - }}) + cmd := exec.Command("/usr/local/bin/livepeer_ffmpeg", in, outFilePattern) + _, err = cmd.CombinedOutput() if err != nil { - clog.Errorf(ctx, "Failed to run segmentation. in=%s err=%s", in, err) + clog.Errorf(ctx, "Failed to run segmentation process. in=%s err=%s", in, err) } + err = cmd.Wait() + if err != nil { + clog.Errorf(ctx, "Failed to run segmentation process. in=%s err=%s", in, err) + } + clog.Infof(ctx, "Done with segmentation process") retryCount++ time.Sleep(5 * time.Second) }