Skip to content

Commit

Permalink
feat(provider/ipfs): implement HTTP client
Browse files Browse the repository at this point in the history
Co-authored-by: KallyDev <[email protected]>
  • Loading branch information
polebug and kallydev committed Nov 17, 2023
1 parent bda8432 commit 0142dbb
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/naturalselectionlabs/rss3-node
go 1.21

require (
github.com/avast/retry-go/v4 v4.5.1
github.com/ethereum/go-ethereum v1.13.5
github.com/samber/lo v1.38.1
github.com/shopspring/decimal v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDO
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40=
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo=
Expand Down
174 changes: 174 additions & 0 deletions provider/ipfs/client_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package ipfs

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"

"github.com/avast/retry-go/v4"
syncx "github.com/naturalselectionlabs/rss3-node/common/sync"
"github.com/samber/lo"
)

var (
ErrorUnsupportedMode = errors.New("unsupported mode")
ErrorNoResults = errors.New("no results")
)

// FetchMode is the mode of fetching data from IPFS gateways.
type FetchMode string

const (
FetchModeQuick FetchMode = "quick"
FetchModeStable FetchMode = "stable"

DefaultTimeout = 3 * time.Second
DefaultAttempts = 3
)

type HTTPClient interface {
Fetch(ctx context.Context, path string, fetchMode FetchMode) (io.ReadCloser, error)
}

var _ HTTPClient = (*httpClient)(nil)

type httpClient struct {
httpClient *http.Client
gateways []string
attempts uint
locker sync.RWMutex
}

func (h *httpClient) Fetch(ctx context.Context, path string, fetchMode FetchMode) (readCloser io.ReadCloser, err error) {
retryableFunc := func() error {
switch fetchMode {
case FetchModeStable:
readCloser, err = h.fetchStable(ctx, path)
case FetchModeQuick:
readCloser, err = h.fetchQuick(ctx, path)
default:
return fmt.Errorf("%w %s", ErrorUnsupportedMode, fetchMode)
}

return err
}

retryIfFunc := func(err error) bool {
return !errors.Is(err, ErrorUnsupportedMode)
}

if err := retry.Do(retryableFunc, retry.Attempts(h.attempts), retry.RetryIf(retryIfFunc)); err != nil {
return nil, fmt.Errorf("retry attempts: %w", err)
}

return readCloser, nil
}

func (h *httpClient) fetch(ctx context.Context, gateway string, path string) (io.ReadCloser, error) {
fileURL, err := url.JoinPath(gateway, path)
if err != nil {
return nil, fmt.Errorf("invalid gateway and path: %w", err)
}

request, err := http.NewRequestWithContext(ctx, http.MethodGet, fileURL, nil)
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}

response, err := h.httpClient.Do(request)
if err != nil {
return nil, fmt.Errorf("send request: %w", err)
}

if response.StatusCode != http.StatusOK {
defer lo.Try(response.Body.Close)

return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
}

return response.Body, nil
}

func (h *httpClient) fetchStable(ctx context.Context, path string) (readCloser io.ReadCloser, err error) {
h.locker.RLock()
defer h.locker.RUnlock()

for _, gateway := range h.gateways {
if readCloser, err = h.fetch(ctx, gateway, path); err == nil {
return readCloser, nil
}
}

return nil, ErrorNoResults
}

func (h *httpClient) fetchQuick(ctx context.Context, path string) (io.ReadCloser, error) {
h.locker.RLock()
defer h.locker.RUnlock()

quickGroup := syncx.NewQuickGroup[io.ReadCloser](ctx)

for _, gateway := range h.gateways {
gateway := gateway

quickGroup.Go(func(ctx context.Context) (io.ReadCloser, error) {
return h.fetch(ctx, gateway, path)
})
}

result, err := quickGroup.Wait()
if err != nil {
return nil, ErrorNoResults
}

return result, nil
}

func NewHTTPClient(options ...HTTPClientOption) (HTTPClient, error) {
instance := httpClient{
httpClient: &http.Client{
Timeout: DefaultTimeout,
},
gateways: DefaultGateways,
attempts: DefaultAttempts,
}

for _, option := range options {
if err := option(&instance); err != nil {
return nil, fmt.Errorf("apply options: %w", err)
}
}

return &instance, nil
}

type HTTPClientOption func(*httpClient) error

func WithAttempts(attempts uint) HTTPClientOption {
return func(h *httpClient) error {
h.attempts = attempts

return nil
}
}

