Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Livepeer.Cloud SPE - Proposal #2 - Enable Single Orchestrator AI Job Testing Support for Gateway Nodes #3241

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")
cfg.DiscoveryTimeout = flag.Duration("discoveryTimeout", *cfg.DiscoveryTimeout, "Time to wait for orchestrators to return info to be included in transcoding sessions for manifest (default = 500ms)")

cfg.AISessionTimeout = flag.Duration("aiSessionTimeout", *cfg.AISessionTimeout, "The length of time (in seconds) that an AI Session will be cached (default = 600s)")
cfg.WebhookRefreshInterval = flag.Duration("webhookRefreshInterval", *cfg.WebhookRefreshInterval, "The length of time (in seconds) that an Orchestrator Webhook Discovery Request will be cached (default = 60s)")
cfg.AITesterGateway = flag.Bool("aiTesterGateway", *cfg.AITesterGateway, "Set to true to allow the gateway to run in \"tester\" mode. This will bypass caching of AI session selectors.")
// Transcoding:
cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator")
cfg.Transcoder = flag.Bool("transcoder", *cfg.Transcoder, "Set to true to be a transcoder")
Expand Down
18 changes: 15 additions & 3 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,13 @@ type LivepeerConfig struct {
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
WebhookRefreshInterval *time.Duration
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
AISessionTimeout *time.Duration
AITesterGateway *bool
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -204,6 +207,9 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAIModels := ""
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultAISessionTimeout := 10 * time.Minute
defaultWebhookRefreshInterval := 1 * time.Minute
defaultAITesterGateway := false

// Onchain:
defaultEthAcctAddr := ""
Expand Down Expand Up @@ -304,6 +310,8 @@ func DefaultLivepeerConfig() LivepeerConfig {
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AISessionTimeout: &defaultAISessionTimeout,
AITesterGateway: &defaultAITesterGateway,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -357,8 +365,9 @@ func DefaultLivepeerConfig() LivepeerConfig {
FVfailGsKey: &defaultFVfailGsKey,

// API
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,
WebhookRefreshInterval: &defaultWebhookRefreshInterval,

// Versioning constraints
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,
Expand Down Expand Up @@ -1042,6 +1051,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
server.BroadcastCfg.SetCapabilityMaxPrice(cap, p.ModelID, autoCapPrice)
}
}
n.AITesterGateway = *cfg.AITesterGateway
n.AISessionTimeout = *cfg.AISessionTimeout
}

if n.NodeType == core.RedeemerNode {
Expand Down Expand Up @@ -1399,7 +1410,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error setting orch webhook URL ", err)
}
glog.Info("Using orchestrator webhook URL ", whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
glog.Info("Using orchestrator webhook refresh interval ", *cfg.WebhookRefreshInterval)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout, *cfg.WebhookRefreshInterval)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
}
Expand Down
3 changes: 0 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ var SegUploadTimeoutMultiplier = 0.5
// MinSegmentUploadTimeout defines the minimum timeout enforced for uploading a segment to orchestrators
var MinSegmentUploadTimeout = 2 * time.Second

// WebhookDiscoveryRefreshInterval defines for long the Webhook Discovery values should be cached
var WebhookDiscoveryRefreshInterval = 1 * time.Minute

// Max Segment Duration
var MaxDuration = (5 * time.Minute)

Expand Down
2 changes: 2 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type LivepeerNode struct {
// AI worker public fields
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AISessionTimeout time.Duration
AITesterGateway bool

// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Expand Down
35 changes: 31 additions & 4 deletions core/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"crypto/tls"
"fmt"
"net/http"
"os"
"time"

"github.com/livepeer/go-livepeer/clog"
Expand All @@ -20,9 +21,35 @@
return downloadDataHTTP(ctx, uri)
}

var httpc = &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
Timeout: common.HTTPTimeout / 2,
var osHttpClient = getHTTPClient()

