Skip to content

Commit

Permalink
Removed handleResponse func and using generic HandleResponse instead
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiDiachuk committed Dec 24, 2024
1 parent 71bac49 commit f71edd5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,27 @@ func NewSendTransactionStatusesDataProvider(
//
// No errors are expected during normal operations.
func (p *SendTransactionStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *SendTransactionStatusesDataProvider) createSubscription(
ctx context.Context,
args sendTransactionStatusesArguments,
) subscription.Subscription {
return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *SendTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error {

messageIndex := counters.NewMonotonousCounter(0)

return func(txResults []*access.TransactionResult) error {

return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) {
index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}
index := messageIndex.Value()

p.send <- &models.TransactionStatusesResponse{
return &models.TransactionStatusesResponse{
TransactionResults: txResults,
MessageIndex: index,
}

return nil
}
}, nil
}))
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *SendTransactionStatusesDataProvider) createSubscription(
ctx context.Context,
args sendTransactionStatusesArguments,
) subscription.Subscription {
return p.api.SendAndSubscribeTransactionStatuses(ctx, &args.Transaction, entities.EventEncodingVersion_JSON_CDC_V0)
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,19 @@ func NewTransactionStatusesDataProvider(
//
// No errors are expected during normal operations.
func (p *TransactionStatusesDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse())
messageIndex := counters.NewMonotonousCounter(0)

return subscription.HandleSubscription(p.subscription, subscription.HandleResponse(p.send, func(txResults []*access.TransactionResult) (interface{}, error) {
index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return nil, status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

return &models.TransactionStatusesResponse{
TransactionResults: txResults,
MessageIndex: index,
}, nil
}))
}

// createSubscription creates a new subscription using the specified input arguments.
Expand All @@ -90,28 +102,6 @@ func (p *TransactionStatusesDataProvider) createSubscription(
return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0)
}

// handleResponse processes an account statuses and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error {
messageIndex := counters.NewMonotonousCounter(0)

return func(txResults []*access.TransactionResult) error {

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

p.send <- &models.TransactionStatusesResponse{
TransactionResults: txResults,
MessageIndex: index,
}

return nil
}
}

// parseAccountStatusesArguments validates and initializes the account statuses arguments.
func parseTransactionStatusesArguments(
arguments models.Arguments,
Expand Down

0 comments on commit f71edd5

Please sign in to comment.