Skip to content

Commit

Permalink
feat: optimize gRPC error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 20, 2024
1 parent 240f4ab commit 3087213
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 67 deletions.
14 changes: 9 additions & 5 deletions pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,23 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
var err error
c.finish(errStatusConnClosing)
c.mu.Lock()
err = c.err
c.mu.Unlock()
return nil, err
}
}
}

func (c *controlBuffer) finish() {
func (c *controlBuffer) finish(err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = ErrConnClosing
c.err = err
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
Expand All @@ -473,7 +477,7 @@ func (c *controlBuffer) finish() {
continue
}
if hdr.onOrphaned != nil { // It will be nil on the server-side.
hdr.onOrphaned(ErrConnClosing)
hdr.onOrphaned(err)
}
}
c.mu.Unlock()
Expand Down
46 changes: 38 additions & 8 deletions pkg/remote/trans/nphttp2/grpc/controlbuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package grpc

import (
"context"
"errors"
"testing"
"time"

"github.com/cloudwego/kitex/internal/test"
)

func TestControlBuf(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
cb := newControlBuffer(ctx.Done())

// test put()
Expand All @@ -52,24 +53,53 @@ func TestControlBuf(t *testing.T) {
test.Assert(t, !success, err)

// test throttle() mock a lot of response frame so throttle() will block current goroutine
for i := 0; i < maxQueuedTransportResponseFrames+5; i++ {
exceedSize := 5
for i := 0; i < maxQueuedTransportResponseFrames+exceedSize; i++ {
err := cb.put(&ping{})
test.Assert(t, err == nil, err)
}

// start a new goroutine to consume response frame
go func() {
time.Sleep(time.Millisecond * 100)
for {
for i := 0; i < exceedSize+1; i++ {
it, err := cb.get(false)
if err != nil || it == nil {
break
}
test.Assert(t, err == nil, err)
test.Assert(t, it != nil)
}
}()

cb.throttle()
// consumes all of the frames
for {
it, err := cb.get(false)
if err != nil || it == nil {
break
}
}

finishErr := errors.New("finish")
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
var block bool
cb.mu.Lock()
block = cb.consumerWaiting
cb.mu.Unlock()
if block {
cb.finish(finishErr)
cancel()
return
}
}
}()
item, err = cb.get(true)
test.Assert(t, err == finishErr, err)
test.Assert(t, item == nil, item)

// test finish()
cb.finish()
err = cb.put(testItem)
test.Assert(t, err == finishErr, err)
_, err = cb.get(false)
test.Assert(t, err == finishErr, err)
}
52 changes: 52 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/error_prompt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package grpc

import (
"fmt"
)

const (
remoteErrTpl = "[triggered by %s]"
remoteErrSuffix = "[triggered by remote service]"
handlerSideErrSuffix = "[triggered by handler side]"
sendRSTStreamFrameSuffix = "[send RST Stream Frame]"
)

func remoteErrMsg(msg string) string {
return msg + remoteErrSuffix
}

func remoteErrMsgf(msg string, a ...interface{}) string {
return fmt.Sprintf(msg+remoteErrSuffix, a...)
}

func handlerSideErrMsg(msg string) string {
return msg + handlerSideErrSuffix
}

func handlerSideErrMsgf(msg string, a ...interface{}) string {
return fmt.Sprintf(msg+handlerSideErrSuffix, a...)
}

func sendRSTStreamFrameMsg(msg string) string {
return msg + sendRSTStreamFrameSuffix
}

func sendRSTStreamFrameMsgf(msg string, a ...interface{}) string {
return fmt.Sprintf(msg+sendRSTStreamFrameSuffix, a...)
}
16 changes: 16 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/error_prompt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package grpc

import (
"testing"

"github.com/cloudwego/kitex/internal/test"
)

func TestErrMsg(t *testing.T) {
test.Assert(t, remoteErrMsg("test") == "test[triggered by remote service]")
test.Assert(t, remoteErrMsgf("test: %s", "val") == "test: val[triggered by remote service]")
test.Assert(t, handlerSideErrMsg("test") == "test[triggered by handler side]")
test.Assert(t, handlerSideErrMsgf("test: %s", "val") == "test: val[triggered by handler side]")
test.Assert(t, sendRSTStreamFrameMsg("test") == "test[send RST Stream Frame]")
test.Assert(t, sendRSTStreamFrameMsgf("test: %s", "val") == "test: val[send RST Stream Frame]")
}
29 changes: 23 additions & 6 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
if err != nil {
rst = true
rstCode = http2.ErrCodeCancel
klog.CtxInfof(s.ctx, sendRSTStreamFrameMsgf("KITEX: stream closed by user side ctx canceled, err: %v, code: %d", err, rstCode))
}
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}
Expand All @@ -557,6 +558,15 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This will unblock reads eventually.
s.write(recvMsg{err: err})
}

// store closeStreamErr
if err == io.EOF {
err = st.Err()
}
if err != nil {
s.closeStreamErr.Store(err)
}

// If headerChan isn't closed, then close it.
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
Expand Down Expand Up @@ -597,6 +607,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close(err error) error {
if rawErr, ok := err.(ConnectionError); ok {
err = status.Err(codes.Unavailable, rawErr.Desc)
}
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
Expand All @@ -617,7 +630,7 @@ func (t *http2Client) Close(err error) error {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
t.controlBuf.finish(err)
t.cancel()
cErr := t.conn.Close()

Expand Down Expand Up @@ -656,10 +669,10 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
return errStreamDone
return s.getCloseStreamErr()
}
} else if s.getState() != streamActive {
return errStreamDone
return s.getCloseStreamErr()
}
df := newDataFrame()
df.streamID = s.id
Expand All @@ -670,7 +683,7 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
df.originD = df.d
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
return err
return s.getCloseStreamErr()
}
}
return t.controlBuf.put(df)
Expand Down Expand Up @@ -766,6 +779,7 @@ func (t *http2Client) handleData(f *grpcframe.DataFrame) {
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
klog.CtxErrorf(s.ctx, sendRSTStreamFrameMsgf("KITEX: http2Client.handleData inflow control err: %v, code: %d", err, http2.ErrCodeFlowControl))
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
return
}
Expand Down Expand Up @@ -983,6 +997,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
if !initialHeader && !endStream {
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
klog.CtxErrorf(s.ctx, sendRSTStreamFrameMsgf("KITEX: http2Client.operateHeaders received HEADERS frame in the middle of a stream, code: %d", http2.ErrCodeProtocol))
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
return
}
Expand All @@ -991,6 +1006,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
klog.CtxErrorf(s.ctx, sendRSTStreamFrameMsgf("KITEX: http2Client.operateHeaders decode HEADERS frame failed, err: %v, code: %d", err, http2.ErrCodeProtocol))
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
Expand Down Expand Up @@ -1034,7 +1050,7 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.ReadFrame()
if err != nil {
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
Expand Down Expand Up @@ -1073,12 +1089,13 @@ func (t *http2Client) reader() {
if err != nil {
msg = err.Error()
}
klog.CtxErrorf(s.ctx, sendRSTStreamFrameMsgf("KITEX: http2Client.reader encountered http2.StreamError: %v, code: %d", se, http2.ErrCodeProtocol))
t.closeStream(s, status.New(code, msg).Err(), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
}
continue
} else {
// Transport error.
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
t.Close(err)
return
}
Expand Down
Loading

0 comments on commit 3087213

Please sign in to comment.