diff --git a/task/import.go b/task/import.go index b4d98bfb..a10e756f 100644 --- a/task/import.go +++ b/task/import.go @@ -14,15 +14,15 @@ import ( api "github.com/livepeer/go-api-client" "github.com/livepeer/go-tools/drivers" "github.com/livepeer/livepeer-data/pkg/data" - "golang.org/x/sync/errgroup" ) func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) { var ( - ctx = tctx.Context - playbackID = tctx.OutputAsset.PlaybackID - params = *tctx.Task.Params.Import - osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage) + ctx = tctx.Context + playbackID = tctx.OutputAsset.PlaybackID + params = *tctx.Task.Params.Import + osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage) + cancelProgress context.CancelFunc ) filename, size, contents, err := getFile(ctx, osSess, params) if err != nil { @@ -30,44 +30,57 @@ func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) { } defer contents.Close() - secondaryReader, pipe := io.Pipe() - mainReader := NewReadCounter(io.TeeReader(contents, pipe)) + defer func() { cancelProgress() }() + measureProgress := func(r io.Reader, from, to float64) *ReadCounter { + if cancelProgress != nil { + cancelProgress() + } + var progressCtx context.Context + progressCtx, cancelProgress = context.WithCancel(ctx) + counter := NewReadCounter(r) + go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, counter.Count, from, to) + return counter + } - progressCtx, cancelProgress := context.WithCancel(ctx) - defer cancelProgress() - go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, mainReader.Count, 0, 0.5) + // Download the file to local disk (or memory). + input := measureProgress(contents, 0, 0.2) + sizeInt := int64(size) + sourceFile, err := readFile(filename, &sizeInt, input) + if err != nil { + return nil, err + } + defer sourceFile.Close() - eg, egCtx := errgroup.WithContext(ctx) - var ( - videoFilePath, metadataFilePath, fullPath string - metadata *FileMetadata - ) - // Probe the source file to retrieve metadata - eg.Go(func() (err error) { - metadata, err = Probe(egCtx, tctx.OutputAsset.ID, filename, mainReader) - pipe.CloseWithError(err) - if err != nil { - return err - } - metadataFilePath, err = saveMetadataFile(egCtx, osSess, playbackID, metadata) - return err - }) - // Save source file to our storage - eg.Go(func() (err error) { - fullPath = videoFileName(playbackID) - videoFilePath, err = osSess.SaveData(egCtx, fullPath, secondaryReader, nil, fileUploadTimeout) - if err != nil { - return fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err) - } - glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath) - return nil - }) - if err := eg.Wait(); err != nil { - // TODO: Delete the source file + // Probe metadata from the source file and save it to object store. + input = measureProgress(sourceFile, 0.2, 0.25) + metadata, err := Probe(ctx, tctx.OutputAsset.ID, filename, input) + if err != nil { + return nil, err + } + metadataFilePath, err := saveMetadataFile(ctx, osSess, playbackID, metadata) + if err != nil { return nil, err } + + // Save source file to object store. + _, err = sourceFile.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("error seeking to start of source file: %w", err) + } + input = measureProgress(sourceFile, 0.25, 0.5) + fullPath := videoFileName(playbackID) + videoFilePath, err := osSess.SaveData(ctx, fullPath, input, nil, fileUploadTimeout) + if err != nil { + return nil, fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err) + } + glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath) cancelProgress() - playbackRecordingID, err := prepareImportedAsset(tctx, metadata, fullPath) + + _, err = sourceFile.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("error seeking to start of source file: %w", err) + } + playbackRecordingID, err := prepareImportedAsset(tctx, metadata, sourceFile) if err != nil { return nil, fmt.Errorf("error preparing asset: %w", err) } @@ -119,23 +132,12 @@ func getFile(ctx context.Context, osSess drivers.OSSession, params api.ImportTas return filename(req, resp), size, resp.Body, nil } -func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, fullPath string) (string, error) { +func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, sourceFile io.ReadSeekCloser) (string, error) { if sessID := tctx.Params.Import.RecordedSessionID; sessID != "" { return sessID, nil } - fileInfoReader, err := tctx.outputOS.ReadData(tctx, fullPath) - if err != nil { - return "", fmt.Errorf("error reading imported file from output OS path=%s err=%w", fullPath, err) - } - defer fileInfoReader.Body.Close() - importedFile, err := readFile(fileInfoReader) - if err != nil { - return "", err - } - defer importedFile.Close() - - playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, importedFile, 0.5) + playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, sourceFile, 0.5) if err != nil { glog.Errorf("Error preparing file assetId=%s taskType=import err=%q", tctx.OutputAsset.ID, err) return "", err diff --git a/task/transcode.go b/task/transcode.go index da938c52..6686d75b 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -13,7 +13,6 @@ import ( "github.com/golang/glog" api "github.com/livepeer/go-api-client" - "github.com/livepeer/go-tools/drivers" "github.com/livepeer/joy4/av" "github.com/livepeer/joy4/av/avutil" "github.com/livepeer/joy4/format" @@ -34,8 +33,8 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func readFileToMemory(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) { - fileInMem, err := io.ReadAll(fir.Body) +func readFileToMemory(r io.Reader) (io.ReadSeekCloser, error) { + fileInMem, err := io.ReadAll(r) if err != nil { return nil, err } @@ -76,20 +75,20 @@ func getTempFile(size int64) (*os.File, error) { return file, nil } -func readFile(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) { - var fileSize int64 - if fir.Size != nil { - fileSize = *fir.Size +func readFile(name string, sizePtr *int64, content io.Reader) (io.ReadSeekCloser, error) { + var size int64 + if sizePtr != nil { + size = *sizePtr } - glog.Infof("Source file name=%s size=%d", fir.Name, fileSize) - if fir.Size != nil && *fir.Size < maxFileSizeForMemory { + glog.Infof("Source file name=%s size=%d", name, size) + if size > 0 && size < maxFileSizeForMemory { // use memory - return readFileToMemory(fir) + return readFileToMemory(content) } - if file, err := getTempFile(fileSize); err != nil { - return readFileToMemory(fir) + if file, err := getTempFile(size); err != nil { + return readFileToMemory(content) } else { - if _, err = file.ReadFrom(fir.Body); err != nil { + if _, err = file.ReadFrom(content); err != nil { file.Close() os.Remove(file.Name()) return nil, err @@ -146,8 +145,8 @@ func TaskTranscode(tctx *TaskContext) (*data.TaskOutput, error) { if err != nil { return nil, fmt.Errorf("error reading data from source OS url=%s err=%w", fullPath, err) } - sourceFile, err := readFile(fir) - fir.Body.Close() + defer fir.Body.Close() + sourceFile, err := readFile(fir.Name, fir.Size, fir.Body) if err != nil { return nil, err }