Skip to content

Commit

Permalink
task/import: Revert the early optimization (#55)
Browse files Browse the repository at this point in the history
* task/import: Revert the early optimization

The error group and piped streams were causing more
harm than good. Let's simplify the code and see if
some of the weirder errors stop happening (like the
unexplained status code 1 failures from ffprobe, or
tasks disappearing and restarting on another node).

* Use from, to
  • Loading branch information
victorges authored Aug 11, 2022
1 parent 34d9b37 commit 3cd05b9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 66 deletions.
104 changes: 53 additions & 51 deletions task/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,73 @@ import (
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/livepeer-data/pkg/data"
"golang.org/x/sync/errgroup"
)

func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) {
var (
ctx = tctx.Context
playbackID = tctx.OutputAsset.PlaybackID
params = *tctx.Task.Params.Import
osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage)
ctx = tctx.Context
playbackID = tctx.OutputAsset.PlaybackID
params = *tctx.Task.Params.Import
osSess = tctx.outputOS // Import deals with outputOS only (URL -> ObjectStorage)
cancelProgress context.CancelFunc
)
filename, size, contents, err := getFile(ctx, osSess, params)
if err != nil {
return nil, err
}
defer contents.Close()

secondaryReader, pipe := io.Pipe()
mainReader := NewReadCounter(io.TeeReader(contents, pipe))
defer func() { cancelProgress() }()
measureProgress := func(r io.Reader, from, to float64) *ReadCounter {
if cancelProgress != nil {
cancelProgress()
}
var progressCtx context.Context
progressCtx, cancelProgress = context.WithCancel(ctx)
counter := NewReadCounter(r)
go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, counter.Count, from, to)
return counter
}

progressCtx, cancelProgress := context.WithCancel(ctx)
defer cancelProgress()
go ReportProgress(progressCtx, tctx.lapi, tctx.Task.ID, size, mainReader.Count, 0, 0.5)
// Download the file to local disk (or memory).
input := measureProgress(contents, 0, 0.2)
sizeInt := int64(size)
sourceFile, err := readFile(filename, &sizeInt, input)
if err != nil {
return nil, err
}
defer sourceFile.Close()

eg, egCtx := errgroup.WithContext(ctx)
var (
videoFilePath, metadataFilePath, fullPath string
metadata *FileMetadata
)
// Probe the source file to retrieve metadata
eg.Go(func() (err error) {
metadata, err = Probe(egCtx, tctx.OutputAsset.ID, filename, mainReader)
pipe.CloseWithError(err)
if err != nil {
return err
}
metadataFilePath, err = saveMetadataFile(egCtx, osSess, playbackID, metadata)
return err
})
// Save source file to our storage
eg.Go(func() (err error) {
fullPath = videoFileName(playbackID)
videoFilePath, err = osSess.SaveData(egCtx, fullPath, secondaryReader, nil, fileUploadTimeout)
if err != nil {
return fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err)
}
glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath)
return nil
})
if err := eg.Wait(); err != nil {
// TODO: Delete the source file
// Probe metadata from the source file and save it to object store.
input = measureProgress(sourceFile, 0.2, 0.25)
metadata, err := Probe(ctx, tctx.OutputAsset.ID, filename, input)
if err != nil {
return nil, err
}
metadataFilePath, err := saveMetadataFile(ctx, osSess, playbackID, metadata)
if err != nil {
return nil, err
}

// Save source file to object store.
_, err = sourceFile.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("error seeking to start of source file: %w", err)
}
input = measureProgress(sourceFile, 0.25, 0.5)
fullPath := videoFileName(playbackID)
videoFilePath, err := osSess.SaveData(ctx, fullPath, input, nil, fileUploadTimeout)
if err != nil {
return nil, fmt.Errorf("error uploading file=%q to object store: %w", fullPath, err)
}
glog.Infof("Saved file=%s to url=%s", fullPath, videoFilePath)
cancelProgress()
playbackRecordingID, err := prepareImportedAsset(tctx, metadata, fullPath)

_, err = sourceFile.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("error seeking to start of source file: %w", err)
}
playbackRecordingID, err := prepareImportedAsset(tctx, metadata, sourceFile)
if err != nil {
return nil, fmt.Errorf("error preparing asset: %w", err)
}
Expand Down Expand Up @@ -119,23 +132,12 @@ func getFile(ctx context.Context, osSess drivers.OSSession, params api.ImportTas
return filename(req, resp), size, resp.Body, nil
}

func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, fullPath string) (string, error) {
func prepareImportedAsset(tctx *TaskContext, metadata *FileMetadata, sourceFile io.ReadSeekCloser) (string, error) {
if sessID := tctx.Params.Import.RecordedSessionID; sessID != "" {
return sessID, nil
}

fileInfoReader, err := tctx.outputOS.ReadData(tctx, fullPath)
if err != nil {
return "", fmt.Errorf("error reading imported file from output OS path=%s err=%w", fullPath, err)
}
defer fileInfoReader.Body.Close()
importedFile, err := readFile(fileInfoReader)
if err != nil {
return "", err
}
defer importedFile.Close()

playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, importedFile, 0.5)
playbackRecordingID, err := Prepare(tctx, metadata.AssetSpec, sourceFile, 0.5)
if err != nil {
glog.Errorf("Error preparing file assetId=%s taskType=import err=%q", tctx.OutputAsset.ID, err)
return "", err
Expand Down
29 changes: 14 additions & 15 deletions task/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/golang/glog"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/joy4/av"
"github.com/livepeer/joy4/av/avutil"
"github.com/livepeer/joy4/format"
Expand All @@ -34,8 +33,8 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

func readFileToMemory(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) {
fileInMem, err := io.ReadAll(fir.Body)
func readFileToMemory(r io.Reader) (io.ReadSeekCloser, error) {
fileInMem, err := io.ReadAll(r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -76,20 +75,20 @@ func getTempFile(size int64) (*os.File, error) {
return file, nil
}

func readFile(fir *drivers.FileInfoReader) (io.ReadSeekCloser, error) {
var fileSize int64
if fir.Size != nil {
fileSize = *fir.Size
func readFile(name string, sizePtr *int64, content io.Reader) (io.ReadSeekCloser, error) {
var size int64
if sizePtr != nil {
size = *sizePtr
}
glog.Infof("Source file name=%s size=%d", fir.Name, fileSize)
if fir.Size != nil && *fir.Size < maxFileSizeForMemory {
glog.Infof("Source file name=%s size=%d", name, size)
if size > 0 && size < maxFileSizeForMemory {
// use memory
return readFileToMemory(fir)
return readFileToMemory(content)
}
if file, err := getTempFile(fileSize); err != nil {
return readFileToMemory(fir)
if file, err := getTempFile(size); err != nil {
return readFileToMemory(content)
} else {
if _, err = file.ReadFrom(fir.Body); err != nil {
if _, err = file.ReadFrom(content); err != nil {
file.Close()
os.Remove(file.Name())
return nil, err
Expand Down Expand Up @@ -146,8 +145,8 @@ func TaskTranscode(tctx *TaskContext) (*data.TaskOutput, error) {
if err != nil {
return nil, fmt.Errorf("error reading data from source OS url=%s err=%w", fullPath, err)
}
sourceFile, err := readFile(fir)
fir.Body.Close()
defer fir.Body.Close()
sourceFile, err := readFile(fir.Name, fir.Size, fir.Body)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3cd05b9

Please sign in to comment.