diff --git a/go.mod b/go.mod index 09b5d663..ddd90ccc 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/naturalselectionlabs/rss3-node go 1.21 require ( + github.com/avast/retry-go/v4 v4.5.1 github.com/ethereum/go-ethereum v1.13.5 github.com/samber/lo v1.38.1 github.com/shopspring/decimal v1.3.1 diff --git a/go.sum b/go.sum index 0ed05b3c..62b272ce 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDO github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= +github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= +github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo= diff --git a/provider/ipfs/client_http.go b/provider/ipfs/client_http.go new file mode 100644 index 00000000..3979b5aa --- /dev/null +++ b/provider/ipfs/client_http.go @@ -0,0 +1,174 @@ +package ipfs + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/avast/retry-go/v4" + syncx "github.com/naturalselectionlabs/rss3-node/common/sync" + "github.com/samber/lo" +) + +var ( + ErrorUnsupportedMode = errors.New("unsupported mode") + ErrorNoResults = errors.New("no results") +) + +// FetchMode is the mode of fetching data from IPFS gateways. +type FetchMode string + +const ( + FetchModeQuick FetchMode = "quick" + FetchModeStable FetchMode = "stable" + + DefaultTimeout = 3 * time.Second + DefaultAttempts = 3 +) + +type HTTPClient interface { + Fetch(ctx context.Context, path string, fetchMode FetchMode) (io.ReadCloser, error) +} + +var _ HTTPClient = (*httpClient)(nil) + +type httpClient struct { + httpClient *http.Client + gateways []string + attempts uint + locker sync.RWMutex +} + +func (h *httpClient) Fetch(ctx context.Context, path string, fetchMode FetchMode) (readCloser io.ReadCloser, err error) { + retryableFunc := func() error { + switch fetchMode { + case FetchModeStable: + readCloser, err = h.fetchStable(ctx, path) + case FetchModeQuick: + readCloser, err = h.fetchQuick(ctx, path) + default: + return fmt.Errorf("%w %s", ErrorUnsupportedMode, fetchMode) + } + + return err + } + + retryIfFunc := func(err error) bool { + return !errors.Is(err, ErrorUnsupportedMode) + } + + if err := retry.Do(retryableFunc, retry.Attempts(h.attempts), retry.RetryIf(retryIfFunc)); err != nil { + return nil, fmt.Errorf("retry attempts: %w", err) + } + + return readCloser, nil +} + +func (h *httpClient) fetch(ctx context.Context, gateway string, path string) (io.ReadCloser, error) { + fileURL, err := url.JoinPath(gateway, path) + if err != nil { + return nil, fmt.Errorf("invalid gateway and path: %w", err) + } + + request, err := http.NewRequestWithContext(ctx, http.MethodGet, fileURL, nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + response, err := h.httpClient.Do(request) + if err != nil { + return nil, fmt.Errorf("send request: %w", err) + } + + if response.StatusCode != http.StatusOK { + defer lo.Try(response.Body.Close) + + return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + return response.Body, nil +} + +func (h *httpClient) fetchStable(ctx context.Context, path string) (readCloser io.ReadCloser, err error) { + h.locker.RLock() + defer h.locker.RUnlock() + + for _, gateway := range h.gateways { + if readCloser, err = h.fetch(ctx, gateway, path); err == nil { + return readCloser, nil + } + } + + return nil, ErrorNoResults +} + +func (h *httpClient) fetchQuick(ctx context.Context, path string) (io.ReadCloser, error) { + h.locker.RLock() + defer h.locker.RUnlock() + + quickGroup := syncx.NewQuickGroup[io.ReadCloser](ctx) + + for _, gateway := range h.gateways { + gateway := gateway + + quickGroup.Go(func(ctx context.Context) (io.ReadCloser, error) { + return h.fetch(ctx, gateway, path) + }) + } + + result, err := quickGroup.Wait() + if err != nil { + return nil, ErrorNoResults + } + + return result, nil +} + +func NewHTTPClient(options ...HTTPClientOption) (HTTPClient, error) { + instance := httpClient{ + httpClient: &http.Client{ + Timeout: DefaultTimeout, + }, + gateways: DefaultGateways, + attempts: DefaultAttempts, + } + + for _, option := range options { + if err := option(&instance); err != nil { + return nil, fmt.Errorf("apply options: %w", err) + } + } + + return &instance, nil +} + +type HTTPClientOption func(*httpClient) error + +func WithAttempts(attempts uint) HTTPClientOption { + return func(h *httpClient) error { + h.attempts = attempts + + return nil + } +} + +func WithTimeout(timeout time.Duration) HTTPClientOption { + return func(h *httpClient) error { + h.httpClient.Timeout = timeout + + return nil + } +} + +func WithGateways(gateways []string) HTTPClientOption { + return func(h *httpClient) error { + h.gateways = gateways // Overwrite gateways. + + return nil + } +} diff --git a/provider/ipfs/client_http_test.go b/provider/ipfs/client_http_test.go new file mode 100644 index 00000000..365a6740 --- /dev/null +++ b/provider/ipfs/client_http_test.go @@ -0,0 +1,99 @@ +package ipfs_test + +import ( + "context" + "io" + "testing" + "time" + + "github.com/naturalselectionlabs/rss3-node/provider/ipfs" + "github.com/stretchr/testify/require" +) + +func TestHttpClient_Fetch(t *testing.T) { + t.Parallel() + + type arguments struct { + ctx context.Context + options []ipfs.HTTPClientOption + path string + fetchMode ipfs.FetchMode + } + + testcases := []struct { + name string + arguments arguments + want require.ValueAssertionFunc + wantError require.ErrorAssertionFunc + }{ + { + name: "Crossbell profile of kallydev.csb", + arguments: arguments{ + ctx: context.Background(), + options: []ipfs.HTTPClientOption{ + ipfs.WithTimeout(10 * time.Second), + }, + path: "/ipfs/QmRohM66fF9WLqoLTCi6qQEtLiav4JMEsah21nNMeXxEfx", + fetchMode: ipfs.FetchModeStable, + }, + want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) { + length, ok := v.(int64) + require.True(t, ok) + require.Greater(t, length, int64(0)) + }, + wantError: require.NoError, + }, + { + name: "Crossbell avatar of kallydev.csb", + arguments: arguments{ + ctx: context.Background(), + options: []ipfs.HTTPClientOption{ + ipfs.WithTimeout(10 * time.Second), + }, + path: "/ipfs/QmPkTNGYSUDx5n9hzEDgM19xd2aRTZMfwCuvhcPk3Qazhh", + fetchMode: ipfs.FetchModeStable, + }, + want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) { + length, ok := v.(int64) + require.True(t, ok) + require.Greater(t, length, int64(0)) + }, + wantError: require.NoError, + }, + { + name: "Token lists of Uniswap", + arguments: arguments{ + ctx: context.Background(), + options: []ipfs.HTTPClientOption{ + ipfs.WithTimeout(10 * time.Second), + }, + path: "/ipns/tokens.uniswap.org", + fetchMode: ipfs.FetchModeStable, + }, + want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) { + length, ok := v.(int64) + require.True(t, ok) + require.Greater(t, length, int64(0)) + }, + wantError: require.NoError, + }, + } + + for _, testcase := range testcases { + testcase := testcase + + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + + httpClient, err := ipfs.NewHTTPClient(testcase.arguments.options...) + testcase.wantError(t, err) + + result, err := httpClient.Fetch(testcase.arguments.ctx, testcase.arguments.path, testcase.arguments.fetchMode) + testcase.wantError(t, err) + + length, err := io.Copy(io.Discard, result) + testcase.wantError(t, err) + testcase.want(t, length) + }) + } +} diff --git a/provider/ipfs/gateway.go b/provider/ipfs/gateway.go new file mode 100644 index 00000000..9eac0c3f --- /dev/null +++ b/provider/ipfs/gateway.go @@ -0,0 +1,51 @@ +package ipfs + +import ( + "bufio" + "context" + "net/http" + + "github.com/samber/lo" +) + +const ( + DefaultGatewayIPFS = "https://ipfs.io/" + DefaultGatewayCloudflare = "https://cloudflare-ipfs.com/" + DefaultGateway4EVERLAND = "https://4everland.io/" +) + +var DefaultGateways = []string{ + DefaultGatewayIPFS, + DefaultGatewayCloudflare, + DefaultGateway4EVERLAND, +} + +const ( + DefaultGatewayList = "https://raw.githubusercontent.com/ipfs/public-gateway-checker/master/gateways.txt" +) + +func FetchGateways(ctx context.Context, gatewayList string) ([]string, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, gatewayList, nil) + if err != nil { + return nil, err + } + + // nolint:bodyclose // False positive. + response, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + + defer lo.Try(response.Body.Close) + + var ( + scanner = bufio.NewScanner(response.Body) + gatewayURLs = make([]string, 0) + ) + + for scanner.Scan() { + gatewayURLs = append(gatewayURLs, scanner.Text()) + } + + return gatewayURLs, nil +} diff --git a/provider/ipfs/gateway_test.go b/provider/ipfs/gateway_test.go new file mode 100644 index 00000000..3fc97435 --- /dev/null +++ b/provider/ipfs/gateway_test.go @@ -0,0 +1,58 @@ +package ipfs_test + +import ( + "context" + "testing" + "time" + + "github.com/naturalselectionlabs/rss3-node/provider/ipfs" + "github.com/stretchr/testify/require" +) + +func TestGetGatewayURLs(t *testing.T) { + t.Parallel() + + type arguments struct { + ctx context.Context + timeout time.Duration + gatewayList string + } + + testcases := []struct { + name string + arguments arguments + want require.ValueAssertionFunc + wantError require.ErrorAssertionFunc + }{ + { + name: "Default gateway list", + arguments: arguments{ + ctx: context.Background(), + timeout: ipfs.DefaultTimeout, + gatewayList: ipfs.DefaultGatewayList, + }, + want: func(t require.TestingT, value interface{}, msgAndArgs ...interface{}) { + gatewayURLs, ok := value.([]string) + require.True(t, ok) + + require.Greater(t, len(gatewayURLs), 0) + }, + wantError: require.NoError, + }, + } + + for _, testcase := range testcases { + testcase := testcase + + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(testcase.arguments.ctx, testcase.arguments.timeout) + defer cancel() + + gatewayURLs, err := ipfs.FetchGateways(ctx, testcase.arguments.gatewayList) + testcase.wantError(t, err) + testcase.want(t, gatewayURLs) + }) + } +}