Skip to content

Commit

Permalink
[clientrpc] add tipStream on paymentsServer
Browse files Browse the repository at this point in the history
  • Loading branch information
vctt94 committed Dec 15, 2024
1 parent 70d0a65 commit 5970fc8
Show file tree
Hide file tree
Showing 4 changed files with 510 additions and 225 deletions.
24 changes: 24 additions & 0 deletions client/rpcserver/payments_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type paymentsServer struct {
c *client.Client

tipProgressStreams *serverStreams[*types.TipProgressEvent]
tipStreams *serverStreams[*types.ReceivedTip]
}

func (p *paymentsServer) TipUser(ctx context.Context, req *types.TipUserRequest, _ *types.TipUserResponse) error {
Expand All @@ -58,6 +59,19 @@ func (p *paymentsServer) TipProgress(ctx context.Context, req *types.TipProgress
return p.tipProgressStreams.runStream(ctx, req.UnackedFrom, stream)
}

func (p *paymentsServer) TipStream(ctx context.Context, req *types.TipStreamRequest, stream types.PaymentsService_TipStreamServer) error {
return p.tipStreams.runStream(ctx, req.UnackedFrom, stream)
}

// tipNtfnHandler is called by the client when a tip arrived from a remote user.
func (p *paymentsServer) tipNtfnHandler(ru *client.RemoteUser, amtMAtoms int64) {
ntfn := &types.ReceivedTip{
Uid: ru.ID().Bytes(),
AmountMatoms: amtMAtoms,
}
p.tipStreams.send(ntfn)
}

func (p *paymentsServer) tipProgressNtfnHandler(ru *client.RemoteUser, amtMAtoms int64, completed bool, attempt int, attemptErr error, willRetry bool) {
var attemptErrMsg string
if attemptErr != nil {
Expand All @@ -79,9 +93,14 @@ func (p *paymentsServer) AckTipProgress(_ context.Context, req *types.AckRequest
return p.tipProgressStreams.ack(req.SequenceId)
}

func (p *paymentsServer) AckTipReceived(_ context.Context, req *types.AckRequest, _ *types.AckResponse) error {
return p.tipStreams.ack(req.SequenceId)
}

func (p *paymentsServer) registerOfflineMessageStorageHandlers() {
nmgr := p.c.NotificationManager()
nmgr.RegisterSync(client.OnTipAttemptProgressNtfn(p.tipProgressNtfnHandler))
nmgr.RegisterSync(client.OnTipReceivedNtfn(p.tipNtfnHandler))
}

var _ types.PaymentsServiceServer = (*paymentsServer)(nil)
Expand All @@ -92,13 +111,18 @@ func (s *Server) InitPaymentsService(cfg PaymentsServerCfg) error {
if err != nil {
return err
}
tipStreams, err := newServerStreams[*types.ReceivedTip](cfg.RootReplayMsgLogs, "tipstream", cfg.Log)
if err != nil {
return err
}

ps := &paymentsServer{
cfg: cfg,
log: cfg.Log,
c: cfg.Client,

tipProgressStreams: tipProgressStreams,
tipStreams: tipStreams,
}
ps.registerOfflineMessageStorageHandlers()
s.services.Bind("PaymentsService", types.PaymentsServiceDefn(), ps)
Expand Down
27 changes: 26 additions & 1 deletion clientrpc/clientrpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,15 @@ service PaymentsService {
rpc TipProgress(TipProgressRequest) returns (stream TipProgressEvent);

/* AckTipProgress acknowledges events received up to a given
sequence_id have been processed. */
sequence_id have been processed. */
rpc AckTipProgress(AckRequest) returns (AckResponse);

/* TipStream returns a stream that gets tips received by the client. */
rpc TipStream(TipStreamRequest) returns (stream ReceivedTip);

/* AckTipReceived acknowledges events received up to a given
sequence_id have been processed. */
rpc AckTipReceived(AckRequest) returns (AckResponse);
}

/* ResourcesService is the service to perform resource and page related actions. */
Expand Down Expand Up @@ -966,3 +973,21 @@ message FileMetadata {
/* attributes of the file. */
map<string,string> attributes = 10;
}

/* TipStreamRequest is the request for a new tip reception stream.*/
message TipStreamRequest {
/* unacked_from specifies to the server the sequence_id of the last processed
PM. PMs received by the server that have a higher sequence_id will be
streamed back to the client. */
uint64 unacked_from = 1;
}

/* ReceivedPM is a private message received by the client. */
message ReceivedTip {
/* uid is the source user ID in raw format. */
bytes uid = 1;
/* amount_matoms is the amount being tipped in milli-atoms. */
int64 amount_matoms = 2;
/* sequence_id is an opaque sequential ID. */
uint64 sequence_id = 3;
}
Loading

0 comments on commit 5970fc8

Please sign in to comment.