Skip to content

Commit

Permalink
WIP: client: Unify the override of all operations
Browse files Browse the repository at this point in the history
For two operations it has already been properly done, and for the rest
for some reason through global variables.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Nov 19, 2024
1 parent 924a998 commit aaeeee5
Show file tree
Hide file tree
Showing 15 changed files with 636 additions and 345 deletions.
9 changes: 1 addition & 8 deletions client/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,11 @@ import (

v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-sdk-go/accounting"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
)

var (
// special variable for test purposes, to overwrite real RPC calls.
rpcAPIBalance = rpcapi.Balance
)

// PrmBalanceGet groups parameters of BalanceGet operation.
type PrmBalanceGet struct {
prmCommonMeta
Expand Down Expand Up @@ -74,7 +67,7 @@ func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.
cc.meta = prm.prmCommonMeta
cc.req = &req
cc.call = func() (responseV2, error) {
return rpcAPIBalance(&c.c, &req, client.WithContext(ctx))
return c.server.getBalance(ctx, req)
}
cc.result = func(r responseV2) {
resp := r.(*v2accounting.BalanceResponse)
Expand Down
23 changes: 23 additions & 0 deletions client/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,34 @@ package client

import (
"context"
"fmt"
"testing"

apiaccounting "github.com/nspcc-dev/neofs-api-go/v2/accounting"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/stretchr/testify/require"
)

type getBalanceServer struct {
unimplementedNeoFSAPIServer
signer neofscrypto.Signer

balance apiaccounting.Decimal
}

func (x getBalanceServer) getBalance(context.Context, apiaccounting.BalanceRequest) (*apiaccounting.BalanceResponse, error) {
var body apiaccounting.BalanceResponseBody
body.SetBalance(&x.balance)
var resp apiaccounting.BalanceResponse
resp.SetBody(&body)

if err := signServiceMessage(x.signer, &resp, nil); err != nil {
return nil, fmt.Errorf("sign response message: %w", err)
}

return &resp, nil
}

func TestClient_BalanceGet(t *testing.T) {
c := newClient(t, nil)
ctx := context.Background()
Expand Down
237 changes: 233 additions & 4 deletions client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,52 @@ import (
"context"
"fmt"

v2netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap"
"github.com/nspcc-dev/neofs-api-go/v2/accounting"
"github.com/nspcc-dev/neofs-api-go/v2/container"
"github.com/nspcc-dev/neofs-api-go/v2/netmap"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/reputation"
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/session"
)

type getObjectStream interface {
Read(*object.GetResponse) error
}

type getObjectPayloadRangeStream interface {
Read(*object.GetRangeResponse) error
}

type searchObjectsStream interface {
Read(*object.SearchResponse) error
}

// interface of NeoFS API server. Exists for test purposes only.
type neoFSAPIServer interface {
createSession(context.Context, session.CreateRequest) (*session.CreateResponse, error)

netMapSnapshot(context.Context, v2netmap.SnapshotRequest) (*v2netmap.SnapshotResponse, error)
getBalance(context.Context, accounting.BalanceRequest) (*accounting.BalanceResponse, error)
netMapSnapshot(context.Context, netmap.SnapshotRequest) (*netmap.SnapshotResponse, error)
getNetworkInfo(context.Context, netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error)
getNodeInfo(context.Context, netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error)
putContainer(context.Context, container.PutRequest) (*container.PutResponse, error)
getContainer(context.Context, container.GetRequest) (*container.GetResponse, error)
deleteContainer(context.Context, container.DeleteRequest) (*container.DeleteResponse, error)
listContainers(context.Context, container.ListRequest) (*container.ListResponse, error)
getEACL(context.Context, container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error)
setEACL(context.Context, container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error)
announceContainerSpace(context.Context, container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error)
announceIntermediateReputation(context.Context, reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error)
announceLocalTrust(context.Context, reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error)
putObject(context.Context) (objectWriter, error)
deleteObject(context.Context, object.DeleteRequest) (*object.DeleteResponse, error)
hashObjectPayloadRanges(context.Context, object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)
headObject(context.Context, object.HeadRequest) (*object.HeadResponse, error)
getObject(context.Context, object.GetRequest) (getObjectStream, error)
getObjectPayloadRange(context.Context, object.GetRangeRequest) (getObjectPayloadRangeStream, error)
searchObjects(context.Context, object.SearchRequest) (searchObjectsStream, error)
}

// wrapper over real client connection which communicates over NeoFS API protocol.
Expand All @@ -28,7 +63,7 @@ func rpcErr(e error) error {

// executes NetmapService.NetmapSnapshot RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) netMapSnapshot(ctx context.Context, req v2netmap.SnapshotRequest) (*v2netmap.SnapshotResponse, error) {
func (x *coreServer) netMapSnapshot(ctx context.Context, req netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) {
resp, err := rpcapi.NetMapSnapshot((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
Expand All @@ -37,6 +72,39 @@ func (x *coreServer) netMapSnapshot(ctx context.Context, req v2netmap.SnapshotRe
return resp, nil
}

// executes NetmapService.NetworkInfo RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) getNetworkInfo(ctx context.Context, req netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) {
resp, err := rpcapi.NetworkInfo((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes NetmapService.LocalNodeInfo RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) getNodeInfo(ctx context.Context, req netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) {
resp, err := rpcapi.LocalNodeInfo((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes AccountingService.Balance RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) getBalance(ctx context.Context, req accounting.BalanceRequest) (*accounting.BalanceResponse, error) {
resp, err := rpcapi.Balance((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes SessionService.Create RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) createSession(ctx context.Context, req session.CreateRequest) (*session.CreateResponse, error) {
Expand All @@ -47,3 +115,164 @@ func (x *coreServer) createSession(ctx context.Context, req session.CreateReques

return resp, nil
}

// executes ContainerService.Put RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) putContainer(ctx context.Context, req container.PutRequest) (*container.PutResponse, error) {
resp, err := rpcapi.PutContainer((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ContainerService.Get RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) getContainer(ctx context.Context, req container.GetRequest) (*container.GetResponse, error) {
resp, err := rpcapi.GetContainer((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ContainerService.Delete RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) deleteContainer(ctx context.Context, req container.DeleteRequest) (*container.DeleteResponse, error) {
// rpcapi.DeleteContainer returns wrong response type
resp := new(container.DeleteResponse)
err := client.SendUnary((*client.Client)(x),
common.CallMethodInfoUnary("neo.fs.v2.container.ContainerService", "Delete"),
&req, resp, client.WithContext(ctx))
if err != nil {
return nil, err
}
return resp, nil
}

// executes ContainerService.List RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) listContainers(ctx context.Context, req container.ListRequest) (*container.ListResponse, error) {
resp, err := rpcapi.ListContainers((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ContainerService.GetExtendedACL RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) getEACL(ctx context.Context, req container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) {
resp, err := rpcapi.GetEACL((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ContainerService.SetExtendedACL RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) setEACL(ctx context.Context, req container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) {
// rpcapi.SetEACL returns wrong response type
resp := new(container.SetExtendedACLResponse)
err := client.SendUnary((*client.Client)(x),
common.CallMethodInfoUnary("neo.fs.v2.container.ContainerService", "SetExtendedACL"),
&req, resp, client.WithContext(ctx))
if err != nil {
return nil, err
}
return resp, nil
}

// executes ContainerService.AnnounceUsedSpace RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) announceContainerSpace(ctx context.Context, req container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) {
// rpcapi.AnnounceUsedSpace returns wrong response type
resp := new(container.AnnounceUsedSpaceResponse)
err := client.SendUnary((*client.Client)(x),
common.CallMethodInfoUnary("neo.fs.v2.container.ContainerService", "AnnounceUsedSpace"),
&req, resp, client.WithContext(ctx))
if err != nil {
return nil, err
}
return resp, nil
}

// executes ReputationService.AnnounceIntermediateResult RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) announceIntermediateReputation(ctx context.Context, req reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := rpcapi.AnnounceIntermediateResult((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ReputationService.AnnounceLocalTrust RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) announceLocalTrust(ctx context.Context, req reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := rpcapi.AnnounceLocalTrust((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

func (x *coreServer) putObject(ctx context.Context) (objectWriter, error) {
// TODO implement me
panic("implement me")
}

// executes ObjectService.Delete RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) deleteObject(ctx context.Context, req object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := rpcapi.DeleteObject((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ObjectService.GetRangeHash RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) hashObjectPayloadRanges(ctx context.Context, req object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := rpcapi.HashObjectRange((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

// executes ObjectService.Head RPC declared in NeoFS API protocol
// using underlying client.Client.
func (x *coreServer) headObject(ctx context.Context, req object.HeadRequest) (*object.HeadResponse, error) {
resp, err := rpcapi.HeadObject((*client.Client)(x), &req, client.WithContext(ctx))
if err != nil {
return nil, rpcErr(err)
}

return resp, nil
}

func (x *coreServer) getObject(ctx context.Context, req object.GetRequest) (getObjectStream, error) {
// TODO implement me
panic("implement me")
}

func (x *coreServer) getObjectPayloadRange(ctx context.Context, req object.GetRangeRequest) (getObjectPayloadRangeStream, error) {
// TODO implement me
panic("implement me")
}

func (x *coreServer) searchObjects(ctx context.Context, req object.SearchRequest) (searchObjectsStream, error) {
// TODO implement me
panic("implement me")
}
Loading

0 comments on commit aaeeee5

Please sign in to comment.