Skip to content

Commit

Permalink
Use varint encoding for protobuf streaming (#100)
Browse files Browse the repository at this point in the history
* Use varint encoding for protobuf streaming

* Update README.md for streaming codecs
  • Loading branch information
emcfarlane authored May 12, 2023
1 parent bb1fdb9 commit 6820d46
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 80 deletions.
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Use Google's [API design guide](https://cloud.google.com/apis/design) to design
- Google API service configuration [syntax](https://cloud.google.com/endpoints/docs/grpc-service-config/reference/rpc/google.api#using-grpc-api-service-configuration)
- Websocket streaming with `websocket` kind annotations
- Content streaming with `google.api.HttpBody`
- Streaming support with [SizeCodec](https://github.com/emcfarlane/larking#streaming-codecs)
- Streaming support with [StreamCodec](https://github.com/emcfarlane/larking#streaming-codecs)
- Fast with low allocations: see [benchmarks](https://github.com/emcfarlane/larking/tree/main/benchmarks)

<div align="center">
Expand Down Expand Up @@ -227,20 +227,23 @@ service ChatRoom {
```

#### Streaming Codecs
Streaming requests servers will upgrade the codec interface to read and write
marshalled messages to the stream.
This allows codecs to control framing on the wire.
For other protocols like `websockets` framing is controlled by the protocol and this isn't needed. Unlike gRPC encoding where compressions is _per message_, compression is based on the stream so only a method to delimiter messages is required.

For example JSON messages are delimited based on the outer JSON braces `{...}`.
This makes it easy to append payloads to the stream.
To curl a streaming client method with two messages we can append all the JSON messages in the body:
Streaming requests will upgrade the codec interface to read and write marshalled messages to the stream.
Control of framing is given to the application on a per content type basis.
If the underlying protocol has its own message framing, like `websockets`, streaming codecs won't be used.
Unlike gRPC streams, where compression is applied _per message_, here compression is applied per _stream_, so only a message delimiter is needed.
See the [StreamCodec](https://pkg.go.dev/larking.io/larking#StreamCodec) docs for implementation details.
- Protobuf messages use a varint delimiter encoding: `<varint><binary-message>`.
- JSON messages are delimited on the outer JSON braces `{<fields>}`.
- Arbitrary content is delimited by the message size limit, chunking the stream into byte slices of at most that length.

To stream json we can append payloads together as a single payload:
```
curl http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}'
curl -XPOST http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}'
```
This will split into two messages.
The above creates an input stream of two messages.
(N.B. when using HTTP/1, fully bidirectional streaming is not possible: all stream messages must be written before receiving a response.)

Protobuf messages require the length to be encoded before the message `[4 byte length]<binary encoded>`.
To stream protobuf we can use [protodelim](https://pkg.go.dev/google.golang.org/[email protected]/encoding/protodelim) to read and write varint streams of messages. Similar libraries are found in other [languages](https://github.com/protocolbuffers/protobuf/issues/10229).

#### Twirp
Twirp is supported through gRPC-transcoding with the implicit methods.
Expand Down
118 changes: 77 additions & 41 deletions larking/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"sync"

"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/encoding/protodelim"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)

Expand All @@ -22,27 +24,49 @@ var bytesPool = sync.Pool{
},
}

// growcap scales up the capacity of a slice.
// Taken from the Go 1.14 runtime and proto package.
func growcap(oldcap, wantcap int) (newcap int) {
	switch {
	case wantcap > oldcap*2:
		// Large jump requested: go straight to the wanted capacity.
		return wantcap
	case oldcap < 1024:
		// Double small slices, mirroring the Go 1.14 runtime policy for
		// len(s) < 1024 (the cap/len distinction is insignificant here).
		return oldcap * 2
	default:
		// Grow large slices by 25% per step until the target is reached.
		newcap = oldcap
		for newcap > 0 && newcap < wantcap {
			newcap += newcap / 4
		}
		// Guard against integer overflow in the loop above.
		if newcap <= 0 {
			newcap = wantcap
		}
		return newcap
	}
}

// Codec defines the interface used to encode and decode messages.
// It extends the gRPC encoding.Codec (Marshal, Unmarshal, Name) with an
// append-style marshal that lets callers reuse buffers.
type Codec interface {
	encoding.Codec
	// MarshalAppend appends the marshaled form of v to b and returns the result.
	// The returned slice may share backing storage with b.
	MarshalAppend([]byte, interface{}) ([]byte, error)
}

// SizeCodec is used in streaming RPCs where the message boundaries are
// StreamCodec is used in streaming RPCs where the message boundaries are
// determined by the codec.
type SizeCodec interface {
type StreamCodec interface {
Codec

// SizeRead returns the size of the next message appended to dst.
// SizeRead reads from r until either it has read a complete message or
// encountered an error. SizeRead returns the data read from r.
// ReadNext returns the size of the next message appended to buf.
// ReadNext reads from r until either it has read a complete message or
// encountered an error and returns all the data read from r.
// The message is contained in dst[:n].
// Excess data read from r is stored in dst[n:].
SizeRead(dst []byte, r io.Reader, limit int) ([]byte, int, error)
// SizeWrite writes the message to w with a size aware encoding
ReadNext(buf []byte, r io.Reader, limit int) (dst []byte, n int, err error)
// WriteNext writes the message to w with a size aware encoding
// returning the number of bytes written.
SizeWrite(w io.Writer, src []byte) (int, error)
WriteNext(w io.Writer, src []byte) (n int, err error)
}

func init() {
Expand Down Expand Up @@ -83,46 +107,58 @@ func (CodecProto) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, m)
}

// SizeRead reads the length of the message encoded as 4 byte unsigned integer
// and then reads the message from r.
func (c CodecProto) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
var buf [4]byte
copy(buf[:], b)
if len(b) < 4 {
if _, err := r.Read(buf[len(b):]); err != nil {
return b, 0, err
// ReadNext reads a varint size-delimited wire-format message from r.
func (c CodecProto) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
for i := 0; i < binary.MaxVarintLen64; i++ {
for i >= len(b) {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
b = append(b, 0)[:len(b)]
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
if err != nil {
return b, 0, err
}
}
if b[i] < 0x80 {
break
}
b = b[len(b):] // truncate
} else {
b = b[4:] // truncate
}

// Size of the message is encoded as 4 byte unsigned integer.
u := binary.BigEndian.Uint32(buf[:])
if int(u) > limit {
return b, 0, fmt.Errorf("message size %d exceeds limit %d", u, limit)
size, n := protowire.ConsumeVarint(b)
if n < 0 {
return b, 0, protowire.ParseError(n)
}
if limit > 0 && int(size) > limit {
return b, 0, &protodelim.SizeTooLargeError{Size: size, MaxSize: uint64(limit)}
}
b = b[n:] // consume the varint
n = int(size)

if len(b) < int(u) {
if cap(b) < int(u) {
dst := make([]byte, len(b), int(u))
if len(b) < n {
if cap(b) < n {
dst := make([]byte, len(b), growcap(cap(b), n))
copy(dst, b)
b = dst
}
if _, err := r.Read(b[len(b):int(u)]); err != nil {
if _, err := io.ReadFull(r, b[len(b):n]); err != nil {
if err == io.EOF {
return b, 0, io.ErrUnexpectedEOF
}
return b, 0, err
}
b = b[:u]
b = b[:n]
}
return b, int(u), nil
return b, n, nil
}

// SizeWrite writes the length of the message encoded as 4 byte unsigned integer
// WriteNext writes the length of the message encoded as a varint
// and then writes the message to w.
func (c CodecProto) SizeWrite(w io.Writer, b []byte) (int, error) {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(b)))
if _, err := w.Write(buf[:]); err != nil {
func (c CodecProto) WriteNext(w io.Writer, b []byte) (int, error) {
var sizeArr [binary.MaxVarintLen64]byte
sizeBuf := protowire.AppendVarint(sizeArr[:0], uint64(len(b)))
if _, err := w.Write(sizeBuf); err != nil {
return 0, err
}
return w.Write(b)
Expand Down Expand Up @@ -161,10 +197,10 @@ func (c CodecJSON) Unmarshal(data []byte, v interface{}) error {
return c.UnmarshalOptions.Unmarshal(data, m)
}

// SizeRead reads the length of the message around the json object.
// ReadNext reads the length of the message around the json object.
// It reads until it finds a matching number of braces.
// It does not validate the JSON.
func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
func (c CodecJSON) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
var (
braceCount int
isString bool
Expand Down Expand Up @@ -210,11 +246,11 @@ func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, erro
}
}
}
return b, 0, fmt.Errorf("message greater than limit %d", limit)
return b, 0, &protodelim.SizeTooLargeError{Size: uint64(len(b)), MaxSize: uint64(limit)}
}

// SizeWrite writes the raw JSON message to w without any size prefix.
func (c CodecJSON) SizeWrite(w io.Writer, b []byte) (int, error) {
// WriteNext writes the raw JSON message to w without any size prefix.
func (c CodecJSON) WriteNext(w io.Writer, b []byte) (int, error) {
return w.Write(b)
}

Expand All @@ -236,7 +272,7 @@ func (codecHTTPBody) Unmarshal(data []byte, v interface{}) error {

func (codecHTTPBody) Name() string { return "body" }

func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) {
func (codecHTTPBody) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) {
var total int
for {
if len(b) == cap(b) {
Expand All @@ -255,6 +291,6 @@ func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, er
}
}

func (codecHTTPBody) SizeWrite(w io.Writer, b []byte) (int, error) {
// WriteNext writes the raw message bytes to w. HTTP body content carries
// no framing, so the payload is emitted as-is without a size prefix.
func (codecHTTPBody) WriteNext(w io.Writer, b []byte) (int, error) {
	n, err := w.Write(b)
	return n, err
}
63 changes: 40 additions & 23 deletions larking/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ package larking

import (
"bytes"
"encoding/binary"
"errors"
"testing"

"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"larking.io/api/testpb"
)

func TestSizeCodecs(t *testing.T) {
func TestStreamCodecs(t *testing.T) {

protob, err := proto.Marshal(&testpb.Message{
Text: "hello, protobuf",
})
if err != nil {
t.Fatal(err)
}
protobig, err := proto.Marshal(&testpb.Message{
Text: string(bytes.Repeat([]byte{'a'}, 1<<20)),
})
if err != nil {
t.Fatal(err)
}
jsonb, err := (&CodecJSON{}).Marshal(&testpb.Message{
Text: "hello, json",
})
Expand All @@ -37,49 +43,60 @@ func TestSizeCodecs(t *testing.T) {
name: "proto buffered",
codec: CodecProto{},
input: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)
b := protowire.AppendVarint(nil, uint64(len(protob)))
t.Log("len(b)=", len(b))
return append(b, protob...)
}(),
want: protob,
}, {
name: "proto unbuffered",
codec: CodecProto{},
input: make([]byte, 0, 4+len(protob)),
extra: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)
b := protowire.AppendVarint(nil, uint64(len(protob)))
return append(b, protob...)
}(),
want: protob,
}, {
name: "proto partial size",
codec: CodecProto{},
input: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)[:2]
b := protowire.AppendVarint(nil, uint64(len(protob)))
return append(b, protob...)[:1]
}(),
extra: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)[2:]
b := protowire.AppendVarint(nil, uint64(len(protob)))
return append(b, protob...)[1:]
}(),
want: protob,
}, {
name: "proto partial message",
codec: CodecProto{},
input: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)[:6]
b := protowire.AppendVarint(nil, uint64(len(protob)))
return append(b, protob...)[:6]
}(),
extra: func() []byte {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], uint32(len(protob)))
return append(buf[:], protob...)[6:]
b := protowire.AppendVarint(nil, uint64(len(protob)))
return append(b, protob...)[6:]
}(),
want: protob,
}, {
name: "proto zero size",
codec: CodecProto{},
extra: func() []byte {
b := protowire.AppendVarint(nil, 0)
return b
}(),
want: []byte{},
}, {
name: "proto big size",
codec: CodecProto{},
extra: func() []byte {
b := protowire.AppendVarint(nil, uint64(len(protobig)))
return append(b, protobig...)
}(),
want: protobig,
}, {
name: "json buffered",
codec: CodecJSON{},
Expand All @@ -106,10 +123,10 @@ func TestSizeCodecs(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
codec := tt.codec.(SizeCodec)
codec := tt.codec.(StreamCodec)

r := bytes.NewReader(tt.extra)
b, n, err := codec.SizeRead(tt.input, r, len(tt.want))
b, n, err := codec.ReadNext(tt.input, r, len(tt.want))
if err != nil {
if tt.wantErr != nil {
if !errors.Is(err, tt.wantErr) {
Expand Down Expand Up @@ -139,7 +156,7 @@ func TestSizeCodecs(t *testing.T) {
}

var buf bytes.Buffer
if _, err := codec.SizeWrite(&buf, b); err != nil {
if _, err := codec.WriteNext(&buf, b); err != nil {
t.Fatal(err)
}

Expand Down
Loading

0 comments on commit 6820d46

Please sign in to comment.