From f7061aba01a4949e4f4d64c5fcaa5467870e76b2 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Thu, 7 Nov 2024 17:16:34 +0200 Subject: [PATCH] feat: PRT - Support StarkNet pathfinder (#1666) * Exported the check of ws:// to a funciton * Add support for StarkNet Pathfinder in the rpcclient * Small script fix * Improve the router key usage * Fix to RouterKey to init with empty extension * Fix internal paths issues * Warn provider when the configured internal paths are different than onchain * Small fix to the router key * Update the starknet spec with new internal paths changes * Fix lint * Add test for new difference function * Add more info to NodeUrl String func * Prefer non internal path when choosing websocket as http node url * Add TestChainRouterWithInternalPaths * Add TestGetVerifications * Small fix to the router key * Add tests for the router key * Add TestCraftChainMessage * Post merge fix * CR Fix: Reuse router key string * Remove unused code * Small test fix * Add an example for starknet with internal paths * CR Fix: Reduce conn map to one conn * CR Fix: Move into if..else * CR Fix: Remove unnecessary field * CR Fix: Small bug fix * CR Fix: Break if true * dynamic internal path population * Add ContainsPredicate to lavaslices * Fixes to the router key * Remove the warning of non configured internal paths * Fix the chain router and its tests --------- Co-authored-by: Omer <100387053+omerlavanet@users.noreply.github.com> Co-authored-by: Ran Mishael --- config/provider_examples/strk_example.yml | 17 + cookbook/specs/starknet.json | 323 +++++++ protocol/chainlib/base_chain_parser.go | 46 +- protocol/chainlib/base_chain_parser_test.go | 142 +++ protocol/chainlib/chain_fetcher.go | 8 +- protocol/chainlib/chain_message.go | 1 + protocol/chainlib/chain_message_test.go | 106 +++ protocol/chainlib/chain_router.go | 240 +++-- protocol/chainlib/chain_router_test.go | 846 +++++++++++++++++- protocol/chainlib/chainlib.go | 4 +- .../chainlib/chainproxy/rpcclient/handler.go | 40 + .../chainlib/chainproxy/rpcclient/json.go | 11 +- protocol/chainlib/common.go | 11 + protocol/chainlib/common_test.go | 2 +- protocol/chainlib/jsonRPC.go | 83 +- protocol/chainlib/jsonRPC_test.go | 2 +- protocol/chainlib/rest.go | 2 +- protocol/chainlib/tendermintRPC.go | 24 +- protocol/common/endpoints.go | 2 +- .../lavasession/consumer_session_manager.go | 22 +- protocol/lavasession/router_key.go | 89 +- protocol/lavasession/router_key_test.go | 90 ++ protocol/lavasession/used_providers.go | 9 +- protocol/rpcprovider/rpcprovider.go | 22 + protocol/rpcprovider/rpcprovider_server.go | 2 +- scripts/init_chain_commands.sh | 2 +- utils/lavaslices/slices.go | 39 + utils/lavaslices/slices_test.go | 54 ++ 28 files changed, 2025 insertions(+), 214 deletions(-) create mode 100644 config/provider_examples/strk_example.yml create mode 100644 protocol/chainlib/base_chain_parser_test.go create mode 100644 protocol/chainlib/chain_message_test.go create mode 100644 protocol/lavasession/router_key_test.go diff --git a/config/provider_examples/strk_example.yml b/config/provider_examples/strk_example.yml new file mode 100644 index 0000000000..111bfaac07 --- /dev/null +++ b/config/provider_examples/strk_example.yml @@ -0,0 +1,17 @@ +endpoints: + - api-interface: jsonrpc + chain-id: STRK + network-address: + address: "127.0.0.1:2220" + node-urls: + - url: /ws + internal-path: "" + - url: /ws/rpc/v0_6 + internal-path: "/rpc/v0_6" + + - url: + internal-path: "" + - url: /rpc/v0_5 + internal-path: "/rpc/v0_5" + - url: /rpc/v0_6 + internal-path: "/rpc/v0_6" diff --git a/cookbook/specs/starknet.json b/cookbook/specs/starknet.json index 567b740a84..651bf0ade8 100644 --- a/cookbook/specs/starknet.json +++ b/cookbook/specs/starknet.json @@ -601,6 +601,87 @@ } ] }, + { + "enabled": false, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "WS-ONLY", + "type": "POST", + "add_on": "" + }, + "apis": [ + { + "name": "pathfinder_subscribe", + "block_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "compute_units": 1000, + "enabled": true, + "category": { + "deterministic": false, + "local": true, + "subscription": true, + "stateful": 0 + }, + "extra_compute_units": 0 + }, + { + "name": "pathfinder_unsubscribe", + "block_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "compute_units": 10, + "enabled": true, + "category": { + "deterministic": false, + "local": true, + "subscription": true, + "stateful": 0 + }, + "extra_compute_units": 0 + } + ], + "parse_directives": [ + { + "function_tag": "SUBSCRIBE", + "api_name": "pathfinder_subscribe" + }, + { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"pathfinder_unsubscribe\",\"params\":[%s],\"id\":1}", + "function_tag": "UNSUBSCRIBE", + "api_name": "pathfinder_unsubscribe" + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws", + "type": "POST", + "add_on": "" + }, + "inheritance_apis": [ + { + "api_interface": "jsonrpc", + "internal_path": "", + "type": "POST", + "add_on": "" + }, + { + "api_interface": "jsonrpc", + "internal_path": "WS-ONLY", + "type": "POST", + "add_on": "" + } + ] + }, { "enabled": true, "collection_data": { @@ -635,6 +716,69 @@ } ] }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws/rpc/v0_6", + "type": "POST", + "add_on": "" + }, + "inheritance_apis": [ + { + "api_interface": "jsonrpc", + "internal_path": "/rpc/v0_6", + "type": "POST", + "add_on": "" + }, + { + "api_interface": "jsonrpc", + "internal_path": "WS-ONLY", + "type": "POST", + "add_on": "" + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/rpc/v0_7", + "type": "POST", + "add_on": "" + }, + "inheritance_apis": [ + { + "api_interface": "jsonrpc", + "internal_path": "", + "type": "POST", + "add_on": "" + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws/rpc/v0_7", + "type": "POST", + "add_on": "" + }, + "inheritance_apis": [ + { + "api_interface": "jsonrpc", + "internal_path": "/rpc/v0_7", + "type": "POST", + "add_on": "" + }, + { + "api_interface": "jsonrpc", + "internal_path": "WS-ONLY", + "type": "POST", + "add_on": "" + } + ] + }, { "enabled": true, "collection_data": { @@ -724,6 +868,93 @@ ] } ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/rpc/pathfinder/v0.1", + "type": "POST", + "add_on": "" + }, + "apis": [ + { + "name": "pathfinder_version", + "block_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "compute_units": 10, + "enabled": true, + "category": { + "deterministic": true, + "local": false, + "subscription": false, + "stateful": 0 + }, + "extra_compute_units": 0 + }, + { + "name": "pathfinder_getProof", + "block_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "compute_units": 10, + "enabled": true, + "category": { + "deterministic": true, + "local": false, + "subscription": false, + "stateful": 0 + }, + "extra_compute_units": 0 + }, + { + "name": "pathfinder_getTransactionStatus", + "block_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "compute_units": 10, + "enabled": true, + "category": { + "deterministic": true, + "local": false, + "subscription": false, + "stateful": 0 + }, + "extra_compute_units": 0 + } + ], + "verifications": [ + { + "name": "enabled", + "parse_directive": { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"pathfinder_version\",\"params\":[],\"id\":1}", + "function_tag": "VERIFICATION", + "result_parsing": { + "parser_arg": [ + "latest" + ], + "parser_func": "DEFAULT" + }, + "api_name": "pathfinder_version" + }, + "values": [ + { + "expected_value": "*", + "severity": "Warning" + } + ] + } + ] } ] }, @@ -768,6 +999,29 @@ } ] }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws", + "type": "POST", + "add_on": "" + }, + "apis": [], + "headers": [], + "inheritance_apis": [], + "parse_directives": [], + "verifications": [ + { + "name": "chain-id", + "values": [ + { + "expected_value": "0x534e5f5345504f4c4941" + } + ] + } + ] + }, { "enabled": true, "collection_data": { @@ -791,6 +1045,29 @@ } ] }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws/rpc/v0_6", + "type": "POST", + "add_on": "" + }, + "apis": [], + "headers": [], + "inheritance_apis": [], + "parse_directives": [], + "verifications": [ + { + "name": "chain-id", + "values": [ + { + "expected_value": "0x534e5f5345504f4c4941" + } + ] + } + ] + }, { "enabled": true, "collection_data": { @@ -813,6 +1090,52 @@ ] } ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/rpc/v0_7", + "type": "POST", + "add_on": "" + }, + "apis": [], + "headers": [], + "inheritance_apis": [], + "parse_directives": [], + "verifications": [ + { + "name": "chain-id", + "values": [ + { + "expected_value": "0x534e5f5345504f4c4941" + } + ] + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws/rpc/v0_7", + "type": "POST", + "add_on": "" + }, + "apis": [], + "headers": [], + "inheritance_apis": [], + "parse_directives": [], + "verifications": [ + { + "name": "chain-id", + "values": [ + { + "expected_value": "0x534e5f5345504f4c4941" + } + ] + } + ] } ] } diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 13da350da2..487d545aba 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -11,6 +11,7 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" "github.com/lavanet/lava/v4/protocol/common" "github.com/lavanet/lava/v4/utils" + "github.com/lavanet/lava/v4/utils/lavaslices" epochstorage "github.com/lavanet/lava/v4/x/epochstorage/types" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" spectypes "github.com/lavanet/lava/v4/x/spec/types" @@ -29,7 +30,7 @@ type BaseChainParser struct { serverApis map[ApiKey]ApiContainer apiCollections map[CollectionKey]*spectypes.ApiCollection headers map[ApiKey]*spectypes.Header - verifications map[VerificationKey][]VerificationContainer + verifications map[VerificationKey]map[string][]VerificationContainer // map[VerificationKey]map[InternalPath][]VerificationContainer allowedAddons map[string]bool extensionParser extensionslib.ExtensionParser active bool @@ -201,7 +202,7 @@ func (bcp *BaseChainParser) SeparateAddonsExtensions(supported []string) (addons } // gets all verifications for an endpoint supporting multiple addons and extensions -func (bcp *BaseChainParser) GetVerifications(supported []string) (retVerifications []VerificationContainer, err error) { +func (bcp *BaseChainParser) GetVerifications(supported []string, internalPath string, apiInterface string) (retVerifications []VerificationContainer, err error) { // addons will contains extensions and addons, // extensions must exist in all verifications, addons must be split because they are separated addons, extensions, err := bcp.SeparateAddonsExtensions(supported) @@ -212,24 +213,27 @@ func (bcp *BaseChainParser) GetVerifications(supported []string) (retVerificatio extensions = []string{""} } addons = append(addons, "") // always add the empty addon + for _, addon := range addons { for _, extension := range extensions { verificationKey := VerificationKey{ Extension: extension, Addon: addon, } - verifications, ok := bcp.verifications[verificationKey] + collectionVerifications, ok := bcp.verifications[verificationKey] if ok { - retVerifications = append(retVerifications, verifications...) + if verifications, ok := collectionVerifications[internalPath]; ok { + retVerifications = append(retVerifications, verifications...) + } } } } - return + return retVerifications, nil } func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[string]struct{}, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, serverApis map[ApiKey]ApiContainer, apiCollections map[CollectionKey]*spectypes.ApiCollection, headers map[ApiKey]*spectypes.Header, - verifications map[VerificationKey][]VerificationContainer, + verifications map[VerificationKey]map[string][]VerificationContainer, ) { bcp.spec = spec bcp.internalPaths = internalPaths @@ -263,6 +267,22 @@ func (bcp *BaseChainParser) GetParsingByTag(tag spectypes.FUNCTION_TAG) (parsing return val.Parsing, val.ApiCollection, ok } +func (bcp *BaseChainParser) IsTagInCollection(tag spectypes.FUNCTION_TAG, collectionKey CollectionKey) bool { + bcp.rwLock.RLock() + defer bcp.rwLock.RUnlock() + + apiCollection, ok := bcp.apiCollections[collectionKey] + return ok && lavaslices.ContainsPredicate(apiCollection.ParseDirectives, func(elem *spectypes.ParseDirective) bool { + return elem.FunctionTag == tag + }) +} + +func (bcp *BaseChainParser) GetAllInternalPaths() []string { + bcp.rwLock.RLock() + defer bcp.rwLock.RUnlock() + return lavaslices.KeysSlice(bcp.internalPaths) +} + func (bcp *BaseChainParser) ExtensionParsing(addon string, parsedMessageArg *baseChainMessageContainer, extensionInfo extensionslib.ExtensionInfo) { if extensionInfo.ExtensionOverride == nil { // consumer side extension parsing. to set the extension based on the latest block and the request @@ -350,13 +370,13 @@ func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addo return api, nil } -func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths map[string]struct{}, retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey][]VerificationContainer) { +func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths map[string]struct{}, retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey]map[string][]VerificationContainer) { retInternalPaths = map[string]struct{}{} serverApis := map[ApiKey]ApiContainer{} taggedApis := map[spectypes.FUNCTION_TAG]TaggedContainer{} headers := map[ApiKey]*spectypes.Header{} apiCollections := map[CollectionKey]*spectypes.ApiCollection{} - verifications := map[VerificationKey][]VerificationContainer{} + verifications := map[VerificationKey]map[string][]VerificationContainer{} if spec.Enabled { for _, apiCollection := range spec.ApiCollections { if !apiCollection.Enabled { @@ -385,7 +405,7 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths if !api.Enabled { continue } - // + // TODO: find a better spot for this (more optimized, precompile regex, etc) if rpcInterface == spectypes.APIInterfaceRest { re := regexp.MustCompile(`{[^}]+}`) @@ -455,6 +475,7 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths } verCont := VerificationContainer{ + InternalPath: apiCollection.CollectionData.InternalPath, ConnectionType: apiCollection.CollectionData.Type, Name: verification.Name, ParseDirective: *verification.ParseDirective, @@ -464,10 +485,13 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths Severity: parseValue.Severity, } + internalPath := apiCollection.CollectionData.InternalPath if extensionVerifications, ok := verifications[verificationKey]; !ok { - verifications[verificationKey] = []VerificationContainer{verCont} + verifications[verificationKey] = map[string][]VerificationContainer{internalPath: {verCont}} + } else if collectionVerifications, ok := extensionVerifications[internalPath]; !ok { + verifications[verificationKey][internalPath] = []VerificationContainer{verCont} } else { - verifications[verificationKey] = append(extensionVerifications, verCont) + verifications[verificationKey][internalPath] = append(collectionVerifications, verCont) } } } diff --git a/protocol/chainlib/base_chain_parser_test.go b/protocol/chainlib/base_chain_parser_test.go new file mode 100644 index 0000000000..8339cd2688 --- /dev/null +++ b/protocol/chainlib/base_chain_parser_test.go @@ -0,0 +1,142 @@ +package chainlib + +import ( + reflect "reflect" + "strconv" + "testing" + + "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" + spectypes "github.com/lavanet/lava/v4/x/spec/types" + "github.com/stretchr/testify/require" +) + +func TestGetVerifications(t *testing.T) { + verifications := map[VerificationKey]map[string][]VerificationContainer{ + { + Extension: "", + Addon: "", + }: { + "/x": { + {InternalPath: "/x"}, + }, + "": { + {InternalPath: ""}, + }, + }, + { + Extension: "", + Addon: "addon1", + }: { + "/x": { + {InternalPath: "/x"}, + }, + "": { + {InternalPath: ""}, + }, + }, + { + Extension: "ext1", + Addon: "addon1", + }: { + "/x": { + {InternalPath: "/x"}, + }, + "": { + {InternalPath: ""}, + }, + }, + { + Extension: "ext1", + Addon: "", + }: { + "/x": { + {InternalPath: "/x"}, + }, + "": { + {InternalPath: ""}, + }, + }, + } + + playBook := []struct { + Extension string + Addon string + InternalPath string + }{ + { + Extension: "", + Addon: "", + InternalPath: "", + }, + { + Extension: "", + Addon: "", + InternalPath: "/x", + }, + { + Extension: "ext1", + Addon: "addon1", + InternalPath: "", + }, + { + Extension: "ext1", + Addon: "addon1", + InternalPath: "/x", + }, + { + Extension: "", + Addon: "addon1", + InternalPath: "", + }, + { + Extension: "", + Addon: "addon1", + InternalPath: "/x", + }, + { + Extension: "ext1", + Addon: "", + InternalPath: "", + }, + { + Extension: "ext1", + Addon: "", + InternalPath: "/x", + }, + } + + baseChainParser := BaseChainParser{ + verifications: verifications, + allowedAddons: map[string]bool{"addon1": true}, + } + baseChainParser.extensionParser = extensionslib.NewExtensionParser(map[string]struct{}{"ext1": {}}, nil) + + for idx, play := range playBook { + for _, apiInterface := range []string{spectypes.APIInterfaceJsonRPC, spectypes.APIInterfaceTendermintRPC, spectypes.APIInterfaceRest, spectypes.APIInterfaceGrpc} { + t.Run("GetVerifications "+strconv.Itoa(idx), func(t *testing.T) { + var supported []string + if play.Extension == "" && play.Addon == "" { + supported = []string{""} + } else if play.Extension == "" { + supported = []string{play.Addon} + } else if play.Addon == "" { + supported = []string{play.Extension} + } else { + supported = []string{play.Extension, play.Addon} + } + + actualVerifications, err := baseChainParser.GetVerifications(supported, play.InternalPath, apiInterface) + require.NoError(t, err) + + expectedVerificationKey := VerificationKey{Extension: play.Extension, Addon: play.Addon} + expectedVerifications := verifications[expectedVerificationKey][play.InternalPath] + // add the empty addon to the expected verifications + if play.Addon != "" { + expectedVerificationKey.Addon = "" + expectedVerifications = append(expectedVerifications, verifications[expectedVerificationKey][play.InternalPath]...) + } + require.True(t, reflect.DeepEqual(expectedVerifications, actualVerifications), "expected: %v, actual: %v", expectedVerifications, actualVerifications) + }) + } + } +} diff --git a/protocol/chainlib/chain_fetcher.go b/protocol/chainlib/chain_fetcher.go index a0a8fde139..a99408e690 100644 --- a/protocol/chainlib/chain_fetcher.go +++ b/protocol/chainlib/chain_fetcher.go @@ -49,7 +49,7 @@ func (cf *ChainFetcher) FetchEndpoint() lavasession.RPCProviderEndpoint { func (cf *ChainFetcher) Validate(ctx context.Context) error { for _, url := range cf.endpoint.NodeUrls { addons := url.Addons - verifications, err := cf.chainParser.GetVerifications(addons) + verifications, err := cf.chainParser.GetVerifications(addons, url.InternalPath, cf.endpoint.ApiInterface) if err != nil { return err } @@ -167,7 +167,8 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon } } - chainMessage, err := CraftChainMessage(parsing, collectionType, cf.chainParser, &CraftData{Path: path, Data: data, ConnectionType: collectionType}, cf.ChainFetcherMetadata()) + craftData := &CraftData{Path: path, Data: data, ConnectionType: collectionType, InternalPath: verification.InternalPath} + chainMessage, err := CraftChainMessage(parsing, collectionType, cf.chainParser, craftData, cf.ChainFetcherMetadata()) if err != nil { return utils.LavaFormatError("[-] verify failed creating chainMessage", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } @@ -253,6 +254,7 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon utils.LogAttr("rawData", parsedInput.GetRawParsedData()), utils.LogAttr("verificationKey", verification.VerificationKey), utils.LogAttr("apiInterface", cf.endpoint.ApiInterface), + utils.LogAttr("internalPath", proxyUrl.InternalPath), ) return nil } @@ -455,7 +457,7 @@ type DummyChainFetcher struct { func (cf *DummyChainFetcher) Validate(ctx context.Context) error { for _, url := range cf.endpoint.NodeUrls { addons := url.Addons - verifications, err := cf.chainParser.GetVerifications(addons) + verifications, err := cf.chainParser.GetVerifications(addons, url.InternalPath, cf.endpoint.ApiInterface) if err != nil { return err } diff --git a/protocol/chainlib/chain_message.go b/protocol/chainlib/chain_message.go index fdd33ea80c..399d7694c7 100644 --- a/protocol/chainlib/chain_message.go +++ b/protocol/chainlib/chain_message.go @@ -183,6 +183,7 @@ type CraftData struct { Path string Data []byte ConnectionType string + InternalPath string } func CraftChainMessage(parsing *spectypes.ParseDirective, connectionType string, chainParser ChainParser, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) { diff --git a/protocol/chainlib/chain_message_test.go b/protocol/chainlib/chain_message_test.go new file mode 100644 index 0000000000..5e988623b7 --- /dev/null +++ b/protocol/chainlib/chain_message_test.go @@ -0,0 +1,106 @@ +package chainlib + +import ( + "fmt" + "net/http" + "testing" + + testcommon "github.com/lavanet/lava/v4/testutil/common" + "github.com/lavanet/lava/v4/x/spec/types" + "github.com/stretchr/testify/require" +) + +func TestCraftChainMessage(t *testing.T) { + type play struct { + apiInterface string + craftData *CraftData + } + + expectedInternalPath := "/x" + method := "banana" + + playBook := []play{ + { + apiInterface: types.APIInterfaceJsonRPC, + craftData: &CraftData{ + Path: method, + Data: []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":[],"id":1}`, method)), + ConnectionType: http.MethodPost, + InternalPath: expectedInternalPath, + }, + }, + { + apiInterface: types.APIInterfaceTendermintRPC, + craftData: &CraftData{ + Path: method, + Data: []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":[],"id":1}`, method)), + ConnectionType: "", + InternalPath: expectedInternalPath, + }, + }, + { + apiInterface: types.APIInterfaceRest, + craftData: &CraftData{ + Data: []byte(method), + ConnectionType: http.MethodGet, + InternalPath: expectedInternalPath, + }, + }, + { + apiInterface: types.APIInterfaceRest, + craftData: &CraftData{ + Path: method, + Data: []byte(`{"data":"banana"}`), + ConnectionType: http.MethodPost, + InternalPath: expectedInternalPath, + }, + }, + { + apiInterface: types.APIInterfaceGrpc, + craftData: &CraftData{ + Path: method, + ConnectionType: "", + InternalPath: expectedInternalPath, + }, + }, + } + + for _, play := range playBook { + runName := play.apiInterface + if play.craftData.ConnectionType != "" { + runName += "_" + play.craftData.ConnectionType + } + + t.Run(runName, func(t *testing.T) { + chainParser, err := NewChainParser(play.apiInterface) + require.NoError(t, err) + + spec := testcommon.CreateMockSpec() + spec.ApiCollections = []*types.ApiCollection{ + { + Enabled: true, + CollectionData: types.CollectionData{ + ApiInterface: play.apiInterface, + Type: play.craftData.ConnectionType, + InternalPath: expectedInternalPath, + }, + Apis: []*types.Api{ + { + Name: method, + ComputeUnits: 100, + Enabled: true, + }, + }, + }, + } + chainParser.SetSpec(spec) + + chainMsg, err := CraftChainMessage(&types.ParseDirective{ApiName: method}, play.craftData.ConnectionType, chainParser, play.craftData, nil) + require.NoError(t, err) + require.NotNil(t, chainMsg) + + internalPath := chainMsg.GetApiCollection().CollectionData.InternalPath + require.Equal(t, expectedInternalPath, internalPath) + }) + } +} diff --git a/protocol/chainlib/chain_router.go b/protocol/chainlib/chain_router.go index 4237965d3c..cbad7305e6 100644 --- a/protocol/chainlib/chain_router.go +++ b/protocol/chainlib/chain_router.go @@ -2,7 +2,6 @@ package chainlib import ( "context" - "net/url" "strings" "sync" @@ -16,11 +15,6 @@ import ( "google.golang.org/grpc/metadata" ) -type MethodRoute struct { - lavasession.RouterKey - method string -} - type chainRouterEntry struct { ChainProxy addonsSupported map[string]struct{} @@ -39,16 +33,18 @@ func (cre *chainRouterEntry) isSupporting(addon string) bool { type chainRouterImpl struct { lock *sync.RWMutex - chainProxyRouter map[lavasession.RouterKey][]chainRouterEntry + chainProxyRouter map[string][]chainRouterEntry // key is routing key } -func (cri *chainRouterImpl) GetChainProxySupporting(ctx context.Context, addon string, extensions []string, method string) (ChainProxy, error) { +func (cri *chainRouterImpl) GetChainProxySupporting(ctx context.Context, addon string, extensions []string, method string, internalPath string) (ChainProxy, error) { cri.lock.RLock() defer cri.lock.RUnlock() // check if that specific method has a special route, if it does apply it to the router key wantedRouterKey := lavasession.NewRouterKey(extensions) - if chainProxyEntries, ok := cri.chainProxyRouter[wantedRouterKey]; ok { + wantedRouterKey.ApplyInternalPath(internalPath) + wantedRouterKeyStr := wantedRouterKey.String() + if chainProxyEntries, ok := cri.chainProxyRouter[wantedRouterKeyStr]; ok { for _, chainRouterEntry := range chainProxyEntries { if chainRouterEntry.isSupporting(addon) { // check if the method is supported @@ -58,30 +54,34 @@ func (cri *chainRouterImpl) GetChainProxySupporting(ctx context.Context, addon s } utils.LavaFormatTrace("chainProxy supporting method routing selected", utils.LogAttr("addon", addon), - utils.LogAttr("wantedRouterKey", wantedRouterKey), + utils.LogAttr("wantedRouterKey", wantedRouterKeyStr), utils.LogAttr("method", method), ) } - if wantedRouterKey != lavasession.GetEmptyRouterKey() { // add trailer only when router key is not default (||) - grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeExtension, string(wantedRouterKey))) + if wantedRouterKeyStr != lavasession.GetEmptyRouterKey().String() { // add trailer only when router key is not default (||) + grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeExtension, wantedRouterKeyStr)) } return chainRouterEntry.ChainProxy, nil } utils.LavaFormatTrace("chainProxy supporting extensions but not supporting addon", utils.LogAttr("addon", addon), - utils.LogAttr("wantedRouterKey", wantedRouterKey), + utils.LogAttr("wantedRouterKey", wantedRouterKeyStr), ) } // no support for this addon return nil, utils.LavaFormatError("no chain proxy supporting requested addon", nil, utils.Attribute{Key: "addon", Value: addon}) } // no support for these extensions - return nil, utils.LavaFormatError("no chain proxy supporting requested extensions", nil, utils.Attribute{Key: "extensions", Value: extensions}) + return nil, utils.LavaFormatError("no chain proxy supporting requested extensions and internal path", nil, + utils.LogAttr("extensions", extensions), + utils.LogAttr("internalPath", internalPath), + utils.LogAttr("supported", cri.chainProxyRouter), + ) } func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool { - routerKey := lavasession.NewRouterKey(extensions) + routerKey := lavasession.NewRouterKey(extensions).String() _, ok := cri.chainProxyRouter[routerKey] return ok } @@ -89,7 +89,7 @@ func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool { func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { // add the parsed addon from the apiCollection addon := chainMessage.GetApiCollection().CollectionData.AddOn - selectedChainProxy, err := cri.GetChainProxySupporting(ctx, addon, extensions, chainMessage.GetApi().Name) + selectedChainProxy, err := cri.GetChainProxySupporting(ctx, addon, extensions, chainMessage.GetApi().Name, chainMessage.GetApiCollection().CollectionData.InternalPath) if err != nil { return nil, "", nil, common.NodeUrl{}, "", err } @@ -98,11 +98,66 @@ func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, return relayReply, subscriptionID, relayReplyServer, proxyUrl, chainId, err } +func (cri *chainRouterImpl) autoGenerateMissingInternalPaths(isWs bool, nodeUrl common.NodeUrl, routerKey lavasession.RouterKey, autoGeneratedInternalPaths map[string]struct{}, rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser, returnedBatch map[string]lavasession.RPCProviderEndpoint) error { + baseUrl := nodeUrl.Url + for _, internalPath := range chainParser.GetAllInternalPaths() { + if internalPath == "" { + // skip the root since we've already added it + continue + } + + autoGeneratedInternalPaths[internalPath] = struct{}{} + + nodeUrl.InternalPath = internalPath // add internal path to the nodeUrl + nodeUrl.Url = baseUrl + internalPath + routerKey.ApplyInternalPath(internalPath) + if isWs { + addons, _, err := chainParser.SeparateAddonsExtensions(nodeUrl.Addons) + if err != nil { + return err + } + + lookForSubscriptionTag := func() bool { + for _, connectionType := range []string{"POST", ""} { + if len(addons) == 0 { + addons = append(addons, "") + } + + for _, addon := range addons { + // check subscription exists, we only care for subscription API's because otherwise we use http anyway. + collectionKey := CollectionKey{ + InternalPath: internalPath, + Addon: addon, + ConnectionType: connectionType, + } + + if chainParser.IsTagInCollection(spectypes.FUNCTION_TAG_SUBSCRIBE, collectionKey) { + return true + } + } + } + return false + } + + if !lookForSubscriptionTag() { + continue + } + } + + cri.setRouterKeyInBatch(nodeUrl, returnedBatch, routerKey, rpcProviderEndpoint, false) // will not override existing entries + } + + return nil +} + // batch nodeUrls with the same addons together in a copy -func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasession.RPCProviderEndpoint) (map[lavasession.RouterKey]lavasession.RPCProviderEndpoint, error) { - returnedBatch := map[lavasession.RouterKey]lavasession.RPCProviderEndpoint{} - routesToCheck := map[lavasession.RouterKey]bool{} +func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser) (map[string]lavasession.RPCProviderEndpoint, error) { + returnedBatch := map[string]lavasession.RPCProviderEndpoint{} + routesToCheck := map[string]bool{} methodRoutes := map[string]int{} + httpRootRouteSet := false + autoGeneratedInternalPaths := map[string]struct{}{} + for _, nodeUrl := range rpcProviderEndpoint.NodeUrls { routerKey := lavasession.NewRouterKey(nodeUrl.Addons) if len(nodeUrl.Methods) > 0 { @@ -114,10 +169,38 @@ func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasess methodRoutes[methodRoutesUnique] = len(methodRoutes) existing = len(methodRoutes) } - routerKey = routerKey.ApplyMethodsRoute(existing) + routerKey.ApplyMethodsRoute(existing) + } + routerKey.ApplyInternalPath(nodeUrl.InternalPath) + isWs, err := IsUrlWebSocket(nodeUrl.Url) + // Some parsing may fail because of gRPC + if err == nil && isWs { + // now change the router key to fit the websocket extension key. + nodeUrl.Addons = append(nodeUrl.Addons, WebSocketExtension) + routerKey.SetExtensions(nodeUrl.Addons) + } + + _, isAlreadyAutoGenerated := autoGeneratedInternalPaths[nodeUrl.InternalPath] + cri.setRouterKeyInBatch(nodeUrl, returnedBatch, routerKey, rpcProviderEndpoint, isAlreadyAutoGenerated) // will override existing entries + + if nodeUrl.InternalPath == "" { // root path + if !isWs { + httpRootRouteSet = true + } + + err = cri.autoGenerateMissingInternalPaths(isWs, nodeUrl, routerKey, autoGeneratedInternalPaths, rpcProviderEndpoint, chainParser, returnedBatch) + if err != nil { + return nil, err + } } - cri.parseNodeUrl(nodeUrl, returnedBatch, routerKey, rpcProviderEndpoint) } + + // check if batch has http configured, if not, add a websocket one + // prefer one without internal path + if !httpRootRouteSet { + return nil, utils.LavaFormatError("HTTP/HTTPS is mandatory. It is recommended to configure both HTTP/HTTP and WS/WSS.", nil, utils.LogAttr("nodeUrls", rpcProviderEndpoint.NodeUrls)) + } + if len(returnedBatch) == 0 { return nil, utils.LavaFormatError("invalid batch, routes are empty", nil, utils.LogAttr("endpoint", rpcProviderEndpoint)) } @@ -127,52 +210,45 @@ func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasess return nil, utils.LavaFormatError("invalid batch, missing regular route for method route", nil, utils.LogAttr("routerKey", routerKey)) } } + utils.LavaFormatDebug("batched nodeUrls by services", utils.LogAttr("batch", returnedBatch)) return returnedBatch, nil } -func (*chainRouterImpl) parseNodeUrl(nodeUrl common.NodeUrl, returnedBatch map[lavasession.RouterKey]lavasession.RPCProviderEndpoint, routerKey lavasession.RouterKey, rpcProviderEndpoint lavasession.RPCProviderEndpoint) { - u, err := url.Parse(nodeUrl.Url) - // Some parsing may fail because of gRPC - if err == nil && (u.Scheme == "ws" || u.Scheme == "wss") { - // if websocket, check if we have a router key for http already. if not add a websocket router key - // so in case we didn't get an http endpoint, we can use the ws one. - if _, ok := returnedBatch[routerKey]; !ok { - returnedBatch[routerKey] = lavasession.RPCProviderEndpoint{ - NetworkAddress: rpcProviderEndpoint.NetworkAddress, - ChainID: rpcProviderEndpoint.ChainID, - ApiInterface: rpcProviderEndpoint.ApiInterface, - Geolocation: rpcProviderEndpoint.Geolocation, - NodeUrls: []common.NodeUrl{nodeUrl}, - } - } - // now change the router key to fit the websocket extension key. - nodeUrl.Addons = append(nodeUrl.Addons, WebSocketExtension) - routerKey = lavasession.NewRouterKey(nodeUrl.Addons) - } - - if existingEndpoint, ok := returnedBatch[routerKey]; !ok { - returnedBatch[routerKey] = lavasession.RPCProviderEndpoint{ +func (*chainRouterImpl) setRouterKeyInBatch(nodeUrl common.NodeUrl, returnedBatch map[string]lavasession.RPCProviderEndpoint, routerKey lavasession.RouterKey, rpcProviderEndpoint lavasession.RPCProviderEndpoint, overrideExistingEntry bool) { + // if the router key does not exit, create it anyway + // if we need to override, override + // if it exists and we should not override, add the node url to the existing list + routerKeyString := routerKey.String() + if existingEndpoint, ok := returnedBatch[routerKeyString]; ok && !overrideExistingEntry { + // setting the incoming url first as it might be http while existing is websocket. (we prioritize http over ws when possible) + returnedBatch[routerKeyString] = lavasession.RPCProviderEndpoint{ NetworkAddress: rpcProviderEndpoint.NetworkAddress, ChainID: rpcProviderEndpoint.ChainID, ApiInterface: rpcProviderEndpoint.ApiInterface, Geolocation: rpcProviderEndpoint.Geolocation, - NodeUrls: []common.NodeUrl{nodeUrl}, + NodeUrls: append([]common.NodeUrl{nodeUrl}, existingEndpoint.NodeUrls...), } - } else { - // setting the incoming url first as it might be http while existing is websocket. (we prioritize http over ws when possible) - existingEndpoint.NodeUrls = append([]common.NodeUrl{nodeUrl}, existingEndpoint.NodeUrls...) - returnedBatch[routerKey] = existingEndpoint + + return + } + + returnedBatch[routerKeyString] = lavasession.RPCProviderEndpoint{ + NetworkAddress: rpcProviderEndpoint.NetworkAddress, + ChainID: rpcProviderEndpoint.ChainID, + ApiInterface: rpcProviderEndpoint.ApiInterface, + Geolocation: rpcProviderEndpoint.Geolocation, + NodeUrls: []common.NodeUrl{nodeUrl}, } } func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser, proxyConstructor func(context.Context, uint, lavasession.RPCProviderEndpoint, ChainParser) (ChainProxy, error)) (*chainRouterImpl, error) { - chainProxyRouter := map[lavasession.RouterKey][]chainRouterEntry{} + chainProxyRouter := map[string][]chainRouterEntry{} cri := chainRouterImpl{ lock: &sync.RWMutex{}, } - requiredMap := map[requirementSt]struct{}{} - supportedMap := map[requirementSt]struct{}{} - rpcProviderEndpointBatch, err := cri.BatchNodeUrlsByServices(rpcProviderEndpoint) + requiredMap := map[string]struct{}{} // key is requirement + supportedMap := map[string]requirement{} // key is requirement + rpcProviderEndpointBatch, err := cri.BatchNodeUrlsByServices(rpcProviderEndpoint, chainParser) if err != nil { return nil, err } @@ -185,19 +261,21 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase // this function calculated all routing combinations and populates them for verification at the end of the function updateRouteCombinations := func(extensions, addons []string) (fullySupportedRouterKey lavasession.RouterKey) { allExtensionsRouterKey := lavasession.NewRouterKey(extensions) - requirement := requirementSt{ - extensions: allExtensionsRouterKey, - addon: "", + requirement := requirement{ + RouterKey: allExtensionsRouterKey, + addon: "", } for _, addon := range addons { populateRequiredForAddon(addon, extensions, requiredMap) requirement.addon = addon - supportedMap[requirement] = struct{}{} + supportedMap[requirement.String()] = requirement addonsSupportedMap[addon] = struct{}{} } return allExtensionsRouterKey } routerKey := updateRouteCombinations(extensions, addons) + routerKey.ApplyInternalPath(rpcProviderEndpointEntry.NodeUrls[0].InternalPath) + routerKeyStr := routerKey.String() methodsRouted := map[string]struct{}{} methods := rpcProviderEndpointEntry.NodeUrls[0].Methods if len(methods) > 0 { @@ -216,14 +294,14 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase addonsSupported: addonsSupportedMap, methodsRouted: methodsRouted, } - if chainRouterEntries, ok := chainProxyRouter[routerKey]; !ok { - chainProxyRouter[routerKey] = []chainRouterEntry{chainRouterEntryInst} + if chainRouterEntries, ok := chainProxyRouter[routerKeyStr]; !ok { + chainProxyRouter[routerKeyStr] = []chainRouterEntry{chainRouterEntryInst} } else { if len(methodsRouted) > 0 { // if there are routed methods we want this in the beginning to intercept them - chainProxyRouter[routerKey] = append([]chainRouterEntry{chainRouterEntryInst}, chainRouterEntries...) + chainProxyRouter[routerKeyStr] = append([]chainRouterEntry{chainRouterEntryInst}, chainRouterEntries...) } else { - chainProxyRouter[routerKey] = append(chainRouterEntries, chainRouterEntryInst) + chainProxyRouter[routerKeyStr] = append(chainRouterEntries, chainRouterEntryInst) } } } @@ -234,9 +312,10 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase _, apiCollection, hasSubscriptionInSpec := chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_SUBSCRIBE) // validating we have websocket support for subscription supported specs. webSocketSupported := false - for key := range supportedMap { - if key.IsRequirementMet(WebSocketExtension) { + for _, requirement := range supportedMap { + if requirement.IsRequirementMet(WebSocketExtension) { webSocketSupported = true + break } } if hasSubscriptionInSpec && apiCollection.Enabled && !webSocketSupported { @@ -260,40 +339,37 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase } cri.chainProxyRouter = chainProxyRouter + utils.LavaFormatDebug("chainRouter created", utils.LogAttr("chainProxyRouter", chainProxyRouter)) return &cri, nil } -type requirementSt struct { - extensions lavasession.RouterKey - addon string +type requirement struct { + lavasession.RouterKey + addon string } -func (rs *requirementSt) String() string { - return string(rs.extensions) + rs.addon +func (rs *requirement) String() string { + return rs.RouterKey.String() + "addon:" + rs.addon + lavasession.RouterKeySeparator } -func (rs *requirementSt) IsRequirementMet(requirement string) bool { - return strings.Contains(string(rs.extensions), requirement) || strings.Contains(rs.addon, requirement) +func (rs *requirement) IsRequirementMet(requirement string) bool { + return rs.RouterKey.HasExtension(requirement) || strings.Contains(rs.addon, requirement) } -func populateRequiredForAddon(addon string, extensions []string, required map[requirementSt]struct{}) { - if len(extensions) == 0 { - required[requirementSt{ - extensions: lavasession.NewRouterKey([]string{}), - addon: addon, - }] = struct{}{} - return +func populateRequiredForAddon(addon string, extensions []string, required map[string]struct{}) { + requirement := requirement{ + RouterKey: lavasession.NewRouterKey(extensions), + addon: addon, } - requirement := requirementSt{ - extensions: lavasession.NewRouterKey(extensions), - addon: addon, - } - if _, ok := required[requirement]; ok { + + requirementKey := requirement.String() + if _, ok := required[requirementKey]; ok { // already handled return } - required[requirement] = struct{}{} + + required[requirementKey] = struct{}{} for i := 0; i < len(extensions); i++ { extensionsWithoutI := make([]string, len(extensions)-1) copy(extensionsWithoutI[:i], extensions[:i]) diff --git a/protocol/chainlib/chain_router_test.go b/protocol/chainlib/chain_router_test.go index 4705f08927..d7e2ca6cbe 100644 --- a/protocol/chainlib/chain_router_test.go +++ b/protocol/chainlib/chain_router_test.go @@ -2,6 +2,7 @@ package chainlib import ( "context" + "fmt" "log" "net" "os" @@ -1110,7 +1111,7 @@ func TestChainRouterWithMethodRoutes(t *testing.T) { } chainMsg, err := chainParser.ParseMsg(api, nil, "", nil, extension) require.NoError(t, err) - chainProxy, err := chainRouter.GetChainProxySupporting(ctx, chainMsg.GetApiCollection().CollectionData.AddOn, common.GetExtensionNames(chainMsg.GetExtensions()), api) + chainProxy, err := chainRouter.GetChainProxySupporting(ctx, chainMsg.GetApiCollection().CollectionData.AddOn, common.GetExtensionNames(chainMsg.GetExtensions()), api, "") require.NoError(t, err) _, urlFromProxy := chainProxy.GetChainProxyInformation() require.Equal(t, url, urlFromProxy, "chainMsg: %+v, ---chainRouter: %+v", chainMsg, chainRouter) @@ -1202,3 +1203,846 @@ func TestMain(m *testing.M) { listener.Close() os.Exit(code) } + +func TestChainRouterWithInternalPaths(t *testing.T) { + type play struct { + name string + specApiCollections []*spectypes.ApiCollection + apiInterface string + nodeUrls []common.NodeUrl + expectedServicesToNodeUrls map[string][]common.NodeUrl + expectedError bool + } + + playBook := []play{} + + apiInterfaces := []string{spectypes.APIInterfaceJsonRPC, spectypes.APIInterfaceTendermintRPC} + for _, apiInterface := range apiInterfaces { + playBook = append(playBook, []play{ + { + name: "No internal paths in spec - single http node url configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + }, + }, + { + name: "No internal paths in spec - multiple http node urls configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "https://localhost:5678", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": { + {Url: "https://localhost:1234", InternalPath: ""}, + {Url: "https://localhost:5678", InternalPath: ""}, + }, + }, + }, + { + name: "No internal paths in spec - single ws node url - should error", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "wss://localhost:1234/ws", + InternalPath: "", + }, + }, + expectedError: true, + }, + { + name: "No internal paths in spec - both ws and http node urls", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "wss://localhost:1234/ws", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "|websocket|": {{Url: "wss://localhost:1234/ws", InternalPath: ""}}, + }, + }, + { + name: "With internal paths in spec - single http node url configured - not covering all internal paths", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:1234/Y", InternalPath: "/Y"}}, + }, + }, + { + name: "With internal paths in spec - multiple http node urls configured - covering some internal paths", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "https://localhost:1234/X", + InternalPath: "/X", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:1234/Y", InternalPath: "/Y"}}, + }, + }, + { + name: "With internal paths in spec - multiple http node urls configured - covering all internal paths", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "https://localhost:1234/X", + InternalPath: "/X", + }, + { + Url: "https://localhost:1234/Y", + InternalPath: "/Y", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:1234/Y", InternalPath: "/Y"}}, + }, + }, + { + name: "With internal paths in spec - multiple http node urls configured - no root internal path - should error", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234/X", + InternalPath: "/X", + }, + { + Url: "https://localhost:1234/Y", + InternalPath: "/Y", + }, + }, + expectedError: true, + }, + { + name: "With internal paths in spec - multiple http node urls and ws configured - covering all internal paths", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "wss://localhost:1234/ws", + InternalPath: "", + }, + { + Url: "https://localhost:5678", + InternalPath: "", + }, + { + Url: "https://localhost:5678/X", + InternalPath: "/X", + }, + { + Url: "https://localhost:9012/Y", + InternalPath: "/Y", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:5678", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:5678/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:9012/Y", InternalPath: "/Y"}}, + "|websocket|": {{Url: "wss://localhost:1234/ws", InternalPath: ""}}, + }, + }, + { + name: "With internal paths in spec - only root http and ws configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "wss://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:1234/Y", InternalPath: "/Y"}}, + "|websocket|": {{Url: "wss://localhost:1234", InternalPath: ""}}, + }, + }, + { + name: "With internal paths in spec - only root http and ws and one out of two internal paths are configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/Y", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "https://localhost:5678/X", + InternalPath: "/X", + }, + { + Url: "wss://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:5678/X", InternalPath: "/X"}}, + "||internal-path:/Y|": {{Url: "https://localhost:1234/Y", InternalPath: "/Y"}}, + "|websocket|": {{Url: "wss://localhost:1234", InternalPath: ""}}, + }, + }, + { + name: "With internal paths and ws internal paths in spec - only http is configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/WS", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + ParseDirectives: []*spectypes.ParseDirective{{ + FunctionTag: spectypes.FUNCTION_TAG_SUBSCRIBE, + }}, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/WS|": {{Url: "https://localhost:1234/WS", InternalPath: "/WS"}}, + }, + }, + { + name: "With internal paths and ws internal paths in spec - http and ws is configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/WS", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + ParseDirectives: []*spectypes.ParseDirective{{ + FunctionTag: spectypes.FUNCTION_TAG_SUBSCRIBE, + }}, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "wss://localhost:5678", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/WS|": {{Url: "https://localhost:1234/WS", InternalPath: "/WS"}}, + "|websocket|": {{Url: "wss://localhost:5678", InternalPath: ""}}, + "|websocket|internal-path:/WS|": {{Url: "wss://localhost:5678/WS", InternalPath: "/WS"}}, + }, + }, + { + name: "With internal paths and multiple ws internal paths in spec - http and ws is configured", + apiInterface: apiInterface, + specApiCollections: []*spectypes.ApiCollection{ + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/X", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + ParseDirectives: []*spectypes.ParseDirective{{ + FunctionTag: spectypes.FUNCTION_TAG_SUBSCRIBE, + }}, + }, + { + Enabled: true, + CollectionData: spectypes.CollectionData{ + ApiInterface: apiInterface, + InternalPath: "/WS", + Type: "POST", + AddOn: "", + }, + InheritanceApis: []*spectypes.CollectionData{ + { + ApiInterface: apiInterface, + InternalPath: "", + Type: "POST", + AddOn: "", + }, + }, + ParseDirectives: []*spectypes.ParseDirective{{ + FunctionTag: spectypes.FUNCTION_TAG_SUBSCRIBE, + }}, + }, + }, + nodeUrls: []common.NodeUrl{ + { + Url: "https://localhost:1234", + InternalPath: "", + }, + { + Url: "wss://localhost:1234", + InternalPath: "", + }, + }, + expectedServicesToNodeUrls: map[string][]common.NodeUrl{ + "||": {{Url: "https://localhost:1234", InternalPath: ""}}, + "||internal-path:/X|": {{Url: "https://localhost:1234/X", InternalPath: "/X"}}, + "||internal-path:/WS|": {{Url: "https://localhost:1234/WS", InternalPath: "/WS"}}, + "|websocket|": {{Url: "wss://localhost:1234", InternalPath: ""}}, + "|websocket|internal-path:/WS|": {{Url: "wss://localhost:1234/WS", InternalPath: "/WS"}}, + "|websocket|internal-path:/X|": {{Url: "wss://localhost:1234/X", InternalPath: "/X"}}, + }, + }, + }...) + } + + for _, play := range playBook { + t.Run(play.apiInterface+" - "+play.name, func(t *testing.T) { + chainParser, err := NewChainParser(play.apiInterface) + require.NoError(t, err) + + IgnoreSubscriptionNotConfiguredError = false + + spec := testcommon.CreateMockSpec() + spec.ApiCollections = play.specApiCollections + chainParser.SetSpec(spec) + + endpoint := lavasession.RPCProviderEndpoint{ + NetworkAddress: lavasession.NetworkAddressData{}, + ChainID: spec.Index, + ApiInterface: play.apiInterface, + Geolocation: 1, + NodeUrls: play.nodeUrls, + } + + chainRouter := &chainRouterImpl{} + + nodeUrlsByService, err := chainRouter.BatchNodeUrlsByServices(endpoint, chainParser) + if play.expectedError { + require.Error(t, err) + return + } + + require.NoError(t, err) + + require.Equal(t, len(play.expectedServicesToNodeUrls), len(nodeUrlsByService), nodeUrlsByService) + actualNodeUrlsCount := 0 + for routerKey, actualEndpoint := range nodeUrlsByService { + // Check that the router key is in the expected services + require.Contains(t, play.expectedServicesToNodeUrls, routerKey, routerKey) + actualNodeUrlsCount += len(actualEndpoint.NodeUrls) + + expectedNodeUrls := play.expectedServicesToNodeUrls[routerKey] + require.Len(t, actualEndpoint.NodeUrls, len(expectedNodeUrls), + fmt.Sprintf("RouterKey: %v, NodeUrls: %v", routerKey, actualEndpoint.NodeUrls)) + + for _, actualNodeUrl := range actualEndpoint.NodeUrls { + found := false + for _, expectedNodeUrls := range expectedNodeUrls { + if expectedNodeUrls.Url == actualNodeUrl.Url && expectedNodeUrls.InternalPath == actualNodeUrl.InternalPath { + found = true + break + } + } + require.True(t, found, actualNodeUrl) + } + } + }) + } +} diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 6fbf4ba536..93d604e773 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -63,9 +63,11 @@ type ChainParser interface { DataReliabilityParams() (enabled bool, dataReliabilityThreshold uint32) ChainBlockStats() (allowedBlockLagForQosSync int64, averageBlockTime time.Duration, blockDistanceForFinalizedData, blocksInFinalizationProof uint32) GetParsingByTag(tag spectypes.FUNCTION_TAG) (parsing *spectypes.ParseDirective, apiCollection *spectypes.ApiCollection, existed bool) + IsTagInCollection(tag spectypes.FUNCTION_TAG, collectionKey CollectionKey) bool + GetAllInternalPaths() []string CraftMessage(parser *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) HandleHeaders(metadata []pairingtypes.Metadata, apiCollection *spectypes.ApiCollection, headersDirection spectypes.Header_HeaderType) (filtered []pairingtypes.Metadata, overwriteReqBlock string, ignoredMetadata []pairingtypes.Metadata) - GetVerifications(supported []string) ([]VerificationContainer, error) + GetVerifications(supported []string, internalPath string, apiInterface string) ([]VerificationContainer, error) SeparateAddonsExtensions(supported []string) (addons, extensions []string, err error) SetPolicy(policy PolicyInf, chainId string, apiInterface string) error Active() bool diff --git a/protocol/chainlib/chainproxy/rpcclient/handler.go b/protocol/chainlib/chainproxy/rpcclient/handler.go index b2775326fc..bb1aa31199 100755 --- a/protocol/chainlib/chainproxy/rpcclient/handler.go +++ b/protocol/chainlib/chainproxy/rpcclient/handler.go @@ -242,6 +242,12 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { return true } return false + case msg.isStarkNetPathfinderNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResultStarkNetPathfinder(msg) + return true + } + return false case msg.isResponse(): h.handleResponse(msg) h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) @@ -251,10 +257,31 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { } } +func (h *handler) handleSubscriptionResultStarkNetPathfinder(msg *JsonrpcMessage) { + var result starkNetPathfinderSubscriptionResult + if err := json.Unmarshal(msg.Result, &result); err != nil { + utils.LavaFormatTrace("Dropping invalid starknet pathfinder subscription message", + utils.LogAttr("err", err), + utils.LogAttr("result", string(msg.Result)), + ) + h.log.Debug("Dropping invalid subscription message") + return + } + + id := strconv.Itoa(result.ID) + if h.clientSubs[id] != nil { + h.clientSubs[id].deliver(msg) + } +} + // handleSubscriptionResult processes subscription notifications. func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) { var result ethereumSubscriptionResult if err := json.Unmarshal(msg.Params, &result); err != nil { + utils.LavaFormatTrace("Dropping invalid ethereum subscription message", + utils.LogAttr("err", err), + utils.LogAttr("params", string(msg.Params)), + ) h.log.Debug("Dropping invalid subscription message") return } @@ -266,6 +293,10 @@ func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) { func (h *handler) handleSubscriptionResultTendermint(msg *JsonrpcMessage) { var result tendermintSubscriptionResult if err := json.Unmarshal(msg.Result, &result); err != nil { + utils.LavaFormatTrace("Dropping invalid tendermint subscription message", + utils.LogAttr("err", err), + utils.LogAttr("result", string(msg.Result)), + ) h.log.Debug("Dropping invalid subscription message") return } @@ -302,6 +333,15 @@ func (h *handler) handleResponse(msg *JsonrpcMessage) { } else if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { go op.sub.run() h.clientSubs[op.sub.subid] = op.sub + } else { + // This is because StarkNet Pathfinder is returning an integer instead of a string in the result + var integerSubId int + if json.Unmarshal(msg.Result, &integerSubId) == nil { + op.err = nil + op.sub.subid = strconv.Itoa(integerSubId) + go op.sub.run() + h.clientSubs[op.sub.subid] = op.sub + } } } diff --git a/protocol/chainlib/chainproxy/rpcclient/json.go b/protocol/chainlib/chainproxy/rpcclient/json.go index 6ee84fd4e9..d2c1c5d777 100755 --- a/protocol/chainlib/chainproxy/rpcclient/json.go +++ b/protocol/chainlib/chainproxy/rpcclient/json.go @@ -49,6 +49,11 @@ type ethereumSubscriptionResult struct { Result json.RawMessage `json:"result,omitempty"` } +type starkNetPathfinderSubscriptionResult struct { + ID int `json:"subscription"` + Result json.RawMessage `json:"result,omitempty"` +} + type tendermintSubscriptionResult struct { Query string `json:"query"` } @@ -68,8 +73,12 @@ type tendermintSubscribeReply struct { Query string `json:"query"` } +func (msg *JsonrpcMessage) isStarkNetPathfinderNotification() bool { + return msg.ID == nil && msg.Method != "" && msg.Result != nil +} + func (msg *JsonrpcMessage) isEthereumNotification() bool { - return msg.ID == nil && msg.Method != "" + return msg.ID == nil && msg.Method != "" && msg.Params != nil } func (msg *JsonrpcMessage) isTendermintNotification() bool { diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index bbb4fe11d0..08a7374b4a 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/http" + "net/url" "strings" "time" @@ -53,6 +54,7 @@ type VerificationKey struct { } type VerificationContainer struct { + InternalPath string ConnectionType string Name string ParseDirective spectypes.ParseDirective @@ -435,3 +437,12 @@ func GetTimeoutInfo(chainMessage ChainMessageForSend) common.TimeoutInfo { Stateful: GetStateful(chainMessage), } } + +func IsUrlWebSocket(urlToParse string) (bool, error) { + u, err := url.Parse(urlToParse) + if err != nil { + return false, err + } + + return u.Scheme == "ws" || u.Scheme == "wss", nil +} diff --git a/protocol/chainlib/common_test.go b/protocol/chainlib/common_test.go index 7ca79f5d41..2d7a9631ca 100644 --- a/protocol/chainlib/common_test.go +++ b/protocol/chainlib/common_test.go @@ -213,7 +213,7 @@ func TestExtractDappIDFromWebsocketConnection(t *testing.T) { testCase := testCase t.Run(testCase.name, func(t *testing.T) { - url := "ws://localhost:3000" + testCase.route + url := "ws://127.0.0.1:3000" + testCase.route dialer := &websocket2.Dialer{} conn, _, err := dialer.Dial(url, testCase.headers) if err != nil { diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 1fdf0f0524..39eedb17c9 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -61,7 +61,12 @@ func (apip *JsonRPCChainParser) getSupportedApi(name, connectionType string, int func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) { if craftData != nil { - chainMessage, err := apip.ParseMsg("", craftData.Data, craftData.ConnectionType, metadata, extensionslib.ExtensionInfo{LatestBlock: 0}) + path := craftData.Path + if craftData.InternalPath != "" { + path = craftData.InternalPath + } + + chainMessage, err := apip.ParseMsg(path, craftData.Data, craftData.ConnectionType, metadata, extensionslib.ExtensionInfo{LatestBlock: 0}) if err == nil { chainMessage.AppendHeader(metadata) } @@ -496,7 +501,7 @@ func (apil *JsonRPCChainListener) GetListeningAddress() string { type JrpcChainProxy struct { BaseChainProxy - conn map[string]*chainproxy.Connector + conn *chainproxy.Connector } func NewJrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lavasession.RPCProviderEndpoint, chainParser ChainParser) (ChainProxy, error) { @@ -504,7 +509,10 @@ func NewJrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav return nil, utils.LavaFormatError("rpcProviderEndpoint.NodeUrl list is empty missing node url", nil, utils.Attribute{Key: "chainID", Value: rpcProviderEndpoint.ChainID}, utils.Attribute{Key: "ApiInterface", Value: rpcProviderEndpoint.ApiInterface}) } _, averageBlockTime, _, _ := chainParser.ChainBlockStats() + + // look for the first node url that has no internal path, otherwise take first node url nodeUrl := rpcProviderEndpoint.NodeUrls[0] + cp := &JrpcChainProxy{ BaseChainProxy: BaseChainProxy{ averageBlockTime: averageBlockTime, @@ -512,72 +520,30 @@ func NewJrpcChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint lav ErrorHandler: &JsonRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID, }, - conn: map[string]*chainproxy.Connector{}, + conn: nil, } validateEndpoints(rpcProviderEndpoint.NodeUrls, spectypes.APIInterfaceJsonRPC) - internalPaths := map[string]struct{}{} - jsonRPCChainParser, ok := chainParser.(*JsonRPCChainParser) - if ok { - internalPaths = jsonRPCChainParser.GetInternalPaths() - } - internalPathsLength := len(internalPaths) - if internalPathsLength > 0 && internalPathsLength == len(rpcProviderEndpoint.NodeUrls) { - return cp, cp.startWithSpecificInternalPaths(ctx, nConns, rpcProviderEndpoint.NodeUrls, internalPaths) - } else if internalPathsLength > 0 && len(rpcProviderEndpoint.NodeUrls) > 1 { - // provider provided specific endpoints but not enough to fill all requirements - return nil, utils.LavaFormatError("Internal Paths specified but not all paths provided", nil, utils.Attribute{Key: "required", Value: internalPaths}, utils.Attribute{Key: "provided", Value: rpcProviderEndpoint.NodeUrls}) - } - return cp, cp.start(ctx, nConns, nodeUrl, internalPaths) + return cp, cp.start(ctx, nConns, nodeUrl) } -func (cp *JrpcChainProxy) startWithSpecificInternalPaths(ctx context.Context, nConns uint, nodeUrls []common.NodeUrl, internalPaths map[string]struct{}) error { - for _, url := range nodeUrls { - _, ok := internalPaths[url.InternalPath] - if !ok { - return utils.LavaFormatError("url.InternalPath was not found in internalPaths", nil, utils.Attribute{Key: "internalPaths", Value: internalPaths}, utils.Attribute{Key: "url.InternalPath", Value: url.InternalPath}) - } - utils.LavaFormatDebug("connecting", utils.Attribute{Key: "url", Value: url.String()}) - conn, err := chainproxy.NewConnector(ctx, nConns, url) - if err != nil { - return err - } - cp.conn[url.InternalPath] = conn - } - if len(cp.conn) != len(internalPaths) { - return utils.LavaFormatError("missing connectors for a chain with internal paths", nil, utils.Attribute{Key: "internalPaths", Value: internalPaths}, utils.Attribute{Key: "nodeUrls", Value: nodeUrls}) - } - return nil -} - -func (cp *JrpcChainProxy) start(ctx context.Context, nConns uint, nodeUrl common.NodeUrl, internalPaths map[string]struct{}) error { - if len(internalPaths) == 0 { - internalPaths = map[string]struct{}{"": {}} // add default path +func (cp *JrpcChainProxy) start(ctx context.Context, nConns uint, nodeUrl common.NodeUrl) error { + conn, err := chainproxy.NewConnector(ctx, nConns, nodeUrl) + if err != nil { + return err } - basePath := nodeUrl.Url - for path := range internalPaths { - nodeUrl.Url = basePath + path - conn, err := chainproxy.NewConnector(ctx, nConns, nodeUrl) - if err != nil { - return err - } - cp.conn[path] = conn - if cp.conn == nil { - return errors.New("g_conn == nil") - } - } + cp.conn = conn return nil } func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpcInterfaceMessages.JsonrpcBatchMessage, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, err error) { - internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath - rpc, err := cp.conn[internalPath].GetRpc(ctx, true) + rpc, err := cp.conn.GetRpc(ctx, true) if err != nil { return nil, err } - defer cp.conn[internalPath].ReturnRpc(rpc) + defer cp.conn.ReturnRpc(rpc) if len(nodeMessage.GetHeaders()) > 0 { for _, metadata := range nodeMessage.GetHeaders() { rpc.SetHeader(metadata.Name, metadata.Value) @@ -602,7 +568,7 @@ func (cp *JrpcChainProxy) sendBatchMessage(ctx context.Context, nodeMessage *rpc } replyMsgs := make([]rpcInterfaceMessages.JsonrpcMessage, len(batch)) for idx, element := range batch { - // convert them because batch elements can't be marshaled back to the user, they are missing tags and flieds + // convert them because batch elements can't be marshaled back to the user, they are missing tags and fields replyMsgs[idx], err = rpcInterfaceMessages.ConvertBatchElement(element) if err != nil { return nil, err @@ -637,16 +603,15 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, reply, err := cp.sendBatchMessage(ctx, batchMessage, chainMessage) return reply, "", nil, err } - internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath - connector := cp.conn[internalPath] - rpc, err := connector.GetRpc(ctx, true) + + rpc, err := cp.conn.GetRpc(ctx, true) if err != nil { return nil, "", nil, err } - defer connector.ReturnRpc(rpc) + defer cp.conn.ReturnRpc(rpc) // appending hashed url - grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, connector.GetUrlHash())) + grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, cp.conn.GetUrlHash())) // Call our node var rpcMessage *rpcclient.JsonrpcMessage diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index 885bf953d2..84bdbe4836 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -214,7 +214,7 @@ func TestAddonAndVerifications(t *testing.T) { require.NotNil(t, chainRouter) require.NotNil(t, chainFetcher) - verifications, err := chainParser.GetVerifications([]string{"debug"}) + verifications, err := chainParser.GetVerifications([]string{"debug"}, "", "jsonrpc") require.NoError(t, err) require.NotEmpty(t, verifications) for _, verification := range verifications { diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index 47622a9c2c..b66473972c 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -47,7 +47,7 @@ func (apip *RestChainParser) CraftMessage(parsing *spectypes.ParseDirective, con var data []byte = nil urlPath := string(craftData.Data) if craftData.ConnectionType == http.MethodPost { - // on post we need to send the data provided in the templace with the api as method + // on post we need to send the data provided in the template with the api as method data = craftData.Data urlPath = craftData.Path } diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 431f323930..844e6ee35d 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -61,7 +61,12 @@ func (apip *TendermintChainParser) getSupportedApi(name, connectionType string) func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirective, connectionType string, craftData *CraftData, metadata []pairingtypes.Metadata) (ChainMessageForSend, error) { if craftData != nil { - chainMessage, err := apip.ParseMsg("", craftData.Data, craftData.ConnectionType, metadata, extensionslib.ExtensionInfo{LatestBlock: 0}) + path := craftData.Path + if craftData.InternalPath != "" { + path = craftData.InternalPath + } + + chainMessage, err := apip.ParseMsg(path, craftData.Data, craftData.ConnectionType, metadata, extensionslib.ExtensionInfo{LatestBlock: 0}) if err == nil { chainMessage.AppendHeader(metadata) } @@ -601,11 +606,11 @@ func NewtendermintRpcChainProxy(ctx context.Context, nConns uint, rpcProviderEnd ErrorHandler: &TendermintRPCErrorHandler{}, ChainID: rpcProviderEndpoint.ChainID, }, - conn: map[string]*chainproxy.Connector{}, + conn: nil, }, } - return cp, cp.start(ctx, nConns, nodeUrl, nil) + return cp, cp.start(ctx, nConns, nodeUrl) } func (cp *tendermintRpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { @@ -646,8 +651,7 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc httpClient := cp.httpClient // appending hashed url - internalPath := chainMessage.GetApiCollection().GetCollectionData().InternalPath - grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, cp.conn[internalPath].GetUrlHash())) + grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, cp.conn.GetUrlHash())) // construct the url by concatenating the node url with the path variable url := cp.NodeUrl.Url + "/" + nodeMessage.Path @@ -723,19 +727,15 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpcInterfaceMessages.TendermintrpcMessage, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) { // Get rpc connection from the connection pool var rpc *rpcclient.Client - internalPath := chainMessage.GetApiCollection().CollectionData.InternalPath - - connector := cp.conn[internalPath] - - rpc, err = connector.GetRpc(ctx, true) + rpc, err = cp.conn.GetRpc(ctx, true) if err != nil { return nil, "", nil, err } // return the rpc connection to the websocket pool after the function completes - defer connector.ReturnRpc(rpc) + defer cp.conn.ReturnRpc(rpc) // appending hashed url - grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, connector.GetUrlHash())) + grpc.SetTrailer(ctx, metadata.Pairs(RPCProviderNodeAddressHash, cp.conn.GetUrlHash())) // create variables for the rpc message and reply message var rpcMessage *rpcclient.JsonrpcMessage diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 26d08ca5a5..03998cc2ff 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -76,7 +76,7 @@ func (nurl NodeUrl) String() string { urlStr := nurl.UrlStr() if len(nurl.Addons) > 0 { - return urlStr + ", addons: (" + strings.Join(nurl.Addons, ",") + ")" + return urlStr + ", addons: (" + strings.Join(nurl.Addons, ",") + ")" + ", internal-path: " + nurl.InternalPath } return urlStr } diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 2a2de957e7..53f6531900 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -52,7 +52,7 @@ type ConsumerSessionManager struct { // contains a sorted list of blocked addresses, sorted by their cu used this epoch for higher chance of response currentlyBlockedProviderAddresses []string - addonAddresses map[RouterKey][]string + addonAddresses map[string][]string // key is RouterKey.String() reportedProviders *ReportedProviders // pairingPurge - contains all pairings that are unwanted this epoch, keeps them in memory in order to avoid release. // (if a consumer session still uses one of them or we want to report it.) @@ -129,13 +129,13 @@ func (csm *ConsumerSessionManager) Initialized() bool { func (csm *ConsumerSessionManager) RemoveAddonAddresses(addon string, extensions []string) { if addon == "" && len(extensions) == 0 { // purge all - csm.addonAddresses = make(map[RouterKey][]string) + csm.addonAddresses = make(map[string][]string) } else { routerKey := NewRouterKey(append(extensions, addon)) if csm.addonAddresses == nil { - csm.addonAddresses = make(map[RouterKey][]string) + csm.addonAddresses = make(map[string][]string) } - csm.addonAddresses[routerKey] = []string{} + csm.addonAddresses[routerKey.String()] = []string{} } } @@ -153,10 +153,11 @@ func (csm *ConsumerSessionManager) CalculateAddonValidAddresses(addon string, ex // assuming csm is Rlocked func (csm *ConsumerSessionManager) getValidAddresses(addon string, extensions []string) (addresses []string) { routerKey := NewRouterKey(append(extensions, addon)) - if csm.addonAddresses == nil || csm.addonAddresses[routerKey] == nil { + routerKeyString := routerKey.String() + if csm.addonAddresses == nil || csm.addonAddresses[routerKeyString] == nil { return csm.CalculateAddonValidAddresses(addon, extensions) } - return csm.addonAddresses[routerKey] + return csm.addonAddresses[routerKeyString] } // After 2 epochs we need to close all open connections. @@ -332,7 +333,7 @@ func (csm *ConsumerSessionManager) setValidAddressesToDefaultValue(addon string, } } csm.RemoveAddonAddresses(addon, extensions) // refresh the list - csm.addonAddresses[NewRouterKey(append(extensions, addon))] = csm.CalculateAddonValidAddresses(addon, extensions) + csm.addonAddresses[NewRouterKey(append(extensions, addon)).String()] = csm.CalculateAddonValidAddresses(addon, extensions) } } @@ -375,11 +376,12 @@ func (csm *ConsumerSessionManager) cacheAddonAddresses(addon string, extensions csm.lock.Lock() // lock to set validAddresses[addon] if it's not cached defer csm.lock.Unlock() routerKey := NewRouterKey(append(extensions, addon)) - if csm.addonAddresses == nil || csm.addonAddresses[routerKey] == nil { + routerKeyString := routerKey.String() + if csm.addonAddresses == nil || csm.addonAddresses[routerKeyString] == nil { csm.RemoveAddonAddresses(addon, extensions) - csm.addonAddresses[routerKey] = csm.CalculateAddonValidAddresses(addon, extensions) + csm.addonAddresses[routerKeyString] = csm.CalculateAddonValidAddresses(addon, extensions) } - return csm.addonAddresses[routerKey] + return csm.addonAddresses[routerKeyString] } // validating we still have providers, otherwise reset valid addresses list diff --git a/protocol/lavasession/router_key.go b/protocol/lavasession/router_key.go index 671f3e780d..175d60b921 100644 --- a/protocol/lavasession/router_key.go +++ b/protocol/lavasession/router_key.go @@ -5,48 +5,89 @@ import ( "strconv" "strings" + "github.com/lavanet/lava/v4/utils/lavaslices" spectypes "github.com/lavanet/lava/v4/x/spec/types" ) const ( - sep = "|" - methodRouteSep = "method-route:" + RouterKeySeparator = "|" + methodRouteSep = "method-route:" + internalPathSep = "internal-path:" ) -type RouterKey string +type RouterKey struct { + methodsRouteUniqueKey int + uniqueExtensions []string + internalPath string +} + +func NewRouterKey(extensions []string) RouterKey { + routerKey := RouterKey{} + routerKey.SetExtensions(extensions) + return routerKey +} + +func NewRouterKeyFromExtensions(extensions []*spectypes.Extension) RouterKey { + extensionsStr := lavaslices.Map(extensions, func(extension *spectypes.Extension) string { + return extension.Name + }) + + return NewRouterKey(extensionsStr) +} -func (rk *RouterKey) ApplyMethodsRoute(routeNum int) RouterKey { - additionalPath := strconv.FormatInt(int64(routeNum), 10) - return RouterKey(string(*rk) + methodRouteSep + additionalPath) +func GetEmptyRouterKey() RouterKey { + return NewRouterKey([]string{}) } -func newRouterKeyInner(uniqueExtensions map[string]struct{}) RouterKey { +func (rk *RouterKey) SetExtensions(extensions []string) { + // make sure addons have no repetitions + uniqueExtensions := map[string]struct{}{} // init with the empty extension + if len(extensions) == 0 { + uniqueExtensions[""] = struct{}{} + } else { + for _, extension := range extensions { + uniqueExtensions[extension] = struct{}{} + } + } + uniqueExtensionsSlice := []string{} for addon := range uniqueExtensions { // we are sorting this anyway so we don't have to keep order uniqueExtensionsSlice = append(uniqueExtensionsSlice, addon) } + sort.Strings(uniqueExtensionsSlice) - return RouterKey(sep + strings.Join(uniqueExtensionsSlice, sep) + sep) + + rk.uniqueExtensions = uniqueExtensionsSlice } -func NewRouterKey(extensions []string) RouterKey { - // make sure addons have no repetitions - uniqueExtensions := map[string]struct{}{} - for _, extension := range extensions { - uniqueExtensions[extension] = struct{}{} - } - return newRouterKeyInner(uniqueExtensions) +func (rk *RouterKey) ApplyMethodsRoute(routeNum int) { + rk.methodsRouteUniqueKey = routeNum } -func NewRouterKeyFromExtensions(extensions []*spectypes.Extension) RouterKey { - // make sure addons have no repetitions - uniqueExtensions := map[string]struct{}{} - for _, extension := range extensions { - uniqueExtensions[extension.Name] = struct{}{} - } - return newRouterKeyInner(uniqueExtensions) +func (rk *RouterKey) ApplyInternalPath(internalPath string) { + rk.internalPath = internalPath } -func GetEmptyRouterKey() RouterKey { - return NewRouterKey([]string{}) +func (rk RouterKey) HasExtension(extension string) bool { + return lavaslices.Contains(rk.uniqueExtensions, extension) +} + +func (rk RouterKey) String() string { + // uniqueExtensions are sorted on init + retStr := rk.uniqueExtensions + if len(retStr) == 0 { + retStr = append(retStr, "") + } + + // if we have a route number, we add it to the key + if rk.methodsRouteUniqueKey != 0 { + retStr = append(retStr, methodRouteSep+strconv.FormatInt(int64(rk.methodsRouteUniqueKey), 10)) + } + + // if we have an internal path, we add it to the key + if rk.internalPath != "" { + retStr = append(retStr, internalPathSep+rk.internalPath) + } + + return RouterKeySeparator + strings.Join(retStr, RouterKeySeparator) + RouterKeySeparator } diff --git a/protocol/lavasession/router_key_test.go b/protocol/lavasession/router_key_test.go new file mode 100644 index 0000000000..cbd7983ebb --- /dev/null +++ b/protocol/lavasession/router_key_test.go @@ -0,0 +1,90 @@ +package lavasession + +import ( + "testing" + + spectypes "github.com/lavanet/lava/v4/x/spec/types" + "github.com/stretchr/testify/require" +) + +func TestRouterKey_SetExtensions(t *testing.T) { + rk := NewRouterKey([]string{"ext1", "ext2", "ext1"}) + require.Equal(t, "|ext1|ext2|", rk.String()) + + rk.SetExtensions([]string{"ext3", "ext2"}) + require.Equal(t, "|ext2|ext3|", rk.String()) +} + +func TestRouterKey_NewRouterKeyFromExtensions(t *testing.T) { + rk := NewRouterKeyFromExtensions([]*spectypes.Extension{ + {Name: "ext1"}, + {Name: "ext2"}, + {Name: "ext3"}, + }) + require.Equal(t, "|ext1|ext2|ext3|", rk.String()) +} + +func TestRouterKey_HasExtension(t *testing.T) { + rk := NewRouterKey([]string{"ext1", "ext2"}) + require.True(t, rk.HasExtension("ext1")) + require.False(t, rk.HasExtension("ext3")) +} + +func TestRouterKey_ApplyMethodsRoute(t *testing.T) { + rk := NewRouterKey([]string{}) + rk.ApplyMethodsRoute(42) + require.Equal(t, "||method-route:42|", rk.String()) +} + +func TestRouterKey_ApplyInternalPath(t *testing.T) { + rk := NewRouterKey([]string{}) + rk.ApplyInternalPath("/x") + require.Equal(t, "||internal-path:/x|", rk.String()) +} + +func TestRouterKey_String_NoExtensionsNoRouteNoPath(t *testing.T) { + rk := NewRouterKey([]string{}) + require.Equal(t, "||", rk.String()) +} + +func TestRouterKey_String_WithExtensionsNoRouteNoPath(t *testing.T) { + rk := NewRouterKey([]string{"ext2", "ext1"}) + require.Equal(t, "|ext1|ext2|", rk.String()) +} + +func TestRouterKey_String_WithExtensionsAndRouteNoPath(t *testing.T) { + rk := NewRouterKey([]string{"ext1", "ext2"}) + rk.ApplyMethodsRoute(42) + require.Equal(t, "|ext1|ext2|method-route:42|", rk.String()) +} + +func TestRouterKey_String_WithExtensionsRouteAndPath(t *testing.T) { + rk := NewRouterKey([]string{"ext1", "ext2"}) + rk.ApplyMethodsRoute(42) + rk.ApplyInternalPath("/x") + require.Equal(t, "|ext1|ext2|method-route:42|internal-path:/x|", rk.String()) +} + +func TestRouterKey_String_NoExtensionsWithRouteAndPath(t *testing.T) { + rk := NewRouterKey([]string{}) + rk.ApplyMethodsRoute(42) + rk.ApplyInternalPath("/x") + require.Equal(t, "||method-route:42|internal-path:/x|", rk.String()) +} + +func TestRouterKey_String_WithPathNoRouteNoExtensions(t *testing.T) { + rk := NewRouterKey([]string{}) + rk.ApplyInternalPath("/another/path") + require.Equal(t, "||internal-path:/another/path|", rk.String()) +} + +func TestGetEmptyRouterKey(t *testing.T) { + rk := GetEmptyRouterKey() + require.Equal(t, "||", rk.String()) +} + +func TestRouterKey_SetExtensions_EmptySlice(t *testing.T) { + rk := NewRouterKey([]string{}) + rk.SetExtensions([]string{}) + require.Equal(t, "||", rk.String()) +} diff --git a/protocol/lavasession/used_providers.go b/protocol/lavasession/used_providers.go index ec5820f9a3..97cc1fce8a 100644 --- a/protocol/lavasession/used_providers.go +++ b/protocol/lavasession/used_providers.go @@ -25,7 +25,7 @@ func NewUsedProviders(blockedProviders BlockedProvidersInf) *UsedProviders { } } return &UsedProviders{ - uniqueUsedProviders: map[RouterKey]*UniqueUsedProviders{NewRouterKey([]string{}): { + uniqueUsedProviders: map[string]*UniqueUsedProviders{GetEmptyRouterKey().String(): { providers: map[string]struct{}{}, unwantedProviders: unwantedProviders, blockOnSyncLoss: map[string]struct{}{}, @@ -48,7 +48,7 @@ type UniqueUsedProviders struct { type UsedProviders struct { lock sync.RWMutex - uniqueUsedProviders map[RouterKey]*UniqueUsedProviders + uniqueUsedProviders map[string]*UniqueUsedProviders originalUnwantedProviders map[string]struct{} selecting bool sessionsLatestBatch int @@ -125,7 +125,8 @@ func (up *UsedProviders) AllUnwantedAddresses() []string { // if it does, return it. If it doesn't // creating a new instance and returning it. func (up *UsedProviders) createOrUseUniqueUsedProvidersForKey(key RouterKey) *UniqueUsedProviders { - uniqueUsedProviders, ok := up.uniqueUsedProviders[key] + keyString := key.String() + uniqueUsedProviders, ok := up.uniqueUsedProviders[keyString] if !ok { uniqueUsedProviders = &UniqueUsedProviders{ providers: map[string]struct{}{}, @@ -133,7 +134,7 @@ func (up *UsedProviders) createOrUseUniqueUsedProvidersForKey(key RouterKey) *Un blockOnSyncLoss: map[string]struct{}{}, erroredProviders: map[string]struct{}{}, } - up.uniqueUsedProviders[key] = uniqueUsedProviders + up.uniqueUsedProviders[keyString] = uniqueUsedProviders } return uniqueUsedProviders } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 89a99d5f98..32aec0011e 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -30,6 +30,7 @@ import ( "github.com/lavanet/lava/v4/protocol/statetracker/updaters" "github.com/lavanet/lava/v4/protocol/upgrade" "github.com/lavanet/lava/v4/utils" + "github.com/lavanet/lava/v4/utils/lavaslices" "github.com/lavanet/lava/v4/utils/rand" "github.com/lavanet/lava/v4/utils/sigs" epochstorage "github.com/lavanet/lava/v4/x/epochstorage/types" @@ -355,11 +356,20 @@ func GetAllAddonsAndExtensionsFromNodeUrlSlice(nodeUrls []common.NodeUrl) *Provi return policy } +func GetAllNodeUrlsInternalPaths(nodeUrls []common.NodeUrl) []string { + paths := []string{} + for _, nodeUrl := range nodeUrls { + paths = append(paths, nodeUrl.InternalPath) + } + return paths +} + func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) error { err := rpcProviderEndpoint.Validate() if err != nil { return utils.LavaFormatError("[PANIC] panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) } + chainID := rpcProviderEndpoint.ChainID apiInterface := rpcProviderEndpoint.ApiInterface providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, rpcp.blockMemorySize) @@ -375,6 +385,18 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint return utils.LavaFormatError("[PANIC] failed to RegisterForSpecUpdates, panic severity critical error, aborting support for chain api due to invalid chain parser, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) } + // warn if not all internal paths are configured + configuredInternalPaths := GetAllNodeUrlsInternalPaths(rpcProviderEndpoint.NodeUrls) + chainInternalPaths := chainParser.GetAllInternalPaths() + overConfiguredInternalPaths := lavaslices.Difference(configuredInternalPaths, chainInternalPaths) + if len(overConfiguredInternalPaths) > 0 { + utils.LavaFormatWarning("Some configured internal paths are not in the chain's spec", nil, + utils.LogAttr("chainID", chainID), + utils.LogAttr("apiInterface", apiInterface), + utils.LogAttr("internalPaths", strings.Join(overConfiguredInternalPaths, ",")), + ) + } + // after registering for spec updates our chain parser contains the spec and we can add our addons and extensions to allow our provider to function properly providerPolicy := GetAllAddonsAndExtensionsFromNodeUrlSlice(rpcProviderEndpoint.NodeUrls) utils.LavaFormatDebug("supported services for provider", diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 4ca070c3b3..53765c3b6b 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -795,7 +795,7 @@ func (rpcps *RPCProviderServer) TryRelay(ctx context.Context, request *pairingty } } else if len(request.RelayData.Extensions) > 0 { // if cached, Add Archive trailer if requested by the consumer. - grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RPCProviderNodeExtension, string(lavasession.NewRouterKey(request.RelayData.Extensions)))) + grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RPCProviderNodeExtension, lavasession.NewRouterKey(request.RelayData.Extensions).String())) } if dataReliabilityEnabled { diff --git a/scripts/init_chain_commands.sh b/scripts/init_chain_commands.sh index e9ccefeeb5..295f201531 100755 --- a/scripts/init_chain_commands.sh +++ b/scripts/init_chain_commands.sh @@ -64,7 +64,7 @@ wait_count_blocks 2 echo; echo "#### Voting on plans del proposal ####" lavad tx gov vote $(latest_vote) yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE -echo; echo "#### Sending proposal for plans del ####" +echo; echo "#### Buy DefaultPlan subscription for user1 ####" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) --enable-auto-renewal -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE # wait_count_blocks 2 # lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_addon.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE diff --git a/utils/lavaslices/slices.go b/utils/lavaslices/slices.go index abad07d4f1..56b385d38e 100644 --- a/utils/lavaslices/slices.go +++ b/utils/lavaslices/slices.go @@ -131,6 +131,15 @@ func Contains[T comparable](slice []T, elem T) bool { return false } +func ContainsPredicate[T comparable](slice []T, predicate func(elem T) bool) bool { + for _, e := range slice { + if predicate(e) { + return true + } + } + return false +} + // Remove removes the first instance (if exists) of elem from the slice, and // returns the new slice and indication if removal took place. func Remove[T comparable](slice []T, elem T) ([]T, bool) { @@ -237,6 +246,28 @@ func UnionByFunc[T ComparableByFunc](arrays ...[]T) []T { return res } +func Difference[T comparable](slice1, slice2 []T) []T { + // This function returns the difference between two slices + // (i.e., the elements that are in slice1 but not in slice2) + + // Create a map to store elements of the second slice for quick lookup + elementMap := make(map[T]bool) + for _, elem := range slice2 { + elementMap[elem] = true + } + + // Create a slice to hold the difference + diff := make([]T, 0) + for _, elem := range slice1 { + // If the element in slice1 is not in slice2, add it to the result + if !elementMap[elem] { + diff = append(diff, elem) + } + } + + return diff +} + func Map[T, V any](slice []T, filter func(T) V) []V { values := make([]V, len(slice)) for i := range slice { @@ -245,6 +276,14 @@ func Map[T, V any](slice []T, filter func(T) V) []V { return values } +func KeysSlice[T comparable, V any](in map[T]V) []T { + keys := []T{} + for k := range in { + keys = append(keys, k) + } + return keys +} + func Filter[T any](slice []T, filter func(T) bool) []T { values := make([]T, 0) for _, v := range slice { diff --git a/utils/lavaslices/slices_test.go b/utils/lavaslices/slices_test.go index 4a5880a8a2..8ae4f1bb39 100644 --- a/utils/lavaslices/slices_test.go +++ b/utils/lavaslices/slices_test.go @@ -2,6 +2,7 @@ package lavaslices import ( "math" + "reflect" "testing" "time" @@ -510,3 +511,56 @@ func TestSliceSplitter(t *testing.T) { } } } + +func TestDifference(t *testing.T) { + tests := []struct { + name string + slice1 []int + slice2 []int + expected []int + }{ + { + name: "Basic difference", + slice1: []int{1, 2, 3, 4}, + slice2: []int{3, 4, 5, 6}, + expected: []int{1, 2}, + }, + { + name: "No difference", + slice1: []int{1, 2, 3}, + slice2: []int{1, 2, 3}, + expected: []int{}, + }, + { + name: "All elements different", + slice1: []int{1, 2, 3}, + slice2: []int{4, 5, 6}, + expected: []int{1, 2, 3}, + }, + { + name: "Empty first slice", + slice1: []int{}, + slice2: []int{1, 2, 3}, + expected: []int{}, + }, + { + name: "Empty second slice", + slice1: []int{1, 2, 3}, + slice2: []int{}, + expected: []int{1, 2, 3}, + }, + { + name: "Mixed elements", + slice1: []int{1, 2, 2, 3, 4}, + slice2: []int{2, 4}, + expected: []int{1, 3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := Difference(tt.slice1, tt.slice2) + require.True(t, reflect.DeepEqual(result, tt.expected)) + }) + } +}