From 66ef87d7135321eb4fafc8e667a6db092fa77c46 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 7 Dec 2022 20:37:37 -0300 Subject: [PATCH] task/runner: Several reliability improvements (#95) * go.mod: Update go-api-client for all retries * runner: Handle rate limit errors from API and delay task * runner: Hide catalyst task lost error as well * runner: Fix log * go.mod: Update livepeer-data for amqp reliability * task: Handle panics more gracefully Move it to the outer function in runner so we catch errors in the runner code as well * task: Make sure we publish all events with a separate timeout we don't want to not send the publish result because we already expired the task context or something. * task: Setup dead-lettering for main tasks queue * go.mod: Update to merged livepeer-data * Fix test * OMG add some spacing man * Stop double wrapping an error * task: Add migration logic to new queue * cmd: Make the default value of dead letter empty Just so the deploys can be safe * runner: Move cleanup logic to its own function * task: Make all AMQP publishes mandatory * task: Add TODO about handling AMQP returns * task: Handle some mediaconvert errors * task: Disable mandatory message if we don't handle them * runner: Remove some forgotten unused args --- cmd/task-runner/task-runner.go | 3 + go.mod | 2 +- go.sum | 4 +- task/prepare.go | 4 +- task/runner.go | 216 +++++++++++++++++++++++---------- task/runner_test.go | 67 ++++++++++ task/transcode.go | 4 +- 7 files changed, 227 insertions(+), 73 deletions(-) create mode 100644 task/runner_test.go diff --git a/cmd/task-runner/task-runner.go b/cmd/task-runner/task-runner.go index 1458c704..521b539c 100644 --- a/cmd/task-runner/task-runner.go +++ b/cmd/task-runner/task-runner.go @@ -88,6 +88,9 @@ func parseFlags(build BuildFlags) cliFlags { fs.StringVar(&cli.runnerOpts.AMQPUri, "amqp-uri", "amqp://guest:guest@localhost:5672/livepeer", "URI for RabbitMQ server to consume from. 
Specified in the AMQP protocol") fs.StringVar(&cli.runnerOpts.ExchangeName, "exchange-name", "lp_tasks", "Name of exchange where the task events will be published to") fs.StringVar(&cli.runnerOpts.QueueName, "queue-name", "lp_runner_task_queue", "Name of task queue to consume from. If it doesn't exist a new queue will be created and bound to the API exchange") + fs.StringVar(&cli.runnerOpts.OldQueueName, "old-queue-name", "", "Name of the old task queue that should be unbound and attempted a clean-up") + fs.StringVar(&cli.runnerOpts.DeadLetter.ExchangeName, "dead-letter-exchange-name", "", "Name of the dead letter exchange to create for tasks that are unprocessable") + fs.StringVar(&cli.runnerOpts.DeadLetter.QueueName, "dead-letter-queue-name", "", "Name of the queue where the dead-lettered tasks should be routed to. This queue is not consumed automatically. If empty, the name will be the same as the dead-letter exchange") fs.DurationVar(&cli.runnerOpts.MinTaskProcessingTime, "min-task-processing-time", task.DefaultMinTaskProcessingTime, "Minimum time that a task processing must take as rate-limiting strategy. If the task finishes earlier, the runner will wait for the remaining time before starting another task") fs.DurationVar(&cli.runnerOpts.MaxTaskProcessingTime, "max-task-processing-time", task.DefaultMaxTaskProcessingTime, "Timeout for task processing. 
If the task is not completed within this time it will be marked as failed") fs.UintVar(&cli.runnerOpts.MaxConcurrentTasks, "max-concurrent-tasks", task.DefaultMaxConcurrentTasks, "Maximum number of tasks to run simultaneously") diff --git a/go.mod b/go.mod index ca2f3e92..ea285b8b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 github.com/livepeer/go-tools v0.1.0 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 - github.com/livepeer/livepeer-data v0.5.2 + github.com/livepeer/livepeer-data v0.6.2 github.com/livepeer/stream-tester v0.12.22-0.20220912212136-f2dff6bd9343 github.com/peterbourgon/ff v1.7.1 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 112ad78c..313b985d 100644 --- a/go.sum +++ b/go.sum @@ -315,8 +315,8 @@ github.com/livepeer/go-tools v0.1.0 h1:GOZwUL2ME8aJOEyusmBrj056NucRFcaGTnTbQ8koB github.com/livepeer/go-tools v0.1.0/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ= github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 h1:ISkFQYYDgfnM6Go+VyemF66DKFc8kNoI9SwMv7GC9sM= github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07/go.mod h1:RDTLvmm/NJWjzuUpEDyIWmLTqSfpZEcnPnacG8sfh34= -github.com/livepeer/livepeer-data v0.5.2 h1:PS1uXm6VUZdW/gbChGp2jB1r8P5YjWbUJnodJzOuEyY= -github.com/livepeer/livepeer-data v0.5.2/go.mod h1:6xP6P9IzKaenw9jJoxkcZzJx1qSQe3UBYKUBZ61gXl8= +github.com/livepeer/livepeer-data v0.6.2 h1:b/+CppOJsXipt2eBwTBnW1ldGu+2HVVQsV+iq65x73E= +github.com/livepeer/livepeer-data v0.6.2/go.mod h1:6xP6P9IzKaenw9jJoxkcZzJx1qSQe3UBYKUBZ61gXl8= github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU= github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04= github.com/livepeer/stream-tester v0.12.22-0.20220912212136-f2dff6bd9343 h1:9mAQMfQGIpv7+0X6Z8+qag6zag+/r+zsqLJGB00n+pc= diff --git a/task/prepare.go b/task/prepare.go index 93f5cf1c..ca90d2e3 100644 --- 
a/task/prepare.go +++ b/task/prepare.go @@ -71,7 +71,7 @@ func Prepare(tctx *TaskContext, assetSpec *api.AssetSpec, file io.ReadSeekCloser if err != nil { return "", err } - stream, err := lapi.CreateStreamR(api.CreateStreamReq{Name: streamName, Record: true, Profiles: profiles}) + stream, err := lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, Profiles: profiles}) if err != nil { return "", err } @@ -110,7 +110,7 @@ func Prepare(tctx *TaskContext, assetSpec *api.AssetSpec, file io.ReadSeekCloser glog.V(model.VERBOSE).Infof("Got segment seqNo=%d pts=%s dur=%s data len bytes=%d\n", seg.SeqNo, seg.Pts, seg.Duration, len(seg.Data)) accumulator.Accumulate(uint64(len(seg.Data))) started := time.Now() - _, err = lapi.PushSegmentR(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution) + _, err = lapi.PushSegment(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution) if err != nil { glog.Errorf("Error while segment push for prepare err=%v\n", err) break diff --git a/task/runner.go b/task/runner.go index d4299106..146ceff6 100644 --- a/task/runner.go +++ b/task/runner.go @@ -23,14 +23,19 @@ const ( DefaultMaxTaskProcessingTime = 10 * time.Minute DefaultMinTaskProcessingTime = 5 * time.Second DefaultMaxConcurrentTasks = 3 + taskPublishTimeout = 1 * time.Minute ) -var defaultTasks = map[string]TaskHandler{ - "import": TaskImport, - "upload": TaskUpload, - "export": TaskExport, - "transcode": TaskTranscode, -} +var ( + defaultTasks = map[string]TaskHandler{ + "import": TaskImport, + "upload": TaskUpload, + "export": TaskExport, + "transcode": TaskTranscode, + } + errInternalProcessingError = errors.New("internal error processing file") + taskFatalErrorInfo = &data.ErrorInfo{Message: errInternalProcessingError.Error(), Unretriable: true} +) type TaskHandlerOutput struct { *data.TaskOutput @@ -71,6 +76,10 @@ type Runner interface { type RunnerOptions struct { AMQPUri string ExchangeName, QueueName string + OldQueueName string + DeadLetter 
struct { + ExchangeName, QueueName string + } MinTaskProcessingTime time.Duration MaxTaskProcessingTime time.Duration @@ -98,6 +107,9 @@ func NewRunner(opts RunnerOptions) Runner { if opts.MaxConcurrentTasks == 0 { opts.MaxConcurrentTasks = DefaultMaxConcurrentTasks } + if opts.DeadLetter.ExchangeName != "" && opts.DeadLetter.QueueName == "" { + opts.DeadLetter.QueueName = opts.DeadLetter.ExchangeName + } return &runner{ RunnerOptions: opts, DelayedExchange: fmt.Sprintf("%s_delayed", opts.ExchangeName), @@ -134,48 +146,88 @@ func (r *runner) Start() error { return fmt.Errorf("error consuming queue: %w", err) } + // TODO: Remove this logic after migration to dead leterred queue + if r.OldQueueName != "" { + go cleanUpOldQueue(r.AMQPUri, r.OldQueueName, r.ExchangeName) + } + r.amqp = amqp return nil } func (r *runner) setupAmqpConnection(c event.AMQPChanSetup) error { - err := c.ExchangeDeclare(r.ExchangeName, "topic", true, false, false, false, nil) - if err != nil { - return fmt.Errorf("error ensuring API exchange exists: %w", err) - } - _, err = c.QueueDeclare(r.QueueName, true, false, false, false, amqp.Table{"x-queue-type": "quorum"}) - if err != nil { - return fmt.Errorf("error declaring task queue: %w", err) - } - err = c.QueueBind(r.QueueName, "task.trigger.#", r.ExchangeName, false, nil) - if err != nil { - return fmt.Errorf("error binding task queue: %w", err) + queueArgs := amqp.Table{"x-queue-type": "quorum"} + if dlx := r.DeadLetter; dlx.ExchangeName != "" { + err := declareQueueAndExchange(c, dlx.ExchangeName, dlx.QueueName, "#", queueArgs) + if err != nil { + return err + } + queueArgs["x-dead-letter-exchange"] = r.DeadLetter.ExchangeName } - err = c.ExchangeDeclare(r.DelayedExchange, "topic", true, false, false, false, nil) + err := declareQueueAndExchange(c, r.ExchangeName, r.QueueName, "task.trigger.#", queueArgs) if err != nil { - return fmt.Errorf("error declaring delayed exchange: %w", err) + return err } - delayedArgs := amqp.Table{ + + 
queueArgs = amqp.Table{ "x-message-ttl": int32(time.Minute / time.Millisecond), "x-dead-letter-exchange": r.ExchangeName, } - _, err = c.QueueDeclare(r.DelayedExchange, true, false, false, false, delayedArgs) + err = declareQueueAndExchange(c, r.DelayedExchange, r.DelayedExchange, "#", queueArgs) if err != nil { - return fmt.Errorf("error declaring delayed queue: %w", err) + return err } - err = c.QueueBind(r.DelayedExchange, "#", r.DelayedExchange, false, nil) + + if err := c.Qos(int(r.MaxConcurrentTasks), 0, false); err != nil { + return fmt.Errorf("error setting QoS: %w", err) + } + return nil +} + +func cleanUpOldQueue(amqpURI, oldQueue, exchange string) { + defer func() { + if r := recover(); r != nil { + glog.Errorf("Recovered from panic in cleanUpOldQueue: %v", r) + } + }() + + cleanUp := event.NewAMQPConnectFunc(func(c event.AMQPChanSetup) error { + err := c.QueueUnbind(oldQueue, "task.trigger.#", oldQueue, nil) + if err != nil { + glog.Errorf("Error unbinding old queue from exchange queue=%q exchange=%q err=%q", oldQueue, exchange, err) + } + + _, err = c.QueueDelete(oldQueue, false, true, false) + if err != nil { + glog.Errorf("Error deleting old queue=%q err=%q", oldQueue, err) + } + + return nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + cleanUp(ctx, amqpURI, nil, nil) +} + +func declareQueueAndExchange(c event.AMQPChanSetup, exchange, queue, binding string, queueArgs amqp.Table) error { + err := c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil) if err != nil { - return fmt.Errorf("error binding delayed queue: %w", err) + return fmt.Errorf("error declaring exchange %q: %w", exchange, err) } - err = c.Qos(int(r.MaxConcurrentTasks), 0, false) + _, err = c.QueueDeclare(queue, true, false, false, false, queueArgs) if err != nil { - return fmt.Errorf("error setting QoS: %w", err) + return fmt.Errorf("error declaring queue %q: %w", queue, err) + } + err = c.QueueBind(queue, binding, 
exchange, false, nil) + if err != nil { + return fmt.Errorf("error binding queue %q on %q to exchange %q: %w", queue, binding, exchange, err) } return nil } -func (r *runner) handleAMQPMessage(msg amqp.Delivery) error { +func (r *runner) handleAMQPMessage(msg amqp.Delivery) (err error) { // rate-limit message processing time to limit load defer blockUntil(time.After(r.MinTaskProcessingTime)) @@ -185,8 +237,14 @@ func (r *runner) handleAMQPMessage(msg amqp.Delivery) error { task, err := parseTaskInfo(msg) if err != nil { glog.Errorf("Error parsing AMQP message err=%q msg=%q", err, msg.Body) - return nil + return event.UnprocessableMsgErr(err) } + defer func() { + if rec := recover(); rec != nil { + glog.Errorf("Panic handling task type=%s id=%s step=%q panic=%v stack=%q", task.Type, task.ID, task.Step, rec, debug.Stack()) + err = simplePublishTaskFatalError(r.amqp, r.ExchangeName, task) + } + }() output, err := r.handleTask(ctx, task) if err == nil && output != nil && output.Continue { @@ -196,20 +254,11 @@ func (r *runner) handleAMQPMessage(msg amqp.Delivery) error { glog.Infof("Task handler processed task type=%q id=%s output=%+v error=%q unretriable=%v", task.Type, task.ID, output, err, IsUnretriable(err)) - ctx, cancel = context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() // return the error directly so that if publishing the result fails we nack the message to try again - return r.publishTaskResult(ctx, task, output, err) + return r.publishTaskResult(task, output, err) } func (r *runner) handleTask(ctx context.Context, taskInfo data.TaskInfo) (out *TaskHandlerOutput, err error) { - defer func() { - if r := recover(); r != nil { - glog.Errorf("Panic handling task: value=%q stack:\n%s", r, string(debug.Stack())) - err = UnretriableError{fmt.Errorf("panic handling task: %v", r)} - } - }() - taskCtx, err := r.buildTaskContext(ctx, taskInfo) if err != nil { return nil, fmt.Errorf("error building task context: %w", err) @@ -231,7 +280,10 @@ func 
(r *runner) handleTask(ctx context.Context, taskInfo data.TaskInfo) (out *T } err = r.lapi.UpdateTaskStatus(taskID, api.TaskPhaseRunning, 0) - if err != nil { + if err == api.ErrRateLimited { + glog.Warningf("Task execution rate limited type=%q id=%s userID=%s", taskType, taskID, taskCtx.UserID) + return nil, r.delayTaskStep(ctx, taskID, taskCtx.Step, taskCtx.StepInput) + } else if err != nil { glog.Errorf("Error updating task progress type=%q id=%s err=%q unretriable=%v", taskType, taskID, err, IsUnretriable(err)) // execute the task anyway } @@ -255,7 +307,7 @@ func parseTaskInfo(msg amqp.Delivery) (data.TaskInfo, error) { } func (r *runner) buildTaskContext(ctx context.Context, info data.TaskInfo) (*TaskContext, error) { - task, err := r.lapi.GetTask(info.ID) + task, err := r.lapi.GetTask(info.ID, true) if err != nil { return nil, err } @@ -275,7 +327,7 @@ func (r *runner) getAssetAndOS(assetID string) (*api.Asset, *api.ObjectStore, dr if assetID == "" { return nil, nil, nil, nil } - asset, err := r.lapi.GetAsset(assetID) + asset, err := r.lapi.GetAsset(assetID, true) if err != nil { return nil, nil, nil, err } @@ -296,8 +348,10 @@ func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptI if err != nil { return fmt.Errorf("failed to get task %s: %w", taskId, err) } + glog.Infof("Received catalyst callback taskType=%q id=%s taskPhase=%s status=%q completionRatio=%v error=%q rawCallback=%+v", task.Type, task.ID, task.Status.Phase, callback.Status, callback.CompletionRatio, callback.Error, *callback) + if task.Status.Phase != api.TaskPhaseRunning && task.Status.Phase != api.TaskPhaseWaiting { return fmt.Errorf("task %s is not running", taskId) @@ -305,6 +359,7 @@ func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptI return fmt.Errorf("outdated catalyst job callback, "+ "task has already been retried (callback: %s current: %s)", attemptID, curr) } + progress := 0.9 * callback.CompletionRatio progress = 
math.Round(progress*1000) / 1000 currProgress, taskUpdatedAt := task.Status.Progress, data.NewUnixMillisTime(task.Status.UpdatedAt) @@ -314,13 +369,15 @@ func (r *runner) HandleCatalysis(ctx context.Context, taskId, nextStep, attemptI glog.Warningf("Failed to update task progress. taskID=%s err=%v", task.ID, err) } } + if callback.Status == clients.CatalystStatusError { glog.Infof("Catalyst job failed for task type=%q id=%s error=%q unretriable=%v", task.Type, task.ID, callback.Error, callback.Unretriable) err := NewCatalystError(callback.Error, callback.Unretriable) - return r.publishTaskResult(ctx, taskInfo, nil, err) + return r.publishTaskResult(taskInfo, nil, err) } else if callback.Status == clients.CatalystStatusSuccess { - return r.scheduleTaskStep(ctx, task.ID, nextStep, callback) + return r.scheduleTaskStep(task.ID, nextStep, callback) } + return nil } @@ -332,12 +389,12 @@ func (r *runner) delayTaskStep(ctx context.Context, taskID, step string, input i if err != nil { return err } - return r.publishLogged(ctx, task, r.DelayedExchange, + return r.publishLogged(task, r.DelayedExchange, fmt.Sprintf("task.trigger.%s", task.Type), data.NewTaskTriggerEvent(task)) } -func (r *runner) scheduleTaskStep(ctx context.Context, taskID, step string, input interface{}) error { +func (r *runner) scheduleTaskStep(taskID, step string, input interface{}) error { if step == "" { return errors.New("can only schedule sub-steps of tasks") } @@ -346,13 +403,13 @@ func (r *runner) scheduleTaskStep(ctx context.Context, taskID, step string, inpu return err } key, body := fmt.Sprintf("task.trigger.%s", task.Type), data.NewTaskTriggerEvent(task) - if err := r.publishLogged(ctx, task, r.ExchangeName, key, body); err != nil { + if err := r.publishLogged(task, r.ExchangeName, key, body); err != nil { return fmt.Errorf("error publishing task result event: %w", err) } return nil } -func (r *runner) publishTaskResult(ctx context.Context, task data.TaskInfo, output *TaskHandlerOutput, 
resultErr error) error { +func (r *runner) publishTaskResult(task data.TaskInfo, output *TaskHandlerOutput, resultErr error) error { if r.HumanizeErrors { resultErr = humanizeError(resultErr) } @@ -365,14 +422,14 @@ func (r *runner) publishTaskResult(ctx context.Context, task data.TaskInfo, outp return errors.New("output or resultErr must be non-nil") } key := fmt.Sprintf("task.result.%s.%s", task.Type, task.ID) - if err := r.publishLogged(ctx, task, r.ExchangeName, key, body); err != nil { + if err := r.publishLogged(task, r.ExchangeName, key, body); err != nil { return fmt.Errorf("error publishing task result event: %w", err) } return nil } func (r *runner) getTaskInfo(id, step string, input interface{}) (data.TaskInfo, *api.Task, error) { - task, err := r.lapi.GetTask(id) + task, err := r.lapi.GetTask(id, true) if err != nil { return data.TaskInfo{}, nil, fmt.Errorf("error getting task %q: %w", id, err) } @@ -396,20 +453,8 @@ func (r *runner) getTaskInfo(id, step string, input interface{}) (data.TaskInfo, }, task, nil } -func (r *runner) publishLogged(ctx context.Context, task data.TaskInfo, exchange, key string, body interface{}) error { - msg := event.AMQPMessage{ - Exchange: exchange, - Key: key, - Body: body, - Persistent: true, - WaitResult: true, - } - glog.Infof("Publishing AMQP message. taskType=%q id=%s step=%q exchange=%q key=%q body=%+v", task.Type, task.ID, task.Step, exchange, key, body) - if err := r.amqp.Publish(ctx, msg); err != nil { - glog.Errorf("Error publishing AMQP message. 
taskType=%q id=%s step=%q exchange=%q key=%q err=%q body=%+v", task.Type, task.ID, task.Step, exchange, key, err, body) - return err - } - return nil +func (r *runner) publishLogged(task data.TaskInfo, exchange, key string, body interface{}) error { + return publishLoggedRaw(r.amqp, task, exchange, key, body) } func (r *runner) Shutdown(ctx context.Context) error { @@ -436,14 +481,18 @@ func humanizeError(err error) error { if errors.As(err, &catErr) { if strings.Contains(errMsg, "unsupported input pixel format") { return errors.New("unsupported input pixel format, must be 'yuv420p' or 'yuvj420p'") + } else if strings.Contains(errMsg, "Unsupported video input") { + return errors.New("unsupported file format") + } else if strings.Contains(errMsg, "ReadPacketData File read failed - end of file hit") { + return errors.New("invalid video file, possibly truncated") } - return errors.New("internal error processing file") + return errInternalProcessingError } if strings.Contains(errMsg, "unexpected eof") { return errors.New("file download failed") } else if strings.Contains(errMsg, "multipartupload: upload multipart failed") { - return errors.New("internal error saving file to storage") + return errors.New("error saving file to storage") } else if strings.Contains(errMsg, "mp4io: parse error") { return UnretriableError{errors.New("file format unsupported, must be a valid MP4")} } @@ -452,10 +501,11 @@ func humanizeError(err error) error { strings.Contains(errMsg, "could not create stream id") || strings.Contains(errMsg, "502 bad gateway") || strings.Contains(errMsg, "task has already been started before") || + strings.Contains(errMsg, "catalyst task lost") || (strings.Contains(errMsg, "eof") && strings.Contains(errMsg, "error processing file")) if isProcessing { - return errors.New("internal error processing file") + return errInternalProcessingError } isTimeout := strings.Contains(errMsg, "context deadline exceeded") || @@ -470,3 +520,37 @@ func humanizeError(err error) 
error { } func blockUntil(t <-chan time.Time) { <-t } + +// This is a code path to send a task failure event the simplest way possible, +// to be used when handling panics in the task processing code path. It is +// purposedly meant to be a separate flow as much as possible to avoid any +// chance of hitting the same panic again. +func simplePublishTaskFatalError(producer event.AMQPProducer, exchange string, task data.TaskInfo) error { + body := data.NewTaskResultEvent(task, taskFatalErrorInfo, nil) + key := taskResultMessageKey(task.Type, task.ID) + return publishLoggedRaw(producer, task, exchange, key, body) +} + +func publishLoggedRaw(producer event.AMQPProducer, task data.TaskInfo, exchange, key string, body interface{}) error { + ctx, cancel := context.WithTimeout(context.Background(), taskPublishTimeout) + defer cancel() + msg := event.AMQPMessage{ + Exchange: exchange, + Key: key, + Body: body, + Persistent: true, + // TODO: Actually handle returns from the AMQP server so we can toggle mandatory here. Needs further support in pkg/event. + // Mandatory: true, + WaitResult: true, + } + glog.Infof("Publishing AMQP message. taskType=%q id=%s step=%q exchange=%q key=%q body=%+v", task.Type, task.ID, task.Step, exchange, key, body) + if err := producer.Publish(ctx, msg); err != nil { + glog.Errorf("Error publishing AMQP message. 
taskType=%q id=%s step=%q exchange=%q key=%q err=%q body=%+v", task.Type, task.ID, task.Step, exchange, key, err, body) + return err + } + return nil +} + +func taskResultMessageKey(ttype, id string) string { + return fmt.Sprintf("task.result.%s.%s", ttype, id) +} diff --git a/task/runner_test.go b/task/runner_test.go new file mode 100644 index 00000000..85c6e4cb --- /dev/null +++ b/task/runner_test.go @@ -0,0 +1,67 @@ +package task + +import ( + "context" + "testing" + "time" + + "github.com/livepeer/livepeer-data/pkg/data" + "github.com/livepeer/livepeer-data/pkg/event" + "github.com/stretchr/testify/require" +) + +func TestSimplePublishErrorDoesNotPanic(t *testing.T) { + require := require.New(t) + + var publishedMsg *event.AMQPMessage + producer := func(ctx context.Context, msg event.AMQPMessage) error { + require.Nil(publishedMsg) + publishedMsg = &msg + return nil + } + + exchange, taskInfo := "uniswap", data.TaskInfo{ + ID: "LPT", + Type: "ICO", + Step: "merkle_mine", + } + var err error + require.NotPanics(func() { + err = simplePublishTaskFatalError(producerFunc(producer), exchange, taskInfo) + }) + require.NoError(err) + require.NotNil(publishedMsg) + + require.IsType(&data.TaskResultEvent{}, publishedMsg.Body) + base := publishedMsg.Body.(*data.TaskResultEvent).Base + + require.NotZero(base.ID()) + require.Equal(data.EventTypeTaskResult, base.Type()) + require.LessOrEqual(time.Since(base.Timestamp()), 1*time.Second) + + require.Equal(*publishedMsg, event.AMQPMessage{ + Exchange: "uniswap", + Key: "task.result.ICO.LPT", + Persistent: true, + Mandatory: false, + WaitResult: true, + Body: &data.TaskResultEvent{ + Base: base, + Task: taskInfo, + Error: &data.ErrorInfo{ + Message: "internal error processing file", + Unretriable: true, + }, + }, + }) +} + +type producerFunc func(ctx context.Context, msg event.AMQPMessage) error + +func (f producerFunc) Publish(ctx context.Context, msg event.AMQPMessage) error { + return f(ctx, msg) +} + +func (f producerFunc) 
Shutdown(ctx context.Context) error { + return nil +} diff --git a/task/transcode.go b/task/transcode.go index a8dbcd74..3555c049 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -158,7 +158,7 @@ func TaskTranscode(tctx *TaskContext) (*TaskHandlerOutput, error) { streamName := fmt.Sprintf("vod_%s", time.Now().Format("2006-01-02T15:04:05Z07:00")) profiles := []api.Profile{tctx.Params.Transcode.Profile} - stream, err := lapi.CreateStreamR(api.CreateStreamReq{Name: streamName, Profiles: profiles}) + stream, err := lapi.CreateStream(api.CreateStreamReq{Name: streamName, Profiles: profiles}) if err != nil { return nil, err } @@ -201,7 +201,7 @@ out: } glog.V(model.VERBOSE).Infof("Got segment seqNo=%d pts=%s dur=%s data len bytes=%d\n", seg.SeqNo, seg.Pts, seg.Duration, len(seg.Data)) started := time.Now() - transcoded, err = lapi.PushSegmentR(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution) + transcoded, err = lapi.PushSegment(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution) if err != nil { glog.Errorf("Segment push playbackID=%s err=%v\n", inputPlaybackID, err) break