// getHTTPClient creates an HTTP client with a timeout based on an environment variable or defaults to common.HTTPTimeout/2
func getHTTPClient() *http.Client {
// Get the timeout value from the environment variable
timeoutStr := os.Getenv("LIVEPEER_OS_HTTP_TIMEOUT")

// Define a default timeout value as common.HTTPTimeout / 2
defaultTimeout := common.HTTPTimeout / 2

var timeout time.Duration
var err error

// If the environment variable is set, attempt to parse it
if timeoutStr != "" {
timeout, err = time.ParseDuration(timeoutStr)
if err != nil {
timeout = defaultTimeout
}
} else {
// If the environment variable is not set, use the default timeout
timeout = defaultTimeout
}

// Return the HTTP client with the calculated timeout
return &http.Client{
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},

Check failure

Code scanning / CodeQL

Disabled TLS certificate check High

InsecureSkipVerify should not be used in production code.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikezupper Why are we skipping TLS here?

Timeout: timeout,
}
}

func FromNetOsInfo(os *net.OSInfo) *drivers.OSInfo {
Expand Down Expand Up @@ -76,7 +103,7 @@
func downloadDataHTTP(ctx context.Context, uri string) ([]byte, error) {
clog.V(common.VERBOSE).Infof(ctx, "Downloading uri=%s", uri)
started := time.Now()
resp, err := httpc.Get(uri)
resp, err := osHttpClient.Get(uri)

Check failure

Code scanning / CodeQL

Uncontrolled data used in network request Critical

The
URL
of this request depends on a
user-provided value
.
if err != nil {
clog.Errorf(ctx, "Error getting HTTP uri=%s err=%q", uri, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

// assert created webhook pool is correct length
whURL, _ := url.ParseRequestURI("https://livepeer.live/api/orchestrator")
whpool := NewWebhookPool(nil, whURL, 500*time.Millisecond)
whpool := NewWebhookPool(nil, whURL, 500*time.Millisecond, 1*time.Minute)
assert.Equal(3, whpool.Size())

// assert that list is not refreshed if lastRequest is less than 1 min ago and hash is the same
Expand Down
28 changes: 15 additions & 13 deletions discovery/wh_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ type webhookResponse struct {
}

type webhookPool struct {
pool *orchestratorPool
callback *url.URL
responseHash ethcommon.Hash
lastRequest time.Time
mu *sync.RWMutex
bcast common.Broadcaster
discoveryTimeout time.Duration
pool *orchestratorPool
callback *url.URL
responseHash ethcommon.Hash
lastRequest time.Time
mu *sync.RWMutex
bcast common.Broadcaster
discoveryTimeout time.Duration
webhookRefreshInterval time.Duration
}

func NewWebhookPool(bcast common.Broadcaster, callback *url.URL, discoveryTimeout time.Duration) *webhookPool {
func NewWebhookPool(bcast common.Broadcaster, callback *url.URL, discoveryTimeout time.Duration, webhookRefreshInterval time.Duration) *webhookPool {
p := &webhookPool{
callback: callback,
mu: &sync.RWMutex{},
bcast: bcast,
discoveryTimeout: discoveryTimeout,
callback: callback,
mu: &sync.RWMutex{},
bcast: bcast,
discoveryTimeout: discoveryTimeout,
webhookRefreshInterval: webhookRefreshInterval,
}
go p.getInfos()
return p
Expand All @@ -48,7 +50,7 @@ func (w *webhookPool) getInfos() ([]common.OrchestratorLocalInfo, error) {
w.mu.RUnlock()

// retrive addrs from cache if time since lastRequest is less than the refresh interval
if time.Since(lastReq) < common.WebhookDiscoveryRefreshInterval {
if time.Since(lastReq) < w.webhookRefreshInterval {
return pool.GetInfos(), nil
}

Expand Down
47 changes: 29 additions & 18 deletions server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,18 +341,20 @@ func (sel *AISessionSelector) getSessions(ctx context.Context) ([]*BroadcastSess
}

type AISessionManager struct {
node *core.LivepeerNode
selectors map[string]*AISessionSelector
mu sync.Mutex
ttl time.Duration
node *core.LivepeerNode
selectors map[string]*AISessionSelector
mu sync.Mutex
ttl time.Duration
testerGatewayEnabled bool
}

func NewAISessionManager(node *core.LivepeerNode, ttl time.Duration) *AISessionManager {
func NewAISessionManager(node *core.LivepeerNode, ttl time.Duration, testerGatewayEnabled bool) *AISessionManager {
return &AISessionManager{
node: node,
selectors: make(map[string]*AISessionSelector),
mu: sync.Mutex{},
ttl: ttl,
node: node,
selectors: make(map[string]*AISessionSelector),
mu: sync.Mutex{},
ttl: ttl,
testerGatewayEnabled: testerGatewayEnabled,
}
}

Expand Down Expand Up @@ -400,18 +402,27 @@ func (c *AISessionManager) getSelector(ctx context.Context, cap core.Capability,
c.mu.Lock()
defer c.mu.Unlock()

cacheKey := strconv.Itoa(int(cap)) + "_" + modelID
sel, ok := c.selectors[cacheKey]
if !ok {
// Create the selector
var err error
sel, err = NewAISessionSelector(cap, modelID, c.node, c.ttl)
if c.testerGatewayEnabled {
sel, err := NewAISessionSelector(cap, modelID, c.node, c.ttl)
if err != nil {
return nil, err
}
return sel, nil
} else {
cacheKey := strconv.Itoa(int(cap)) + "_" + modelID
sel, ok := c.selectors[cacheKey]
if !ok {
// Create the selector
var err error
sel, err = NewAISessionSelector(cap, modelID, c.node, c.ttl)
if err != nil {
return nil, err
}

c.selectors[cacheKey] = sel
}

c.selectors[cacheKey] = sel
}
return sel, nil

return sel, nil
}
}
60 changes: 60 additions & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,31 @@ func getAvailableTranscodingOptionsHandler() http.Handler {
})
}

func (s *LivepeerServer) getOrchestratorAICapabilitiesHandler() http.Handler {
// Define the data fetch function
fetchFunc := func() (*OrchestratorAICapabilitiesManager, error) {
networkCapsMgr, err := buildOrchestratorAICapabilitiesManager(s.LivepeerNode)
if err != nil {
return nil, fmt.Errorf(`failed to fetch orch AI capabilities: %v`, err.Error())
}
return networkCapsMgr, nil
}

// Initialize the cache with a TTL
cacheTTL := 120 * time.Second
networkCapsMgrCache := NewCache(cacheTTL, fetchFunc)

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Get data from cache (or fetch if stale)
networkCapsMgr, err := networkCapsMgrCache.GetCache()
if err != nil {
respond500(w, err.Error())
return
}
respondJson(w, networkCapsMgr)
})
}

// Rounds
func currentRoundHandler(client eth.LivepeerEthClient) http.Handler {
return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1634,3 +1659,38 @@ func mustHaveDb(db interface{}, h http.Handler) http.Handler {
h.ServeHTTP(w, r)
})
}

// Cache struct to hold the Cacheable output and expiration time
type Cache[T any] struct {
Data T
Expiration time.Time
TTL time.Duration
fetchFunc func() (T, error) // Function to fetch data when cache is stale
}

// NewCache initializes the cache with a TTL and fetch function
func NewCache[T any](ttl time.Duration, fetchFunc func() (T, error)) *Cache[T] {
return &Cache[T]{
TTL: ttl,
fetchFunc: fetchFunc,
}
}

// GetCache returns the cached string if it's still valid; otherwise, it fetches new data
func (c *Cache[T]) GetCache() (T, error) {
var zeroValue T
// Check if cache is still valid
if time.Now().Before(c.Expiration) {
return c.Data, nil
}
// Cache is stale; fetch new data
newData, err := c.fetchFunc()
if err != nil {
return zeroValue, err
}

// Update cache with the new data and set expiration
c.Data = newData
c.Expiration = time.Now().Add(c.TTL)
return c.Data, nil
}
4 changes: 1 addition & 3 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ const StreamKeyBytes = 6
const SegLen = 2 * time.Second
const BroadcastRetry = 15 * time.Second

const AISessionManagerTTL = 10 * time.Minute

var BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P240p30fps4x3, ffmpeg.P360p30fps16x9}

var AuthWebhookURL *url.URL
Expand Down Expand Up @@ -188,7 +186,7 @@ func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bo
rtmpConnections: make(map[core.ManifestID]*rtmpConnection),
internalManifests: make(map[core.ManifestID]core.ManifestID),
recordingsAuthResponses: cache.New(time.Hour, 2*time.Hour),
AISessionManager: NewAISessionManager(lpNode, AISessionManagerTTL),
AISessionManager: NewAISessionManager(lpNode, lpNode.AISessionTimeout, lpNode.AITesterGateway),
}
if lpNode.NodeType == core.BroadcasterNode && httpIngest {
opts.HttpMux.HandleFunc("/live/", ls.HandlePush)
Expand Down
Loading
Loading