From 6820d46eff96658e0286f763ef386cadd3bf66b0 Mon Sep 17 00:00:00 2001
From: Edward McFarlane <3036610+emcfarlane@users.noreply.github.com>
Date: Fri, 12 May 2023 13:13:29 +0100
Subject: [PATCH] Use varint encoding for protobuf streaming (#100)
* Use varint encoding for protobuf streaming
* Update README.md for streaming codecs
---
README.md | 27 +++++-----
larking/codec.go | 118 +++++++++++++++++++++++++++---------------
larking/codec_test.go | 63 ++++++++++++++--------
larking/http.go | 8 +--
4 files changed, 136 insertions(+), 80 deletions(-)
diff --git a/README.md b/README.md
index cb226d8..22a71db 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ Use Google's [API design guide](https://cloud.google.com/apis/design) to design
- Google API service configuration [syntax](https://cloud.google.com/endpoints/docs/grpc-service-config/reference/rpc/google.api#using-grpc-api-service-configuration)
- Websocket streaming with `websocket` kind annotations
- Content streaming with `google.api.HttpBody`
-- Streaming support with [SizeCodec](https://github.com/emcfarlane/larking#streaming-codecs)
+- Streaming support with [StreamCodec](https://github.com/emcfarlane/larking#streaming-codecs)
- Fast with low allocations: see [benchmarks](https://github.com/emcfarlane/larking/tree/main/benchmarks)
@@ -227,20 +227,23 @@ service ChatRoom {
```
#### Streaming Codecs
-Streaming requests servers will upgrade the codec interface to read and write
-marshalled messages to the stream.
-This allows codecs to control framing on the wire.
-For other protocols like `websockets` framing is controlled by the protocol and this isn't needed. Unlike gRPC encoding where compressions is _per message_, compression is based on the stream so only a method to delimiter messages is required.
-
-For example JSON messages are delimited based on the outer JSON braces `{...}`.
-This makes it easy to append payloads to the stream.
-To curl a streaming client method with two messages we can append all the JSON messages in the body:
+For streaming requests the server upgrades the codec interface to read and write marshalled messages on the stream.
+Control of framing is given to the application on a per-content-type basis.
+If the underlying protocol has its own message framing, like `websockets`, streaming codecs won't be used.
+Unlike gRPC streams, where compression is _per message_, here compression is per _stream_, so only a message delimiter is needed.
+See the [StreamCodec](https://pkg.go.dev/larking.io/larking#StreamCodec) docs for implementation details.
+- Protobuf messages use a varint length-delimited encoding: `[varint length][message]` (see the sketch below).
+- JSON messages are delimited on the outer JSON braces `{}`.
+- Arbitrary content is delimited by the message size limit, chunking the stream into byte slices of at most that length.
+
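+As an illustration, a minimal sketch of the protobuf framing, mirroring how the stream codec writes each message (`msg` stands in for any marshalled message):
+```
+package main
+
+import (
+	"fmt"
+
+	"google.golang.org/protobuf/encoding/protowire"
+)
+
+func main() {
+	msg := make([]byte, 300)                               // a 300 byte marshalled protobuf message
+	frame := protowire.AppendVarint(nil, uint64(len(msg))) // varint length prefix
+	frame = append(frame, msg...)                          // [varint length][message]
+	fmt.Printf("prefix: % x\n", frame[:2])                 // prefix: ac 02
+}
+```
+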
+To stream JSON we can concatenate the messages into a single request body:
```
-curl http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}'
+curl -XPOST http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}'
```
-This will split into two messages.
+The above creates an input stream of two messages.
+(N.B. with HTTP/1, fully bidirectional streaming is not possible: all stream messages must be written before a response is received.)
-Protobuf messages require the length to be encoded before the message `[4 byte length]`.
+To stream protobuf we can use [protodelim](https://pkg.go.dev/google.golang.org/protobuf@v1.30.0/encoding/protodelim) to read and write varint-delimited streams of messages. Similar libraries exist in other [languages](https://github.com/protocolbuffers/protobuf/issues/10229).
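+
+For example, a minimal client sketch that streams two messages with `protodelim` and reads the delimited responses back (the endpoint URL, content type, and `testpb.Message` type are illustrative placeholders, not a fixed API):
+```
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"io"
+	"log"
+	"net/http"
+
+	"google.golang.org/protobuf/encoding/protodelim"
+	"larking.io/api/testpb"
+)
+
+func main() {
+	// Write two varint-delimited messages into the request body.
+	var body bytes.Buffer
+	for _, text := range []string{"one", "two"} {
+		if _, err := protodelim.MarshalTo(&body, &testpb.Message{Text: text}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// Endpoint and content type are assumed for illustration.
+	rsp, err := http.Post("http://larking.io/v1/streaming", "application/protobuf", &body)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer rsp.Body.Close()
+
+	// Read the varint-delimited responses back off the stream.
+	r := bufio.NewReader(rsp.Body)
+	for {
+		msg := &testpb.Message{}
+		if err := protodelim.UnmarshalFrom(r, msg); errors.Is(err, io.EOF) {
+			break
+		} else if err != nil {
+			log.Fatal(err)
+		}
+		log.Println(msg.GetText())
+	}
+}
+```
+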
#### Twirp
Twirp is supported through gRPC-transcoding with the implicit methods.
diff --git a/larking/codec.go b/larking/codec.go
index 97c6aa9..9dce914 100644
--- a/larking/codec.go
+++ b/larking/codec.go
@@ -11,7 +11,9 @@ import (
"sync"
"google.golang.org/grpc/encoding"
+ "google.golang.org/protobuf/encoding/protodelim"
"google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)
@@ -22,6 +24,28 @@ var bytesPool = sync.Pool{
},
}
+// growcap scales up the capacity of a slice.
+// Taken from the Go 1.14 runtime and proto package.
+func growcap(oldcap, wantcap int) (newcap int) {
+ if wantcap > oldcap*2 {
+ newcap = wantcap
+ } else if oldcap < 1024 {
+ // The Go 1.14 runtime takes this case when len(s) < 1024,
+ // not when cap(s) < 1024. The difference doesn't seem
+ // significant here.
+ newcap = oldcap * 2
+ } else {
+ newcap = oldcap
+ for 0 < newcap && newcap < wantcap {
+ newcap += newcap / 4
+ }
+ if newcap <= 0 {
+ newcap = wantcap
+ }
+ }
+ return newcap
+}
+
// Codec defines the interface used to encode and decode messages.
type Codec interface {
encoding.Codec
@@ -29,20 +53,20 @@ type Codec interface {
MarshalAppend([]byte, interface{}) ([]byte, error)
}
-// SizeCodec is used in streaming RPCs where the message boundaries are
+// StreamCodec is used in streaming RPCs where the message boundaries are
// determined by the codec.
-type SizeCodec interface {
+type StreamCodec interface {
Codec
- // SizeRead returns the size of the next message appended to dst.
- // SizeRead reads from r until either it has read a complete message or
- // encountered an error. SizeRead returns the data read from r.
+ // ReadNext appends the next message read from r to buf and returns its size.
+ // ReadNext reads from r until either it has read a complete message or
+ // encountered an error, and returns all the data read from r.
// The message is contained in dst[:n].
// Excess data read from r is stored in dst[n:].
- SizeRead(dst []byte, r io.Reader, limit int) ([]byte, int, error)
- // SizeWrite writes the message to w with a size aware encoding
+ ReadNext(buf []byte, r io.Reader, limit int) (dst []byte, n int, err error)
+ // WriteNext writes the message to w with a size-aware encoding
// returning the number of bytes written.
- SizeWrite(w io.Writer, src []byte) (int, error)
+ WriteNext(w io.Writer, src []byte) (n int, err error)
}
func init() {
@@ -83,46 +107,58 @@ func (CodecProto) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, m)
}
-// SizeRead reads the length of the message encoded as 4 byte unsigned integer
-// and then reads the message from r.
-func (c CodecProto) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
- var buf [4]byte
- copy(buf[:], b)
- if len(b) < 4 {
- if _, err := r.Read(buf[len(b):]); err != nil {
- return b, 0, err
+// ReadNext reads a varint size-delimited wire-format message from r.
+func (c CodecProto) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
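+ // Scan the varint length prefix, reading more bytes from r as needed.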
+ for i := 0; i < binary.MaxVarintLen64; i++ {
+ for i >= len(b) {
+ if len(b) == cap(b) {
+ // Add more capacity (let append pick how much).
+ b = append(b, 0)[:len(b)]
+ }
+ n, err := r.Read(b[len(b):cap(b)])
+ b = b[:len(b)+n]
+ if err != nil {
+ return b, 0, err
+ }
+ }
+ if b[i] < 0x80 {
+ break
}
- b = b[len(b):] // truncate
- } else {
- b = b[4:] // truncate
}
- // Size of the message is encoded as 4 byte unsigned integer.
- u := binary.BigEndian.Uint32(buf[:])
- if int(u) > limit {
- return b, 0, fmt.Errorf("message size %d exceeds limit %d", u, limit)
+ size, n := protowire.ConsumeVarint(b)
+ if n < 0 {
+ return b, 0, protowire.ParseError(n)
}
+ if limit > 0 && int(size) > limit {
+ return b, 0, &protodelim.SizeTooLargeError{Size: size, MaxSize: uint64(limit)}
+ }
+ b = b[n:] // consume the varint
+ n = int(size)
- if len(b) < int(u) {
- if cap(b) < int(u) {
- dst := make([]byte, len(b), int(u))
+ if len(b) < n {
+ if cap(b) < n {
+ dst := make([]byte, len(b), growcap(cap(b), n))
copy(dst, b)
b = dst
}
- if _, err := r.Read(b[len(b):int(u)]); err != nil {
+ if _, err := io.ReadFull(r, b[len(b):n]); err != nil {
+ if err == io.EOF {
+ return b, 0, io.ErrUnexpectedEOF
+ }
return b, 0, err
}
- b = b[:u]
+ b = b[:n]
}
- return b, int(u), nil
+ return b, n, nil
}
-// SizeWrite writes the length of the message encoded as 4 byte unsigned integer
+// WriteNext writes the length of the message encoded as a varint
// and then writes the message to w.
-func (c CodecProto) SizeWrite(w io.Writer, b []byte) (int, error) {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(b)))
- if _, err := w.Write(buf[:]); err != nil {
+func (c CodecProto) WriteNext(w io.Writer, b []byte) (int, error) {
+ var sizeArr [binary.MaxVarintLen64]byte
+ sizeBuf := protowire.AppendVarint(sizeArr[:0], uint64(len(b)))
+ if _, err := w.Write(sizeBuf); err != nil {
return 0, err
}
return w.Write(b)
@@ -161,10 +197,10 @@ func (c CodecJSON) Unmarshal(data []byte, v interface{}) error {
return c.UnmarshalOptions.Unmarshal(data, m)
}
-// SizeRead reads the length of the message around the json object.
+// ReadNext reads the length of the next message by scanning the outer JSON object.
// It reads until it finds a matching number of braces.
// It does not validate the JSON.
-func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
+func (c CodecJSON) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
var (
braceCount int
isString bool
@@ -210,11 +246,11 @@ func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, erro
}
}
}
- return b, 0, fmt.Errorf("message greater than limit %d", limit)
+ return b, 0, &protodelim.SizeTooLargeError{Size: uint64(len(b)), MaxSize: uint64(limit)}
}
-// SizeWrite writes the raw JSON message to w without any size prefix.
-func (c CodecJSON) SizeWrite(w io.Writer, b []byte) (int, error) {
+// WriteNext writes the raw JSON message to w without any size prefix.
+func (c CodecJSON) WriteNext(w io.Writer, b []byte) (int, error) {
return w.Write(b)
}
@@ -236,7 +272,7 @@ func (codecHTTPBody) Unmarshal(data []byte, v interface{}) error {
func (codecHTTPBody) Name() string { return "body" }
-func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
+func (codecHTTPBody) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
var total int
for {
if len(b) == cap(b) {
@@ -255,6 +291,6 @@ func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, er
}
}
-func (codecHTTPBody) SizeWrite(w io.Writer, b []byte) (int, error) {
+func (codecHTTPBody) WriteNext(w io.Writer, b []byte) (int, error) {
return w.Write(b)
}
diff --git a/larking/codec_test.go b/larking/codec_test.go
index 533ad92..f4c2e4f 100644
--- a/larking/codec_test.go
+++ b/larking/codec_test.go
@@ -2,15 +2,15 @@ package larking
import (
"bytes"
- "encoding/binary"
"errors"
"testing"
+ "google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"larking.io/api/testpb"
)
-func TestSizeCodecs(t *testing.T) {
+func TestStreamCodecs(t *testing.T) {
protob, err := proto.Marshal(&testpb.Message{
Text: "hello, protobuf",
@@ -18,6 +18,12 @@ func TestSizeCodecs(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+ protobig, err := proto.Marshal(&testpb.Message{
+ Text: string(bytes.Repeat([]byte{'a'}, 1<<20)),
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
jsonb, err := (&CodecJSON{}).Marshal(&testpb.Message{
Text: "hello, json",
})
@@ -37,9 +43,9 @@ func TestSizeCodecs(t *testing.T) {
name: "proto buffered",
codec: CodecProto{},
input: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ t.Log("len(b)=", len(b))
+ return append(b, protob...)
}(),
want: protob,
}, {
@@ -47,39 +53,50 @@ func TestSizeCodecs(t *testing.T) {
codec: CodecProto{},
input: make([]byte, 0, 4+len(protob)),
extra: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ return append(b, protob...)
}(),
want: protob,
}, {
name: "proto partial size",
codec: CodecProto{},
input: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)[:2]
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ return append(b, protob...)[:1]
}(),
extra: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)[2:]
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ return append(b, protob...)[1:]
}(),
want: protob,
}, {
name: "proto partial message",
codec: CodecProto{},
input: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)[:6]
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ return append(b, protob...)[:6]
}(),
extra: func() []byte {
- var buf [4]byte
- binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
- return append(buf[:], protob...)[6:]
+ b := protowire.AppendVarint(nil, uint64(len(protob)))
+ return append(b, protob...)[6:]
}(),
want: protob,
+ }, {
+ name: "proto zero size",
+ codec: CodecProto{},
+ extra: func() []byte {
+ b := protowire.AppendVarint(nil, 0)
+ return b
+ }(),
+ want: []byte{},
+ }, {
+ name: "proto big size",
+ codec: CodecProto{},
+ extra: func() []byte {
+ b := protowire.AppendVarint(nil, uint64(len(protobig)))
+ return append(b, protobig...)
+ }(),
+ want: protobig,
}, {
name: "json buffered",
codec: CodecJSON{},
@@ -106,10 +123,10 @@ func TestSizeCodecs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- codec := tt.codec.(SizeCodec)
+ codec := tt.codec.(StreamCodec)
r := bytes.NewReader(tt.extra)
- b, n, err := codec.SizeRead(tt.input, r, len(tt.want))
+ b, n, err := codec.ReadNext(tt.input, r, len(tt.want))
if err != nil {
if tt.wantErr != nil {
if !errors.Is(err, tt.wantErr) {
@@ -139,7 +156,7 @@ func TestSizeCodecs(t *testing.T) {
}
var buf bytes.Buffer
- if _, err := codec.SizeWrite(&buf, b); err != nil {
+ if _, err := codec.WriteNext(&buf, b); err != nil {
t.Fatal(err)
}
diff --git a/larking/http.go b/larking/http.go
index ff27d33..9a69470 100644
--- a/larking/http.go
+++ b/larking/http.go
@@ -81,11 +81,11 @@ func (s *streamHTTP) writeMsg(c Codec, b []byte, contentType string) (int, error
}
s.sendCount += 1
if s.method.desc.IsStreamingServer() {
- codec, ok := c.(SizeCodec)
+ codec, ok := c.(StreamCodec)
if !ok {
return count, fmt.Errorf("codec %s does not support streaming", c.Name())
}
- _, err := codec.SizeWrite(s.w, b)
+ _, err := codec.WriteNext(s.w, b)
return count, err
}
return count, s.opts.writeAll(s.w, b)
@@ -154,12 +154,12 @@ func (s *streamHTTP) readMsg(c Codec, b []byte) (int, []byte, error) {
count := s.recvCount
s.recvCount += 1
if s.method.desc.IsStreamingClient() {
- codec, ok := c.(SizeCodec)
+ codec, ok := c.(StreamCodec)
if !ok {
return count, nil, fmt.Errorf("codec %q does not support streaming", c.Name())
}
b = append(b, s.rbuf...)
- b, n, err := codec.SizeRead(b, s.r, s.opts.maxReceiveMessageSize)
+ b, n, err := codec.ReadNext(b, s.r, s.opts.maxReceiveMessageSize)
if err == io.EOF {
s.rEOF, err = true, nil
}