Skip to content

Commit

Permalink
Add -liveAITrickleHostForRunner flag (#3281)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Nov 27, 2024
1 parent 976b115 commit 15a37cf
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 96 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {

// Live AI Media Server:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
cfg.LiveAITrickleHostForRunner = flag.String("liveAITrickleHostForRunner", "", "Trickle Host used by AI Runner; It's used to overwrite the publicly available Trickle Host")

// Onchain:
cfg.EthAcctAddr = flag.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address. For use when multiple ETH accounts exist in the keystore directory")
Expand Down
182 changes: 93 additions & 89 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,95 +78,96 @@ const (
)

type LivepeerConfig struct {
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
AIServiceRegistry *bool
AIWorker *bool
Gateway *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
AIModels *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MaxPricePerCapability *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
HevcDecoding *bool
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerGateway *string
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
AIModelsDir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
LiveAIAuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
KafkaGatewayTopic *string
MediaMTXApiPassword *string
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
AIServiceRegistry *bool
AIWorker *bool
Gateway *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
AIModels *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MaxPricePerCapability *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
HevcDecoding *bool
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerGateway *string
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
AIModelsDir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
LiveAIAuthWebhookURL *string
LiveAITrickleHostForRunner *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
KafkaGatewayTopic *string
MediaMTXApiPassword *string
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -1555,6 +1556,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if cfg.MediaMTXApiPassword != nil {
n.MediaMTXApiPassword = *cfg.MediaMTXApiPassword
}
if cfg.LiveAITrickleHostForRunner != nil {
n.LiveAITrickleHostForRunner = *cfg.LiveAITrickleHostForRunner
}

//Create Livepeer Node

Expand Down
3 changes: 2 additions & 1 deletion core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ type LivepeerNode struct {
LivePipelines map[string]*LivePipeline
LiveMu *sync.RWMutex

MediaMTXApiPassword string
MediaMTXApiPassword string
LiveAITrickleHostForRunner string
}

type LivePipeline struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/google/uuid v1.6.0
github.com/jaypipes/ghw v0.10.0
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/ai-worker v0.12.4-0.20241125220901-b9bb93fec45d
github.com/livepeer/ai-worker v0.12.4
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20241122145837-7b07ba3a2204
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-netroute v0.2.0/go.mod h1:Vio7LTzZ+6hoT4CMZi5/6CpY3Snzh2vgZhWgxMNwlQI=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/livepeer/ai-worker v0.12.4-0.20241125220901-b9bb93fec45d h1:4Ab7zR46jOfTX2vPQnrxdO0bJEe4azzhFeoPpMvBYM8=
github.com/livepeer/ai-worker v0.12.4-0.20241125220901-b9bb93fec45d/go.mod h1:pfWCS5v8TIWNImxAZ6ikhiJW9Re88rsDnlW5Ktn7r2k=
github.com/livepeer/ai-worker v0.12.4 h1:RuCZP/JUEOo/q10Ry+s0oOr06DOSnpEDTE6y/NqXFxs=
github.com/livepeer/ai-worker v0.12.4/go.mod h1:pfWCS5v8TIWNImxAZ6ikhiJW9Re88rsDnlW5Ktn7r2k=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b h1:VQcnrqtCA2UROp7q8ljkh2XA/u0KRgVv0S1xoUvOweE=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b/go.mod h1:hwJ5DKhl+pTanFWl+EUpw1H7ukPO/H+MFpgA7jjshzw=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cOQee+WqmaDOgGtP2oDMhcVvR4L0yA=
Expand Down
24 changes: 21 additions & 3 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"fmt"
"image"
"io"
"log/slog"
"mime"
"mime/multipart"
"net/http"
url2 "net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -155,11 +157,12 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
}()

// Prepare request to worker
controlUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, controlUrl)
workerReq := worker.LiveVideoToVideoParams{
ModelId: req.ModelId,
PublishUrl: subUrl,
SubscribeUrl: pubUrl,
ControlUrl: &controlUrl,
PublishUrl: overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl),
SubscribeUrl: overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl),
ControlUrl: &controlUrlOverwrite,
Params: req.Params,
}

Expand Down Expand Up @@ -193,6 +196,21 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
})
}

// overwriteHost is used to overwrite the trickle host, because it may be different for runner
// runner may run inside Docker container, in a different network, or even on a different machine
func overwriteHost(hostOverwrite, url string) string {
if hostOverwrite == "" {
return url
}
u, err := url2.ParseRequestURI(url)
if err != nil {
slog.Warn("Couldn't parse url to overwrite for worker, using original url", "url", url, "err", err)
return url
}
u.Host = hostOverwrite
return u.String()
}

func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, orch Orchestrator, req interface{}) {
payment, err := getPayment(r.Header.Get(paymentHeader))
if err != nil {
Expand Down

0 comments on commit 15a37cf

Please sign in to comment.