Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Live Video Payments" #3292

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.LiveAITrickleHostForRunner = flag.String("liveAITrickleHostForRunner", "", "Trickle Host used by AI Runner; It's used to overwrite the publicly available Trickle Host")
cfg.LiveAIAuthApiKey = flag.String("liveAIAuthApiKey", "", "API key to use for Live AI authentication requests")
cfg.LiveAIAuthWebhookURL = flag.String("liveAIAuthWebhookUrl", "", "Live AI RTMP authentication webhook URL")
cfg.LivePaymentInterval = flag.Duration("livePaymentInterval", *cfg.LivePaymentInterval, "Interval to pay process Gateway <> Orchestrator Payments for Live AI Video")

// Onchain:
cfg.EthAcctAddr = flag.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address. For use when multiple ETH accounts exist in the keystore directory")
Expand Down
4 changes: 0 additions & 4 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ type LivepeerConfig struct {
KafkaGatewayTopic *string
MediaMTXApiPassword *string
LiveAIAuthApiKey *string
LivePaymentInterval *time.Duration
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -214,7 +213,6 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second

// Onchain:
defaultEthAcctAddr := ""
Expand Down Expand Up @@ -322,7 +320,6 @@ func DefaultLivepeerConfig() LivepeerConfig {
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -1567,7 +1564,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if cfg.LiveAIAuthApiKey != nil {
n.LiveAIAuthApiKey = *cfg.LiveAIAuthApiKey
}
n.LivePaymentInterval = *cfg.LivePaymentInterval
if cfg.LiveAITrickleHostForRunner != nil {
n.LiveAITrickleHostForRunner = *cfg.LiveAITrickleHostForRunner
}
Expand Down
1 change: 0 additions & 1 deletion core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ type LivepeerNode struct {
MediaMTXApiPassword string
LiveAITrickleHostForRunner string
LiveAIAuthApiKey string
LivePaymentInterval time.Duration
}

type LivePipeline struct {
Expand Down
64 changes: 3 additions & 61 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,6 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
controlUrl = pubUrl + "-control"
)

// Handle initial payment, the rest of the payments are done separately from the stream processing
// Note that this payment is debit from the balance and acts as a buffer for the AI Realtime Video processing
payment, err := getPayment(r.Header.Get(paymentHeader))
if err != nil {
respondWithError(w, err.Error(), http.StatusPaymentRequired)
return
}
sender := getPaymentSender(payment)
_, ctx, err = verifySegCreds(ctx, h.orchestrator, r.Header.Get(segmentHeader), sender)
if err != nil {
respondWithError(w, err.Error(), http.StatusForbidden)
return
}
if err := orch.ProcessPayment(ctx, payment, core.ManifestID(mid)); err != nil {
respondWithError(w, err.Error(), http.StatusBadRequest)
return
}
if payment.GetExpectedPrice().GetPricePerUnit() > 0 && !orch.SufficientBalance(sender, core.ManifestID(mid)) {
respondWithError(w, "Insufficient balance", http.StatusBadRequest)
return
}

//If successful, then create the trickle channels
// Precreate the channels to avoid race conditions
// TODO get the expected mime type from the request
Expand All @@ -164,39 +142,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
controlPubCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-control", "application/json")
controlPubCh.CreateChannel()

// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
priceInfo, err := h.orchestrator.PriceInfo(sender, core.ManifestID(mid))
if err != nil {
respondWithError(w, err.Error(), http.StatusInternalServerError)
return

}
var paymentProcessor *LivePaymentProcessor
ctx, cancel := context.WithCancel(context.Background())
if priceInfo != nil {
paymentReceiver := livePaymentReceiver{orchestrator: h.orchestrator}
accountPaymentFunc := func(inPixels int64) error {
err := paymentReceiver.AccountPayment(context.Background(), &SegmentInfoReceiver{
sender: sender,
inPixels: inPixels,
priceInfo: priceInfo,
sessionID: mid,
})
if err != nil {
slog.Warn("Error accounting payment, stopping stream processing", "err", err)
pubCh.Close()
subCh.Close()
controlPubCh.Close()
cancel()
}
return err
}
paymentProcessor = NewLivePaymentProcessor(ctx, h.node.LivePaymentInterval, accountPaymentFunc)
} else {
clog.Warningf(ctx, "No price info found for model %v, Orchestrator will not charge for video processing", modelID)
}

// Subscribe to the publishUrl for payments monitoring and payment processing
// Subscribe to the publishUrl for payments monitoring
go func() {
sub := trickle.NewLocalSubscriber(h.trickleSrv, mid)
for {
Expand All @@ -205,11 +151,8 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
clog.Infof(ctx, "Error getting local trickle segment err=%v", err)
return
}
reader := segment.Reader
if paymentProcessor != nil {
reader = paymentProcessor.process(segment.Reader)
}
io.Copy(io.Discard, reader)
// We can do something with the segment data here
io.Copy(io.Discard, segment.Reader)
}
}()

Expand All @@ -233,7 +176,6 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
pubCh.Close()
subCh.Close()
controlPubCh.Close()
cancel()
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
31 changes: 2 additions & 29 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,23 @@ import (
"github.com/livepeer/go-livepeer/trickle"
)

func startTricklePublish(url *url.URL, params aiRequestParams, sess *AISession) {
func startTricklePublish(url *url.URL, params aiRequestParams) {
publisher, err := trickle.NewTricklePublisher(url.String())
if err != nil {
slog.Info("error publishing trickle", "err", err)
}

// Start payments which probes a segment every "paymentProcessInterval" and sends a payment
ctx, cancel := context.WithCancel(context.Background())
priceInfo := sess.OrchestratorInfo.PriceInfo
var paymentProcessor *LivePaymentProcessor
if priceInfo != nil {
paymentSender := livePaymentSender{}
sendPaymentFunc := func(inPixels int64) error {
return paymentSender.SendPayment(context.Background(), &SegmentInfoSender{
sess: sess.BroadcastSession,
inPixels: inPixels,
priceInfo: priceInfo,
mid: extractMid(url.Path),
})
}
paymentProcessor = NewLivePaymentProcessor(ctx, params.liveParams.paymentProcessInterval, sendPaymentFunc)
} else {
clog.Warningf(ctx, "No price info found from Orchestrator, Gateway will not send payments for the video processing")
}

params.liveParams.segmentReader.SwitchReader(func(reader io.Reader) {
// check for end of stream
if _, eos := reader.(*media.EOSReader); eos {
if err := publisher.Close(); err != nil {
slog.Info("Error closing trickle publisher", "err", err)
}
cancel()
return
}
go func() {
clog.V(8).Infof(context.Background(), "publishing trickle. url=%s", url.Redacted())

r := reader
if paymentProcessor != nil {
r = paymentProcessor.process(reader)
}

// TODO this blocks! very bad!
if err := publisher.Write(r); err != nil {
if err := publisher.Write(reader); err != nil {
slog.Info("Error writing to trickle publisher", "err", err)
}
}()
Expand Down
20 changes: 2 additions & 18 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ type liveRequestParams struct {
segmentReader *media.SwitchableSegmentReader
outputRTMPURL string
stream string

paymentProcessInterval time.Duration
}

// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request.
Expand Down Expand Up @@ -1006,8 +1004,6 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
return &res, nil
}

const initPixelsToPay = 30 * 30 * 1280 * 720 // 30 seconds, 30fps, 720p

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
Expand All @@ -1016,11 +1012,9 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
}
return nil, err
}
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, initPixelsToPay)
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)

// Send request to orchestrator
resp, err := client.GenLiveVideoToVideoWithResponse(ctx, req, setHeaders)
resp, err := client.GenLiveVideoToVideoWithResponse(ctx, req)
if err != nil {
return nil, err
}
Expand All @@ -1039,23 +1033,13 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
if err != nil {
return nil, fmt.Errorf("invalid control URL: %w", err)
}
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s", pub, sub, control)
startTricklePublish(pub, params, sess)
startTricklePublish(pub, params)
startTrickleSubscribe(ctx, sub, params)
startControlPublish(control, params)
}
return resp, nil
}

