Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Make response for data providers consistent. #6846

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesRespo
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

p.send <- &models.AccountStatusesResponse{
BlockID: accountStatusesResponse.BlockID.String(),
Height: strconv.FormatUint(accountStatusesResponse.Height, 10),
AccountEvents: accountStatusesResponse.AccountEvents,
MessageIndex: index,
p.send <- &models.BaseDataProvidersResponse{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a common function or method Build for BaseDataProvidersResponse that creates/fills BaseDataProvidersResponse and is used by every provider? Also I guess, we can use baseDataProvider as an argument to fill SubscriptionID and Topic. This approach would help avoid code duplication in the providers. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR should be merged after #6802 and use response.Build functionality for Payload field

SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.AccountStatusesResponse{
BlockID: accountStatusesResponse.BlockID.String(),
Height: strconv.FormatUint(accountStatusesResponse.Height, 10),
AccountEvents: accountStatusesResponse.AccountEvents,
MessageIndex: index,
},
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,17 @@ func (s *AccountStatusesProviderSuite) requireAccountStatuses(
expectedAccountStatusesResponse, ok := expectedResponse.(backend.AccountStatusesResponse)
require.True(s.T(), ok, "unexpected type: %T", expectedResponse)

actualResponse, ok := v.(*models.AccountStatusesResponse)
require.True(s.T(), ok, "Expected *models.AccountStatusesResponse, got %T", v)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "Expected *models.BaseDataProvidersResponse, got %T", v)

require.Equal(s.T(), expectedAccountStatusesResponse.BlockID.String(), actualResponse.BlockID)
require.Equal(s.T(), len(expectedAccountStatusesResponse.AccountEvents), len(actualResponse.AccountEvents))
actualResponseData, ok := actualResponse.Payload.(*models.AccountStatusesResponse)
Copy link
Contributor

@UlyanaAndrukhiv UlyanaAndrukhiv Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we should also check the Topic here and in other tests if we added it to the response.

require.True(s.T(), ok, "unexpected response data type: %T", v)

require.Equal(s.T(), expectedAccountStatusesResponse.BlockID.String(), actualResponseData.BlockID)
require.Equal(s.T(), len(expectedAccountStatusesResponse.AccountEvents), len(actualResponseData.AccountEvents))

for key, expectedEvents := range expectedAccountStatusesResponse.AccountEvents {
actualEvents, ok := actualResponse.AccountEvents[key]
actualEvents, ok := actualResponseData.AccountEvents[key]
require.True(s.T(), ok, "Missing key in actual AccountEvents: %s", key)

s.Require().Equal(expectedEvents, actualEvents, "Mismatch for key: %s", key)
Expand Down Expand Up @@ -252,9 +255,13 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
var responses []*models.AccountStatusesResponse
for i := 0; i < accountStatusesCount; i++ {
res := <-send
accountStatusesRes, ok := res.(*models.AccountStatusesResponse)
accountStatusesRes, ok := res.(*models.BaseDataProvidersResponse)
s.Require().True(ok, "Expected *models.BaseDataProvidersResponse, got %T", res)

accountStatusesResData, ok := accountStatusesRes.Payload.(*models.AccountStatusesResponse)
s.Require().True(ok, "Expected *models.AccountStatusesResponse, got %T", res)
responses = append(responses, accountStatusesRes)

responses = append(responses, accountStatusesResData)
}

// Verifying that indices are starting from 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ func (p *BlockDigestsDataProvider) Run() error {
return subscription.HandleSubscription(
p.subscription,
subscription.HandleResponse(p.send, func(block *flow.BlockDigest) (interface{}, error) {
return &models.BlockDigestMessageResponse{
Block: block,
return &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.BlockDigestMessageResponse{
Block: block,
},
}, nil

}),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,13 @@ func (s *BlocksProviderSuite) requireBlockDigest(v interface{}, expected interfa
expectedBlock, ok := expected.(*flow.Block)
require.True(s.T(), ok, "unexpected type: %T", v)

actualResponse, ok := v.(*models.BlockDigestMessageResponse)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "unexpected response type: %T", v)

s.Require().Equal(expectedBlock.Header.ID(), actualResponse.Block.ID())
s.Require().Equal(expectedBlock.Header.Height, actualResponse.Block.Height)
s.Require().Equal(expectedBlock.Header.Timestamp, actualResponse.Block.Timestamp)
actualResponseData, ok := actualResponse.Payload.(*models.BlockDigestMessageResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

s.Require().Equal(expectedBlock.Header.ID(), actualResponseData.Block.ID())
s.Require().Equal(expectedBlock.Header.Height, actualResponseData.Block.Height)
s.Require().Equal(expectedBlock.Header.Timestamp, actualResponseData.Block.Timestamp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ func (p *BlockHeadersDataProvider) Run() error {
return subscription.HandleSubscription(
p.subscription,
subscription.HandleResponse(p.send, func(header *flow.Header) (interface{}, error) {
return &models.BlockHeaderMessageResponse{
Header: header,
return &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.BlockHeaderMessageResponse{
Header: header,
},
}, nil
}),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ func (s *BlockHeadersProviderSuite) requireBlockHeader(v interface{}, expected i
expectedBlock, ok := expected.(*flow.Block)
require.True(s.T(), ok, "unexpected type: %T", v)

actualResponse, ok := v.(*models.BlockHeaderMessageResponse)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "unexpected response type: %T", v)

s.Require().Equal(expectedBlock.Header, actualResponse.Header)
actualResponseData, ok := actualResponse.Payload.(*models.BlockHeaderMessageResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

s.Require().Equal(expectedBlock.Header, actualResponseData.Header)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ func (p *BlocksDataProvider) Run() error {
return subscription.HandleSubscription(
p.subscription,
subscription.HandleResponse(p.send, func(block *flow.Block) (interface{}, error) {
return &models.BlockMessageResponse{
Block: block,
return &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.BlockMessageResponse{
Block: block,
},
}, nil
}),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ func (s *BlocksProviderSuite) requireBlock(v interface{}, expected interface{})
expectedBlock, ok := expected.(*flow.Block)
require.True(s.T(), ok, "unexpected type: %T", v)

actualResponse, ok := v.(*models.BlockMessageResponse)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "unexpected response type: %T", v)

s.Require().Equal(expectedBlock, actualResponse.Block)
actualResponseData, ok := actualResponse.Payload.(*models.BlockMessageResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

s.Require().Equal(expectedBlock, actualResponseData.Block)
}
16 changes: 10 additions & 6 deletions engine/access/rest/websockets/data_providers/events_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,16 @@ func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.Event
return fmt.Errorf("message index already incremented to: %d", messageIndex.Value())
}

p.send <- &models.EventResponse{
BlockId: eventsResponse.BlockID.String(),
BlockHeight: strconv.FormatUint(eventsResponse.Height, 10),
BlockTimestamp: eventsResponse.BlockTimestamp,
Events: eventsResponse.Events,
MessageIndex: index,
p.send <- &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.EventResponse{
BlockId: eventsResponse.BlockID.String(),
BlockHeight: strconv.FormatUint(eventsResponse.Height, 10),
BlockTimestamp: eventsResponse.BlockTimestamp,
Events: eventsResponse.Events,
MessageIndex: index,
},
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ func (s *EventsProviderSuite) requireEvents(v interface{}, expectedResponse inte
expectedEventsResponse, ok := expectedResponse.(backend.EventsResponse)
require.True(s.T(), ok, "unexpected type: %T", expectedResponse)

actualResponse, ok := v.(*models.EventResponse)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "Expected *models.EventResponse, got %T", v)

s.Require().ElementsMatch(expectedEventsResponse.Events, actualResponse.Events)
actualResponseData, ok := actualResponse.Payload.(*models.EventResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

s.Require().ElementsMatch(expectedEventsResponse.Events, actualResponseData.Events)
}

// invalidArgumentsTestCases returns a list of test cases with invalid argument combinations
Expand Down Expand Up @@ -279,9 +282,13 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
var responses []*models.EventResponse
for i := 0; i < eventsCount; i++ {
res := <-send
eventRes, ok := res.(*models.EventResponse)
eventRes, ok := res.(*models.BaseDataProvidersResponse)
s.Require().True(ok, "Expected *models.BaseDataProvidersResponse, got %T", res)

eventResData, ok := eventRes.Payload.(*models.EventResponse)
s.Require().True(ok, "Expected *models.EventResponse, got %T", res)
responses = append(responses, eventRes)

responses = append(responses, eventResData)
}

// Verifying that indices are starting from 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ func (p *SendAndGetTransactionStatusesDataProvider) handleResponse() func(txResu
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

p.send <- &models.TransactionStatusesResponse{
TransactionResult: txResults[i],
MessageIndex: index,
p.send <- &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.TransactionStatusesResponse{
TransactionResult: txResults[i],
MessageIndex: index,
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ func (s *SendTransactionStatusesProviderSuite) requireTransactionStatuses(
expectedTxStatusesResponse, ok := expectedResponse.(*access.TransactionResult)
require.True(s.T(), ok, "unexpected type: %T", expectedResponse)

actualResponse, ok := v.(*models.TransactionStatusesResponse)
require.True(s.T(), ok, "Expected *models.TransactionStatusesResponse, got %T", v)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "Expected *models.BaseDataProvidersResponse, got %T", v)

require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponse.TransactionResult.BlockID)
require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponse.TransactionResult.BlockHeight)
actualResponseData, ok := actualResponse.Payload.(*models.TransactionStatusesResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponseData.TransactionResult.BlockID)
require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponseData.TransactionResult.BlockHeight)
}

// TestSendTransactionStatusesDataProvider_InvalidArguments tests the behavior of the send transaction statuses data provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,13 @@ func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*acc
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

p.send <- &models.TransactionStatusesResponse{
TransactionResult: txResults[i],
MessageIndex: index,
p.send <- &models.BaseDataProvidersResponse{
SubscriptionID: p.ID().String(),
Topic: p.Topic(),
Payload: &models.TransactionStatusesResponse{
TransactionResult: txResults[i],
MessageIndex: index,
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,14 @@ func (s *TransactionStatusesProviderSuite) requireTransactionStatuses(
expectedTxStatusesResponse, ok := expectedResponse.(*access.TransactionResult)
require.True(s.T(), ok, "unexpected type: %T", expectedResponse)

actualResponse, ok := v.(*models.TransactionStatusesResponse)
require.True(s.T(), ok, "Expected *models.TransactionStatusesResponse, got %T", v)
actualResponse, ok := v.(*models.BaseDataProvidersResponse)
require.True(s.T(), ok, "Expected *models.BaseDataProvidersResponse, got %T", v)

require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponse.TransactionResult.BlockID)
require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponse.TransactionResult.BlockHeight)
actualResponseData, ok := actualResponse.Payload.(*models.TransactionStatusesResponse)
require.True(s.T(), ok, "unexpected response data type: %T", v)

require.Equal(s.T(), expectedTxStatusesResponse.BlockID, actualResponseData.TransactionResult.BlockID)
require.Equal(s.T(), expectedTxStatusesResponse.BlockHeight, actualResponseData.TransactionResult.BlockHeight)
}

// TestTransactionStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider
Expand Down Expand Up @@ -277,9 +280,14 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr
var responses []*models.TransactionStatusesResponse
for i := 0; i < txStatusesCount; i++ {
res := <-send
txStatusesRes, ok := res.(*models.TransactionStatusesResponse)

txStatusesRes, ok := res.(*models.BaseDataProvidersResponse)
s.Require().True(ok, "Expected *models.BaseDataProvidersResponse, got %T", res)

txStatusesResData, ok := txStatusesRes.Payload.(*models.TransactionStatusesResponse)
s.Require().True(ok, "Expected *models.TransactionStatusesResponse, got %T", res)
responses = append(responses, txStatusesRes)

responses = append(responses, txStatusesResData)
}

// Verifying that indices are starting from 0
Expand Down
7 changes: 7 additions & 0 deletions engine/access/rest/websockets/models/base_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@ type BaseMessageResponse struct {
Success bool `json:"success"`
Error ErrorMessage `json:"error,omitempty"`
}

// BaseDataProvidersResponse represents a base structure for responses from subscriptions.
type BaseDataProvidersResponse struct {
SubscriptionID string `json:"subscription_id"` // Unique subscriptionID
Topic string `json:"topic"` // Topic of the subscription
Payload interface{} `json:"payload"` // Payload that's being returned within a subscription.
}
Loading