func WithTimeout(timeout time.Duration) HTTPClientOption {
return func(h *httpClient) error {
h.httpClient.Timeout = timeout

return nil
}
}

func WithGateways(gateways []string) HTTPClientOption {
return func(h *httpClient) error {
h.gateways = gateways // Overwrite gateways.

return nil
}
}
99 changes: 99 additions & 0 deletions provider/ipfs/client_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package ipfs_test

import (
"context"
"io"
"testing"
"time"

"github.com/naturalselectionlabs/rss3-node/provider/ipfs"
"github.com/stretchr/testify/require"
)

func TestHttpClient_Fetch(t *testing.T) {
t.Parallel()

type arguments struct {
ctx context.Context
options []ipfs.HTTPClientOption
path string
fetchMode ipfs.FetchMode
}

testcases := []struct {
name string
arguments arguments
want require.ValueAssertionFunc
wantError require.ErrorAssertionFunc
}{
{
name: "Crossbell profile of kallydev.csb",
arguments: arguments{
ctx: context.Background(),
options: []ipfs.HTTPClientOption{
ipfs.WithTimeout(10 * time.Second),
},
path: "/ipfs/QmRohM66fF9WLqoLTCi6qQEtLiav4JMEsah21nNMeXxEfx",
fetchMode: ipfs.FetchModeStable,
},
want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) {
length, ok := v.(int64)
require.True(t, ok)
require.Greater(t, length, int64(0))
},
wantError: require.NoError,
},
{
name: "Crossbell avatar of kallydev.csb",
arguments: arguments{
ctx: context.Background(),
options: []ipfs.HTTPClientOption{
ipfs.WithTimeout(10 * time.Second),
},
path: "/ipfs/QmPkTNGYSUDx5n9hzEDgM19xd2aRTZMfwCuvhcPk3Qazhh",
fetchMode: ipfs.FetchModeStable,
},
want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) {
length, ok := v.(int64)
require.True(t, ok)
require.Greater(t, length, int64(0))
},
wantError: require.NoError,
},
{
name: "Token lists of Uniswap",
arguments: arguments{
ctx: context.Background(),
options: []ipfs.HTTPClientOption{
ipfs.WithTimeout(10 * time.Second),
},
path: "/ipns/tokens.uniswap.org",
fetchMode: ipfs.FetchModeStable,
},
want: func(t require.TestingT, v interface{}, msgAndArgs ...interface{}) {
length, ok := v.(int64)
require.True(t, ok)
require.Greater(t, length, int64(0))
},
wantError: require.NoError,
},
}

for _, testcase := range testcases {
testcase := testcase

t.Run(testcase.name, func(t *testing.T) {
t.Parallel()

httpClient, err := ipfs.NewHTTPClient(testcase.arguments.options...)
testcase.wantError(t, err)

result, err := httpClient.Fetch(testcase.arguments.ctx, testcase.arguments.path, testcase.arguments.fetchMode)
testcase.wantError(t, err)

length, err := io.Copy(io.Discard, result)
testcase.wantError(t, err)
testcase.want(t, length)
})
}
}
51 changes: 51 additions & 0 deletions provider/ipfs/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ipfs

import (
"bufio"
"context"
"net/http"

"github.com/samber/lo"
)

const (
DefaultGatewayIPFS = "https://ipfs.io/"
DefaultGatewayCloudflare = "https://cloudflare-ipfs.com/"
DefaultGateway4EVERLAND = "https://4everland.io/"
)

var DefaultGateways = []string{
DefaultGatewayIPFS,
DefaultGatewayCloudflare,
DefaultGateway4EVERLAND,
}

const (
DefaultGatewayList = "https://raw.githubusercontent.com/ipfs/public-gateway-checker/master/gateways.txt"
)

func FetchGateways(ctx context.Context, gatewayList string) ([]string, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet, gatewayList, nil)
if err != nil {
return nil, err
}

// nolint:bodyclose // False positive.
response, err := http.DefaultClient.Do(request)
if err != nil {
return nil, err
}

defer lo.Try(response.Body.Close)

var (
scanner = bufio.NewScanner(response.Body)
gatewayURLs = make([]string, 0)
)

for scanner.Scan() {
gatewayURLs = append(gatewayURLs, scanner.Text())
}

return gatewayURLs, nil
}
Loading

0 comments on commit 0142dbb

Please sign in to comment.