Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug logs to investigate crashing fra-ai #3346

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ GO_BUILD_DIR?="./"
MOCKGEN=go run github.com/golang/mock/mockgen
ABIGEN=go run github.com/ethereum/go-ethereum/cmd/abigen

all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench
all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench livepeer_ffmpeg

net/lp_rpc.pb.go: net/lp_rpc.proto
protoc -I=. --go_out=. --go-grpc_out=. $^
Expand Down Expand Up @@ -108,6 +108,8 @@ livepeer_bench:

livepeer_router:
GO111MODULE=on CGO_ENABLED=1 CC="$(cc)" CGO_CFLAGS="$(cgo_cflags)" CGO_LDFLAGS="$(cgo_ldflags) ${CGO_LDFLAGS}" go build -o $(GO_BUILD_DIR) -ldflags="$(ldflags)" cmd/livepeer_router/*.go
livepeer_ffmpeg:
GO111MODULE=on CGO_ENABLED=1 CC="$(cc)" CGO_CFLAGS="$(cgo_cflags)" CGO_LDFLAGS="$(cgo_ldflags) ${CGO_LDFLAGS}" go build -o $(GO_BUILD_DIR) -ldflags="$(ldflags)" cmd/livepeer_ffmpeg/*.go

docker:
docker buildx build --build-arg='BUILD_TAGS=mainnet,experimental' -f docker/Dockerfile .
Expand Down
32 changes: 32 additions & 0 deletions cmd/livepeer_ffmpeg/livepeer_ffmpeg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"fmt"
"os"

"github.com/livepeer/lpms/ffmpeg"
)

func main() {
if len(os.Args) < 3 {
fmt.Println("Usage: livepeer_ffmpeg <input_file> <output_pattern>")
os.Exit(1)
}

Check warning on line 14 in cmd/livepeer_ffmpeg/livepeer_ffmpeg.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer_ffmpeg/livepeer_ffmpeg.go#L10-L14

Added lines #L10 - L14 were not covered by tests

in := os.Args[1]
outFilePattern := os.Args[2]

ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning)
_, err := ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: in,
}, []ffmpeg.TranscodeOptions{{
Oname: outFilePattern,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "segment"},
}})
if err != nil {
fmt.Printf("Failed to run segmentation. in=%s err=%s\n", in, err)
os.Exit(1)
}

Check warning on line 31 in cmd/livepeer_ffmpeg/livepeer_ffmpeg.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer_ffmpeg/livepeer_ffmpeg.go#L16-L31

Added lines #L16 - L31 were not covered by tests
}
4 changes: 3 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ RUN ./install_ffmpeg.sh \

COPY . .

RUN make livepeer livepeer_cli livepeer_bench livepeer_router
RUN make livepeer livepeer_cli livepeer_bench livepeer_router livepeer_ffmpeg

FROM --platform=$TARGETPLATFORM nvidia/cuda:12.0.0-cudnn8-runtime-ubuntu20.04 AS livepeer-amd64-base

Expand All @@ -68,4 +68,6 @@ COPY --from=build /usr/bin/grpc_health_probe /usr/local/bin/grpc_health_probe
COPY --from=build /src/tasmodel.pb /tasmodel.pb
COPY --from=build /usr/share/misc/pci.ids /usr/share/misc/pci.ids

RUN apt update && apt install net-tools lsof -y

ENTRYPOINT ["/usr/local/bin/livepeer"]
14 changes: 12 additions & 2 deletions media/mediamtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"fmt"
"io"
"net/http"
"time"

"github.com/livepeer/go-livepeer/clog"
)
Expand Down Expand Up @@ -64,12 +65,17 @@
return err
}

req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

clog.Infof(ctx, "Sending req to MediaMTX")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)

Check warning on line 72 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L68-L72

Added lines #L68 - L72 were not covered by tests
if err != nil {
return fmt.Errorf("failed to create kick request: %w", err)
}
req.SetBasicAuth(mediaMTXControlUser, mc.apiPassword)
resp, err := http.DefaultClient.Do(req)
clog.Infof(ctx, "Done Sending req to MediaMTX")

Check warning on line 78 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L78

Added line #L78 was not covered by tests
if err != nil {
return fmt.Errorf("failed to kick connection: %w", err)
}
Expand All @@ -85,12 +91,16 @@
if err != nil {
return false, err
}
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)
clog.Infof(context.Background(), "StreamExist: Sending request to MediaMTX, sourceID=%v", mc.sourceID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)

Check warning on line 97 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L94-L97

Added lines #L94 - L97 were not covered by tests
if err != nil {
return false, fmt.Errorf("failed to create get stream request: %w", err)
}
req.SetBasicAuth(mediaMTXControlUser, mc.apiPassword)
resp, err := http.DefaultClient.Do(req)
clog.Infof(context.Background(), "StreamExist: Done Sending request to MediaMTX, sourceID=%v", mc.sourceID)

Check warning on line 103 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L103

Added line #L103 was not covered by tests
if err != nil {
return false, fmt.Errorf("failed to get stream: %w", err)
}
Expand Down
27 changes: 16 additions & 11 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
"log/slog"
"math/rand"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/lpms/ffmpeg"
"golang.org/x/sys/unix"
)

Expand All @@ -30,6 +30,12 @@
}

func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
defer func() {
if r := recover(); r != nil {
clog.Warningf(ctx, "Recovered from panic:")
}

Check warning on line 36 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L33-L36

Added lines #L33 - L36 were not covered by tests
}()

outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts")
completionSignal := make(chan bool, 1)
wg := &sync.WaitGroup{}
Expand All @@ -41,6 +47,7 @@

retryCount := 0
for {
clog.Infof(ctx, "Checking if stream exists, sourceID=%v, retryCount=%v", ms.MediaMTXClient.sourceID, retryCount)

Check warning on line 50 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L50

Added line #L50 was not covered by tests
streamExists, err := ms.MediaMTXClient.StreamExists()
if err != nil {
clog.Errorf(ctx, "StreamExists check failed. err=%s", err)
Expand All @@ -49,18 +56,16 @@
clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err)
break
}
ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning)
_, err = ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: in,
}, []ffmpeg.TranscodeOptions{{
Oname: outFilePattern,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "segment"},
}})
cmd := exec.Command("/usr/local/bin/livepeer_ffmpeg", in, outFilePattern)
_, err = cmd.CombinedOutput()
if err != nil {
clog.Errorf(ctx, "Failed to run segmentation process. in=%s err=%s", in, err)
}
err = cmd.Wait()

Check warning on line 64 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L59-L64

Added lines #L59 - L64 were not covered by tests
if err != nil {
clog.Errorf(ctx, "Failed to run segmentation. in=%s err=%s", in, err)
clog.Errorf(ctx, "Failed to run segmentation process. in=%s err=%s", in, err)

Check warning on line 66 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L66

Added line #L66 was not covered by tests
}
clog.Infof(ctx, "Done with segmentation process")

Check warning on line 68 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L68

Added line #L68 was not covered by tests
retryCount++
time.Sleep(5 * time.Second)
}
Expand Down
Loading