// extractMid extracts the mid (manifest ID) from the publish URL
// e.g. public URL passed from orchestrator: /live/manifest/123456, then mid is 123456
// we can consider improving it and passing mid directly in the JSON response from Orchestrator,
// but currently it would require changing the OpenAPI schema in livepeer/ai-worker repo
func extractMid(path string) string {
pubSplit := strings.Split(path, "/")
return pubSplit[len(pubSplit)-1]
}

func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 {
if tokensUsed <= 0 {
return 0
Expand Down
17 changes: 4 additions & 13 deletions server/live_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type SegmentInfoSender struct {
sess *BroadcastSession
inPixels int64
priceInfo *net.PriceInfo
mid string
}

type SegmentInfoReceiver struct {
Expand All @@ -47,29 +46,25 @@ type LivePaymentReceiver interface {
}

type livePaymentSender struct {
segmentsToPayUpfront int64
}

type livePaymentReceiver struct {
orchestrator Orchestrator
}

func (r *livePaymentSender) SendPayment(ctx context.Context, segmentInfo *SegmentInfoSender) error {
if segmentInfo.priceInfo == nil || segmentInfo.priceInfo.PricePerUnit == 0 {
clog.V(common.DEBUG).Infof(ctx, "Skipping sending payment, priceInfo not set for requestID=%s, ", segmentInfo.mid)
return nil
}
sess := segmentInfo.sess

if err := refreshSessionIfNeeded(ctx, sess); err != nil {
return err
}
sess.lock.Lock()
sess.Params.ManifestID = core.ManifestID(segmentInfo.mid)
sess.lock.Unlock()

fee := calculateFee(segmentInfo.inPixels, segmentInfo.priceInfo)

balUpdate, err := newBalanceUpdate(sess, fee)
// We pay a few segments upfront to avoid race condition between payment and segment processing
minCredit := new(big.Rat).Mul(fee, new(big.Rat).SetInt64(r.segmentsToPayUpfront))
balUpdate, err := newBalanceUpdate(sess, minCredit)
if err != nil {
return err
}
Expand Down Expand Up @@ -140,10 +135,6 @@ func (r *livePaymentSender) SendPayment(ctx context.Context, segmentInfo *Segmen

func (r *livePaymentReceiver) AccountPayment(
ctx context.Context, segmentInfo *SegmentInfoReceiver) error {
if segmentInfo.priceInfo == nil || segmentInfo.priceInfo.PricePerUnit == 0 {
clog.V(common.DEBUG).Infof(ctx, "Skipping accounting, priceInfo not set for sessionID=%s, ", segmentInfo.sessionID)
return nil
}
fee := calculateFee(segmentInfo.inPixels, segmentInfo.priceInfo)

balance := r.orchestrator.Balance(segmentInfo.sender, core.ManifestID(segmentInfo.sessionID))
Expand Down
Loading
Loading