diff --git a/README.md b/README.md index cb226d8..22a71db 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Use Google's [API design guide](https://cloud.google.com/apis/design) to design - Google API service configuration [syntax](https://cloud.google.com/endpoints/docs/grpc-service-config/reference/rpc/google.api#using-grpc-api-service-configuration) - Websocket streaming with `websocket` kind annotations - Content streaming with `google.api.HttpBody` -- Streaming support with [SizeCodec](https://github.com/emcfarlane/larking#streaming-codecs) +- Streaming support with [StreamCodec](https://github.com/emcfarlane/larking#streaming-codecs) - Fast with low allocations: see [benchmarks](https://github.com/emcfarlane/larking/tree/main/benchmarks)
@@ -227,20 +227,23 @@ service ChatRoom { ``` #### Streaming Codecs -Streaming requests servers will upgrade the codec interface to read and write -marshalled messages to the stream. -This allows codecs to control framing on the wire. -For other protocols like `websockets` framing is controlled by the protocol and this isn't needed. Unlike gRPC encoding where compressions is _per message_, compression is based on the stream so only a method to delimiter messages is required. - -For example JSON messages are delimited based on the outer JSON braces `{...}`. -This makes it easy to append payloads to the stream. -To curl a streaming client method with two messages we can append all the JSON messages in the body: +Streaming requests will upgrade the codec interface to read and write marshalled messages to the stream. +Control of framing is given to the application on a per content type basis. +If the underlying protocol has it's own message framing, like 'websockets', streaming codecs won't be used. +Unlike gRPC streams where compression is _per message_ here the compression is per _stream_ so only a message delimiter is needed. +See the [StreamCodec](https://pkg.go.dev/larking.io/larking#StreamCodec) docs for implementation details. +- Protobuf messages use a varint delimiter encoding: ``. +- JSON messages are delimited on the outer JSON braces `{}`. +- Arbitrary content is delimited by the message size limit, chunking into bytes slices of length limit. + +To stream json we can append payloads together as a single payload: ``` -curl http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}' +curl -XPOST http://larking.io/v1/streaming -d '{"message":"one"}{"message":"two"}' ``` -This will split into two messages. +The above creates an input stream of two messages. +(N.B. when using HTTP/1 fully bidirectional streaming is not possible. All stream messages must be written before receiving a response) -Protobuf messages require the length to be encoded before the message `[4 byte length]`. +To stream protobuf we can use [protodelim](https://pkg.go.dev/google.golang.org/protobuf@v1.30.0/encoding/protodelim) to read and write varint streams of messages. Similar libraries are found in other [languages](https://github.com/protocolbuffers/protobuf/issues/10229). #### Twirp Twirp is supported through gRPC-transcoding with the implicit methods. diff --git a/larking/codec.go b/larking/codec.go index 97c6aa9..9dce914 100644 --- a/larking/codec.go +++ b/larking/codec.go @@ -11,7 +11,9 @@ import ( "sync" "google.golang.org/grpc/encoding" + "google.golang.org/protobuf/encoding/protodelim" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" ) @@ -22,6 +24,28 @@ var bytesPool = sync.Pool{ }, } +// growcap scales up the capacity of a slice. +// Taken from the Go 1.14 runtime and proto package. +func growcap(oldcap, wantcap int) (newcap int) { + if wantcap > oldcap*2 { + newcap = wantcap + } else if oldcap < 1024 { + // The Go 1.14 runtime takes this case when len(s) < 1024, + // not when cap(s) < 1024. The difference doesn't seem + // significant here. + newcap = oldcap * 2 + } else { + newcap = oldcap + for 0 < newcap && newcap < wantcap { + newcap += newcap / 4 + } + if newcap <= 0 { + newcap = wantcap + } + } + return newcap +} + // Codec defines the interface used to encode and decode messages. type Codec interface { encoding.Codec @@ -29,20 +53,20 @@ type Codec interface { MarshalAppend([]byte, interface{}) ([]byte, error) } -// SizeCodec is used in streaming RPCs where the message boundaries are +// StreamCodec is used in streaming RPCs where the message boundaries are // determined by the codec. -type SizeCodec interface { +type StreamCodec interface { Codec - // SizeRead returns the size of the next message appended to dst. - // SizeRead reads from r until either it has read a complete message or - // encountered an error. SizeRead returns the data read from r. + // ReadNext returns the size of the next message appended to buf. + // ReadNext reads from r until either it has read a complete message or + // encountered an error and returns all the data read from r. // The message is contained in dst[:n]. // Excess data read from r is stored in dst[n:]. - SizeRead(dst []byte, r io.Reader, limit int) ([]byte, int, error) - // SizeWrite writes the message to w with a size aware encoding + ReadNext(buf []byte, r io.Reader, limit int) (dst []byte, n int, err error) + // WriteNext writes the message to w with a size aware encoding // returning the number of bytes written. - SizeWrite(w io.Writer, src []byte) (int, error) + WriteNext(w io.Writer, src []byte) (n int, err error) } func init() { @@ -83,46 +107,58 @@ func (CodecProto) Unmarshal(data []byte, v interface{}) error { return proto.Unmarshal(data, m) } -// SizeRead reads the length of the message encoded as 4 byte unsigned integer -// and then reads the message from r. -func (c CodecProto) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) { - var buf [4]byte - copy(buf[:], b) - if len(b) < 4 { - if _, err := r.Read(buf[len(b):]); err != nil { - return b, 0, err +// ReadNext reads a varint size-delimited wire-format message from r. +func (c CodecProto) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) { + for i := 0; i < binary.MaxVarintLen64; i++ { + for i >= len(b) { + if len(b) == cap(b) { + // Add more capacity (let append pick how much). + b = append(b, 0)[:len(b)] + } + n, err := r.Read(b[len(b):cap(b)]) + b = b[:len(b)+n] + if err != nil { + return b, 0, err + } + } + if b[i] < 0x80 { + break } - b = b[len(b):] // truncate - } else { - b = b[4:] // truncate } - // Size of the message is encoded as 4 byte unsigned integer. - u := binary.BigEndian.Uint32(buf[:]) - if int(u) > limit { - return b, 0, fmt.Errorf("message size %d exceeds limit %d", u, limit) + size, n := protowire.ConsumeVarint(b) + if n < 0 { + return b, 0, protowire.ParseError(n) } + if limit > 0 && int(size) > limit { + return b, 0, &protodelim.SizeTooLargeError{Size: size, MaxSize: uint64(limit)} + } + b = b[n:] // consume the varint + n = int(size) - if len(b) < int(u) { - if cap(b) < int(u) { - dst := make([]byte, len(b), int(u)) + if len(b) < n { + if cap(b) < n { + dst := make([]byte, len(b), growcap(cap(b), n)) copy(dst, b) b = dst } - if _, err := r.Read(b[len(b):int(u)]); err != nil { + if _, err := io.ReadFull(r, b[len(b):n]); err != nil { + if err == io.EOF { + return b, 0, io.ErrUnexpectedEOF + } return b, 0, err } - b = b[:u] + b = b[:n] } - return b, int(u), nil + return b, n, nil } -// SizeWrite writes the length of the message encoded as 4 byte unsigned integer +// WriteNext writes the length of the message encoded as 4 byte unsigned integer // and then writes the message to w. -func (c CodecProto) SizeWrite(w io.Writer, b []byte) (int, error) { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(b))) - if _, err := w.Write(buf[:]); err != nil { +func (c CodecProto) WriteNext(w io.Writer, b []byte) (int, error) { + var sizeArr [binary.MaxVarintLen64]byte + sizeBuf := protowire.AppendVarint(sizeArr[:0], uint64(len(b))) + if _, err := w.Write(sizeBuf); err != nil { return 0, err } return w.Write(b) @@ -161,10 +197,10 @@ func (c CodecJSON) Unmarshal(data []byte, v interface{}) error { return c.UnmarshalOptions.Unmarshal(data, m) } -// SizeRead reads the length of the message around the json object. +// ReadNext reads the length of the message around the json object. // It reads until it finds a matching number of braces. // It does not validate the JSON. -func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) { +func (c CodecJSON) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) { var ( braceCount int isString bool @@ -210,11 +246,11 @@ func (c CodecJSON) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, erro } } } - return b, 0, fmt.Errorf("message greater than limit %d", limit) + return b, 0, &protodelim.SizeTooLargeError{Size: uint64(len(b)), MaxSize: uint64(limit)} } -// SizeWrite writes the raw JSON message to w without any size prefix. -func (c CodecJSON) SizeWrite(w io.Writer, b []byte) (int, error) { +// WriteNext writes the raw JSON message to w without any size prefix. +func (c CodecJSON) WriteNext(w io.Writer, b []byte) (int, error) { return w.Write(b) } @@ -236,7 +272,7 @@ func (codecHTTPBody) Unmarshal(data []byte, v interface{}) error { func (codecHTTPBody) Name() string { return "body" } -func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, error) { +func (codecHTTPBody) ReadNext(b []byte, r io.Reader, limit int) ([]byte, int, error) { var total int for { if len(b) == cap(b) { @@ -255,6 +291,6 @@ func (codecHTTPBody) SizeRead(b []byte, r io.Reader, limit int) ([]byte, int, er } } -func (codecHTTPBody) SizeWrite(w io.Writer, b []byte) (int, error) { +func (codecHTTPBody) WriteNext(w io.Writer, b []byte) (int, error) { return w.Write(b) } diff --git a/larking/codec_test.go b/larking/codec_test.go index 533ad92..f4c2e4f 100644 --- a/larking/codec_test.go +++ b/larking/codec_test.go @@ -2,15 +2,15 @@ package larking import ( "bytes" - "encoding/binary" "errors" "testing" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" "larking.io/api/testpb" ) -func TestSizeCodecs(t *testing.T) { +func TestStreamCodecs(t *testing.T) { protob, err := proto.Marshal(&testpb.Message{ Text: "hello, protobuf", @@ -18,6 +18,12 @@ func TestSizeCodecs(t *testing.T) { if err != nil { t.Fatal(err) } + protobig, err := proto.Marshal(&testpb.Message{ + Text: string(bytes.Repeat([]byte{'a'}, 1<<20)), + }) + if err != nil { + t.Fatal(err) + } jsonb, err := (&CodecJSON{}).Marshal(&testpb.Message{ Text: "hello, json", }) @@ -37,9 +43,9 @@ func TestSizeCodecs(t *testing.T) { name: "proto buffered", codec: CodecProto{}, input: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...) + b := protowire.AppendVarint(nil, uint64(len(protob))) + t.Log("len(b)=", len(b)) + return append(b, protob...) }(), want: protob, }, { @@ -47,39 +53,50 @@ func TestSizeCodecs(t *testing.T) { codec: CodecProto{}, input: make([]byte, 0, 4+len(protob)), extra: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...) + b := protowire.AppendVarint(nil, uint64(len(protob))) + return append(b, protob...) }(), want: protob, }, { name: "proto partial size", codec: CodecProto{}, input: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...)[:2] + b := protowire.AppendVarint(nil, uint64(len(protob))) + return append(b, protob...)[:1] }(), extra: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...)[2:] + b := protowire.AppendVarint(nil, uint64(len(protob))) + return append(b, protob...)[1:] }(), want: protob, }, { name: "proto partial message", codec: CodecProto{}, input: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...)[:6] + b := protowire.AppendVarint(nil, uint64(len(protob))) + return append(b, protob...)[:6] }(), extra: func() []byte { - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(len(protob))) - return append(buf[:], protob...)[6:] + b := protowire.AppendVarint(nil, uint64(len(protob))) + return append(b, protob...)[6:] }(), want: protob, + }, { + name: "proto zero size", + codec: CodecProto{}, + extra: func() []byte { + b := protowire.AppendVarint(nil, 0) + return b + }(), + want: []byte{}, + }, { + name: "proto big size", + codec: CodecProto{}, + extra: func() []byte { + b := protowire.AppendVarint(nil, uint64(len(protobig))) + return append(b, protobig...) + }(), + want: protobig, }, { name: "json buffered", codec: CodecJSON{}, @@ -106,10 +123,10 @@ func TestSizeCodecs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - codec := tt.codec.(SizeCodec) + codec := tt.codec.(StreamCodec) r := bytes.NewReader(tt.extra) - b, n, err := codec.SizeRead(tt.input, r, len(tt.want)) + b, n, err := codec.ReadNext(tt.input, r, len(tt.want)) if err != nil { if tt.wantErr != nil { if !errors.Is(err, tt.wantErr) { @@ -139,7 +156,7 @@ func TestSizeCodecs(t *testing.T) { } var buf bytes.Buffer - if _, err := codec.SizeWrite(&buf, b); err != nil { + if _, err := codec.WriteNext(&buf, b); err != nil { t.Fatal(err) } diff --git a/larking/http.go b/larking/http.go index ff27d33..9a69470 100644 --- a/larking/http.go +++ b/larking/http.go @@ -81,11 +81,11 @@ func (s *streamHTTP) writeMsg(c Codec, b []byte, contentType string) (int, error } s.sendCount += 1 if s.method.desc.IsStreamingServer() { - codec, ok := c.(SizeCodec) + codec, ok := c.(StreamCodec) if !ok { return count, fmt.Errorf("codec %s does not support streaming", codec.Name()) } - _, err := codec.SizeWrite(s.w, b) + _, err := codec.WriteNext(s.w, b) return count, err } return count, s.opts.writeAll(s.w, b) @@ -154,12 +154,12 @@ func (s *streamHTTP) readMsg(c Codec, b []byte) (int, []byte, error) { count := s.recvCount s.recvCount += 1 if s.method.desc.IsStreamingClient() { - codec, ok := c.(SizeCodec) + codec, ok := c.(StreamCodec) if !ok { return count, nil, fmt.Errorf("codec %q does not support streaming", codec.Name()) } b = append(b, s.rbuf...) - b, n, err := codec.SizeRead(b, s.r, s.opts.maxReceiveMessageSize) + b, n, err := codec.ReadNext(b, s.r, s.opts.maxReceiveMessageSize) if err == io.EOF { s.rEOF, err = true, nil }