From 3cd05b9b4696e620ccc8b290f8328d470ed4960c Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 11 Aug 2022 17:01:37 -0300 Subject: [PATCH] task/import: Revert the early optimization (#55) * task/import: Revert the early optimization The error group and piped streams were causing more harm than good. Let's simplify the code and see if some of the weirder errors stop happening (like the weird 1 status codes from ffprobe, or the tasks disappearing and restarting in another node) * Use from, to --- task/import.go | 104 +++++++++++++++++++++++----------------------- task/transcode.go | 29 +++++++------ 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/task/import.go b/task/import.go index b4d98bfb..a10e756f 100644 --- a/task/import.go +++ b/task/import.go @@ -14,15 +14,15 @@ import ( api "github.com/livepeer/go-api-client" "github.com/livepeer/go-tools/drivers" "github.com/livepeer/livepeer-data/pkg/data" - "golang.org/x/sync/errgroup" ) func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) { var ( - ctx = tctx.Context - playbackID = tctx.OutputAsset.PlaybackID - params = *tctx.Task.Params.Import - osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage) + ctx = tctx.Context + playbackID = tctx.OutputAsset.PlaybackID + params = *tctx.Task.Params.Import + osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage) + cancelProgress context.CancelFunc ) filename, size, contents, err := getFile(ctx, osSess, params) if err != nil { @@ -30,44 +30,57 @@ func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) { } defer contents.Close() - secondaryReader, pipe := io.Pipe() - mainReader := NewReadCounter(io.TeeReader(contents, pipe)) + defer func() { cancelProgress() }() + measureProgress := func(r io.Reader, from, to float64) *ReadCounter { + if cancelProgress != nil { + cancelProgress() + } + var progressCtx context.Context + progressCtx, cancelProgress = context.WithCancel(ctx) + counter := NewReadCounter(r) + go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, counter.Count, from, to) + return counter + } - progressCtx, cancelProgress := context.WithCancel(ctx) - defer cancelProgress() - go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, mainReader.Count, 0, 0.5) + // Download the file to local disk (or memory). + input := measureProgress(contents, 0, 0.2) + sizeInt := int64(size) + sourceFile, err := readFile(filename, &sizeInt, input) + if err != nil { + return nil, err + } + defer sourceFile.Close() - eg, egCtx := errgroup.WithContext(ctx) - var ( - videoFilePath, metadataFilePath, fullPath string - metadata *FileMetadata - ) - // Probe the source file to retrieve metadata - eg.Go(func() (err error) { - metadata, err = Probe(egCtx, tctx.OutputAsset.ID, filename, mainReader) - pipe.CloseWithError(err) - if err != nil { - return err - } - metadataFilePath, err = saveMetadataFile(egCtx, osSess, playbackID, metadata) - return err - }) - // Save source file to our storage - eg.Go(func() (err error) { - fullPath = videoFileName(playbackID) - videoFilePath, err = osSess.SaveData(egCtx, fullPath, secondaryReader, nil, fileUploadTimeout) - if err != nil { - return fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err) - } - glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath) - return nil - }) - if err := eg.Wait(); err != nil { - // TODO: Delete the source file + // Probe metadata from the source file and save it to object store. + input = measureProgress(sourceFile, 0.2, 0.25) + metadata, err := Probe(ctx, tctx.OutputAsset.ID, filename, input) + if err != nil { + return nil, err + } + metadataFilePath, err := saveMetadataFile(ctx, osSess, playbackID, metadata) + if err != nil { return nil, err } + + // Save source file to object store. + _, err = sourceFile.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("error seeking to start of source file: %w", err) + } + input = measureProgress(sourceFile, 0.25, 0.5) + fullPath := videoFileName(playbackID) + videoFilePath, err := osSess.SaveData(ctx, fullPath, input, nil, fileUploadTimeout) + if err != nil { + return nil, fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err) + } + glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath) cancelProgress() - playbackRecordingID, err := prepareImportedAsset(tctx, metadata, fullPath) + + _, err = sourceFile.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("error seeking to start of source file: %w", err) + } + playbackRecordingID, err := prepareImportedAsset(tctx, metadata, sourceFile) if err != nil { return nil, fmt.Errorf("error preparing asset: %w", err) } @@ -119,23 +132,12 @@ func getFile(ctx context.Context, osSess drivers.OSSession, params api.ImportTas return filename(req, resp), size, resp.Body, nil } -func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, fullPath string) (string, error) { +func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, sourceFile io.ReadSeekCloser) (string, error) { if sessID := tctx.Params.Import.RecordedSessionID; sessID != "" { return sessID, nil } - fileInfoReader, err := tctx.outputOS.ReadData(tctx, fullPath) - if err != nil { - return "", fmt.Errorf("error reading imported file from output OS path=%s err=%w", fullPath, err) - } - defer fileInfoReader.Body.Close() - importedFile, err := readFile(fileInfoReader) - if err != nil { - return "", err - } - defer importedFile.Close() - - playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, importedFile, 0.5) + playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, sourceFile, 0.5) if err != nil { glog.Errorf("Error preparing file assetId=%s taskType=import err=%q", tctx.OutputAsset.ID, err) return "", err diff --git a/task/transcode.go b/task/transcode.go index da938c52..6686d75b 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -13,7 +13,6 @@ import ( "github.com/golang/glog" api "github.com/livepeer/go-api-client" - "github.com/livepeer/go-tools/drivers" "github.com/livepeer/joy4/av" "github.com/livepeer/joy4/av/avutil" "github.com/livepeer/joy4/format" @@ -34,8 +33,8 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func readFileToMemory(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) { - fileInMem, err := io.ReadAll(fir.Body) +func readFileToMemory(r io.Reader) (io.ReadSeekCloser, error) { + fileInMem, err := io.ReadAll(r) if err != nil { return nil, err } @@ -76,20 +75,20 @@ func getTempFile(size int64) (*os.File, error) { return file, nil } -func readFile(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) { - var fileSize int64 - if fir.Size != nil { - fileSize = *fir.Size +func readFile(name string, sizePtr *int64, content io.Reader) (io.ReadSeekCloser, error) { + var size int64 + if sizePtr != nil { + size = *sizePtr } - glog.Infof("Source file name=%s size=%d", fir.Name, fileSize) - if fir.Size != nil && *fir.Size < maxFileSizeForMemory { + glog.Infof("Source file name=%s size=%d", name, size) + if size > 0 && size < maxFileSizeForMemory { // use memory - return readFileToMemory(fir) + return readFileToMemory(content) } - if file, err := getTempFile(fileSize); err != nil { - return readFileToMemory(fir) + if file, err := getTempFile(size); err != nil { + return readFileToMemory(content) } else { - if _, err = file.ReadFrom(fir.Body); err != nil { + if _, err = file.ReadFrom(content); err != nil { file.Close() os.Remove(file.Name()) return nil, err @@ -146,8 +145,8 @@ func TaskTranscode(tctx *TaskContext) (*data.TaskOutput, error) { if err != nil { return nil, fmt.Errorf("error reading data from source OS url=%s err=%w", fullPath, err) } - sourceFile, err := readFile(fir) - fir.Body.Close() + defer fir.Body.Close() + sourceFile, err := readFile(fir.Name, fir.Size, fir.Body) if err != nil { return nil, err }