From 1f894057c70d2e2a5d9f093cdbe1dce11ca2c2f1 Mon Sep 17 00:00:00 2001 From: iowar Date: Tue, 3 Oct 2023 17:05:11 +0300 Subject: [PATCH 1/5] create protobuf rw --- pkg/p2p/protobuf/protobuf.go | 73 ++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 pkg/p2p/protobuf/protobuf.go diff --git a/pkg/p2p/protobuf/protobuf.go b/pkg/p2p/protobuf/protobuf.go new file mode 100644 index 00000000..b487c682 --- /dev/null +++ b/pkg/p2p/protobuf/protobuf.go @@ -0,0 +1,73 @@ +package protobuf + +import ( + "context" + "fmt" + + "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + "github.com/primevprotocol/mev-commit/pkg/p2p" + "google.golang.org/protobuf/proto" +) + +type IProtobuf interface { + ReadMsg(context.Context, *pb.Message) error + WriteMsg(context.Context, *pb.Message) error +} + +type protobuf struct { + in p2p.Stream + out p2p.Stream +} + +const delimitedReaderMaxSize = 1024 * 1024 + +func NewReaderWriter(in p2p.Stream, out p2p.Stream) IProtobuf { + return &protobuf{in: in, out: out} +} + +func (p *protobuf) ReadMsg(ctx context.Context, msg *pb.Message) error { + type result struct { + msgBuf []byte + err error + } + + resultC := make(chan result, 1) + go func() { + msgBuf, err := p.in.ReadMsg() + resultC <- result{msgBuf: msgBuf, err: err} + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case res := <-resultC: + if res.err != nil { + return fmt.Errorf("failed to read msg: %w", res.err) + } + + if err := proto.Unmarshal(res.msgBuf, msg); err != nil { + return fmt.Errorf("failed to unmarshal message: %w", err) + } + + return nil + } +} + +func (p *protobuf) WriteMsg(ctx context.Context, msg *pb.Message) error { + msgBuf, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("failed marshaling message: %w", err) + } + + errC := make(chan error, 1) + go func() { + errC <- p.out.WriteMsg(msgBuf) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err + } +} From 823e95e4b19a066c8927317f316971cb830088d7 Mon Sep 17 00:00:00 2001 From: iowar Date: Tue, 3 Oct 2023 17:08:30 +0300 Subject: [PATCH 2/5] add nolint comment --- pkg/p2p/protobuf/protobuf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/p2p/protobuf/protobuf.go b/pkg/p2p/protobuf/protobuf.go index b487c682..62f5b435 100644 --- a/pkg/p2p/protobuf/protobuf.go +++ b/pkg/p2p/protobuf/protobuf.go @@ -19,7 +19,7 @@ type protobuf struct { out p2p.Stream } -const delimitedReaderMaxSize = 1024 * 1024 +const delimitedReaderMaxSize = 1024 * 1024 //nolint:unused func NewReaderWriter(in p2p.Stream, out p2p.Stream) IProtobuf { return &protobuf{in: in, out: out} From 8fa14a8f1f89fdb56b48cc602d3b2771d511ad53 Mon Sep 17 00:00:00 2001 From: iowar Date: Tue, 3 Oct 2023 17:28:49 +0300 Subject: [PATCH 3/5] protobuf test --- pkg/p2p/protobuf/protobuf_test.go | 60 +++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 pkg/p2p/protobuf/protobuf_test.go diff --git a/pkg/p2p/protobuf/protobuf_test.go b/pkg/p2p/protobuf/protobuf_test.go new file mode 100644 index 00000000..cc3e124e --- /dev/null +++ b/pkg/p2p/protobuf/protobuf_test.go @@ -0,0 +1,60 @@ +package protobuf_test + +import ( + "bytes" + "context" + "testing" + + "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + "github.com/primevprotocol/mev-commit/pkg/p2p/protobuf" + p2ptest "github.com/primevprotocol/mev-commit/pkg/p2p/testing" + "google.golang.org/protobuf/proto" +) + +func TestProtobufEncodingDecoding(t *testing.T) { + t.Parallel() + + t.Run("ok", func(t *testing.T) { + out, in := p2ptest.NewDuplexStream() + + test := &pb.Message{ + Type: pb.Message_DIAL.Enum(), + Dial: &pb.Message_Dial{ + Peer: &pb.Message_PeerInfo{ + Id: []byte("16Uiu2HAmK8EQ9axsSaE9hqjdHX7Hq5Jbeo2tmuNcLHwyQLWKjSYw"), + Addrs: [][]byte{ + []byte("0x9Bbc6Bef724d483C8f834C03fC2D3FE115D47ABF"), + []byte("0x903e2Abdc0fF09aBCB4C23CD8Ef1e267dfD32c2C"), + []byte("0xdCFA8524A3A266A388A4884cB6448463ae19D025"), + }, + }, + }, + } + + iproto := protobuf.NewReaderWriter(in, out) + + if err := iproto.WriteMsg(context.Background(), test); err != nil { + t.Fatal(err) + } + + var res pb.Message + err := iproto.ReadMsg(context.Background(), &res) + if err != nil { + t.Fatal(err) + } + + testBytes, err := proto.Marshal(test) + if err != nil { + t.Fatal(err) + } + + resBytes, err := proto.Marshal(&res) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(testBytes, resBytes) { + t.Fatalf("expected %v, got %v", test, res) + } + }) +} From 839eebe4cf38982f61c3572fea1fa691e299550b Mon Sep 17 00:00:00 2001 From: iowar Date: Tue, 3 Oct 2023 17:37:27 +0300 Subject: [PATCH 4/5] copylock fixed --- pkg/p2p/protobuf/protobuf_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/p2p/protobuf/protobuf_test.go b/pkg/p2p/protobuf/protobuf_test.go index cc3e124e..7dffa70a 100644 --- a/pkg/p2p/protobuf/protobuf_test.go +++ b/pkg/p2p/protobuf/protobuf_test.go @@ -54,7 +54,7 @@ func TestProtobufEncodingDecoding(t *testing.T) { } if !bytes.Equal(testBytes, resBytes) { - t.Fatalf("expected %v, got %v", test, res) + t.Fatalf("expected %v, got %v", testBytes, resBytes) } }) } From 087496ef26081ffc0e56daff2fcf07b85d9c8ade Mon Sep 17 00:00:00 2001 From: iowar Date: Tue, 3 Oct 2023 18:38:42 +0300 Subject: [PATCH 5/5] fix duplex stream --- pkg/p2p/protobuf/protobuf.go | 15 ++++++--------- pkg/p2p/protobuf/protobuf_test.go | 7 ++++--- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/p2p/protobuf/protobuf.go b/pkg/p2p/protobuf/protobuf.go index 62f5b435..e5b69d20 100644 --- a/pkg/p2p/protobuf/protobuf.go +++ b/pkg/p2p/protobuf/protobuf.go @@ -9,20 +9,17 @@ import ( "google.golang.org/protobuf/proto" ) -type IProtobuf interface { +type Encoder interface { ReadMsg(context.Context, *pb.Message) error WriteMsg(context.Context, *pb.Message) error } type protobuf struct { - in p2p.Stream - out p2p.Stream + p2p.Stream } -const delimitedReaderMaxSize = 1024 * 1024 //nolint:unused - -func NewReaderWriter(in p2p.Stream, out p2p.Stream) IProtobuf { - return &protobuf{in: in, out: out} +func NewReaderWriter(s p2p.Stream) Encoder { + return &protobuf{s} } func (p *protobuf) ReadMsg(ctx context.Context, msg *pb.Message) error { @@ -33,7 +30,7 @@ func (p *protobuf) ReadMsg(ctx context.Context, msg *pb.Message) error { resultC := make(chan result, 1) go func() { - msgBuf, err := p.in.ReadMsg() + msgBuf, err := p.Stream.ReadMsg() resultC <- result{msgBuf: msgBuf, err: err} }() @@ -61,7 +58,7 @@ func (p *protobuf) WriteMsg(ctx context.Context, msg *pb.Message) error { errC := make(chan error, 1) go func() { - errC <- p.out.WriteMsg(msgBuf) + errC <- p.Stream.WriteMsg(msgBuf) }() select { diff --git a/pkg/p2p/protobuf/protobuf_test.go b/pkg/p2p/protobuf/protobuf_test.go index 7dffa70a..20209a5d 100644 --- a/pkg/p2p/protobuf/protobuf_test.go +++ b/pkg/p2p/protobuf/protobuf_test.go @@ -31,14 +31,15 @@ func TestProtobufEncodingDecoding(t *testing.T) { }, } - iproto := protobuf.NewReaderWriter(in, out) + reader := protobuf.NewReaderWriter(in) + writer := protobuf.NewReaderWriter(out) - if err := iproto.WriteMsg(context.Background(), test); err != nil { + if err := writer.WriteMsg(context.Background(), test); err != nil { t.Fatal(err) } var res pb.Message - err := iproto.ReadMsg(context.Background(), &res) + err := reader.ReadMsg(context.Background(), &res) if err != nil { t.Fatal(err) }