Skip to content

Commit

Permalink
task/runner: Several reliability improvements (#95)
Browse files Browse the repository at this point in the history
* go.mod: Update go-api-client for all retries

* runner: Handle rate limit errors from API and delay task

* runner: Hide catalyst task lost error as well

* runner: Fix log

* go.mod: Update livepeer-data for amqp reliability

* task: Handle panics more gracefully

Move it to the outer function in runner so we
catch errors in the runner code as well

* task: Make sure we publish all events with a separate timeout

we dont want to not sent the publish result because we already
expired the task contexst or something.

* task: Setup dead-lettering for main tasks queue

* go.mod: Update to merged livepeer-data

* Fix test

* OMG add some spacing man

* Stop double wrapping an error

* task: Add migration logic to new queue

* cmd: Make the default value of dead letter empty

Just so the deploys can be safe

* runner: Move cleanup logic to it's own function

* task: Make all AMQP publishes mandatory

* task: Add TODO about handling AMQP returns

* task: Handle some mediaconvert errors

* task: Disable mandatory message if we don't handle them

* runner: Remove some forgotten unused args
  • Loading branch information
victorges authored Dec 7, 2022
1 parent c266111 commit 66ef87d
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 73 deletions.
3 changes: 3 additions & 0 deletions cmd/task-runner/task-runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func parseFlags(build BuildFlags) cliFlags {
fs.StringVar(&cli.runnerOpts.AMQPUri, "amqp-uri", "amqp://guest:guest@localhost:5672/livepeer", "URI for RabbitMQ server to consume from. Specified in the AMQP protocol")
fs.StringVar(&cli.runnerOpts.ExchangeName, "exchange-name", "lp_tasks", "Name of exchange where the task events will be published to")
fs.StringVar(&cli.runnerOpts.QueueName, "queue-name", "lp_runner_task_queue", "Name of task queue to consume from. If it doesn't exist a new queue will be created and bound to the API exchange")
fs.StringVar(&cli.runnerOpts.OldQueueName, "old-queue-name", "", "Name of the old task queue that should be unbound and attempted a clean-up")
fs.StringVar(&cli.runnerOpts.DeadLetter.ExchangeName, "dead-letter-exchange-name", "", "Name of the dead letter exchange to create for tasks that are unprocessable")
fs.StringVar(&cli.runnerOpts.DeadLetter.QueueName, "dead-letter-queue-name", "", "Name of the queue where the dead-lettered tasks should be routed to. This queue is not consumed automatically. If empty, the name will be the same as the dead-letter exchange")
fs.DurationVar(&cli.runnerOpts.MinTaskProcessingTime, "min-task-processing-time", task.DefaultMinTaskProcessingTime, "Minimum time that a task processing must take as rate-limiting strategy. If the task finishes earlier, the runner will wait for the remaining time before starting another task")
fs.DurationVar(&cli.runnerOpts.MaxTaskProcessingTime, "max-task-processing-time", task.DefaultMaxTaskProcessingTime, "Timeout for task processing. If the task is not completed within this time it will be marked as failed")
fs.UintVar(&cli.runnerOpts.MaxConcurrentTasks, "max-concurrent-tasks", task.DefaultMaxConcurrentTasks, "Maximum number of tasks to run simultaneously")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5
github.com/livepeer/go-tools v0.1.0
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/livepeer-data v0.5.2
github.com/livepeer/livepeer-data v0.6.2
github.com/livepeer/stream-tester v0.12.22-0.20220912212136-f2dff6bd9343
github.com/peterbourgon/ff v1.7.1
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ github.com/livepeer/go-tools v0.1.0 h1:GOZwUL2ME8aJOEyusmBrj056NucRFcaGTnTbQ8koB
github.com/livepeer/go-tools v0.1.0/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ=
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 h1:ISkFQYYDgfnM6Go+VyemF66DKFc8kNoI9SwMv7GC9sM=
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07/go.mod h1:RDTLvmm/NJWjzuUpEDyIWmLTqSfpZEcnPnacG8sfh34=
github.com/livepeer/livepeer-data v0.5.2 h1:PS1uXm6VUZdW/gbChGp2jB1r8P5YjWbUJnodJzOuEyY=
github.com/livepeer/livepeer-data v0.5.2/go.mod h1:6xP6P9IzKaenw9jJoxkcZzJx1qSQe3UBYKUBZ61gXl8=
github.com/livepeer/livepeer-data v0.6.2 h1:b/+CppOJsXipt2eBwTBnW1ldGu+2HVVQsV+iq65x73E=
github.com/livepeer/livepeer-data v0.6.2/go.mod h1:6xP6P9IzKaenw9jJoxkcZzJx1qSQe3UBYKUBZ61gXl8=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/livepeer/stream-tester v0.12.22-0.20220912212136-f2dff6bd9343 h1:9mAQMfQGIpv7+0X6Z8+qag6zag+/r+zsqLJGB00n+pc=
Expand Down
4 changes: 2 additions & 2 deletions task/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Prepare(tctx *TaskContext, assetSpec *api.AssetSpec, file io.ReadSeekCloser
if err != nil {
return "", err
}
stream, err := lapi.CreateStreamR(api.CreateStreamReq{Name: streamName, Record: true, Profiles: profiles})
stream, err := lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, Profiles: profiles})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func Prepare(tctx *TaskContext, assetSpec *api.AssetSpec, file io.ReadSeekCloser
glog.V(model.VERBOSE).Infof("Got segment seqNo=%d pts=%s dur=%s data len bytes=%d\n", seg.SeqNo, seg.Pts, seg.Duration, len(seg.Data))
accumulator.Accumulate(uint64(len(seg.Data)))
started := time.Now()
_, err = lapi.PushSegmentR(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution)
_, err = lapi.PushSegment(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution)
if err != nil {
glog.Errorf("Error while segment push for prepare err=%v\n", err)
break
Expand Down
Loading

0 comments on commit 66ef87d

Please sign in to comment.