Skip to content

Commit

Permalink
return real error for client-side streaming.Send
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 16, 2024
1 parent b6adf6c commit 012de39
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
16 changes: 12 additions & 4 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,15 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This will unblock reads eventually.
s.write(recvMsg{err: err})
}

// store closeStreamErr
if err == io.EOF {
err = st.Err()
}
if err != nil {
s.closeStreamErr.Store(err)
}

// If headerChan isn't closed, then close it.
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
Expand Down Expand Up @@ -598,7 +607,6 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close(err error) error {
if rawErr, ok := err.(ConnectionError); ok {
err = rawErr
err = status.Err(codes.Unavailable, rawErr.Desc)
}
t.mu.Lock()
Expand Down Expand Up @@ -660,10 +668,10 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
return errStreamDone
return s.getCloseStreamErr()
}
} else if s.getState() != streamActive {
return errStreamDone
return s.getCloseStreamErr()
}
df := newDataFrame()
df.streamID = s.id
Expand All @@ -674,7 +682,7 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
df.originD = df.d
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
return err
return s.getCloseStreamErr()
}
}
return t.controlBuf.put(df)
Expand Down
11 changes: 11 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ type Stream struct {
// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
contentSubtype string

// closeStreamErr is used to store the error when stream is closed
closeStreamErr atomic.Value
}

// isHeaderSent is only valid on the server-side.
Expand Down Expand Up @@ -479,6 +482,14 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return io.ReadFull(s.trReader, p)
}

func (s *Stream) getCloseStreamErr() error {
rawErr := s.closeStreamErr.Load()
if rawErr != nil {
return rawErr.(error)
}
return errStreamDone
}

// StreamWrite only used for unit test
func StreamWrite(s *Stream, buffer *bytes.Buffer) {
s.write(recvMsg{buffer: buffer})
Expand Down
4 changes: 1 addition & 3 deletions pkg/remote/trans/nphttp2/grpc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,7 @@ func TestLargeMessageSuspension(t *testing.T) {
msg := make([]byte, initialWindowSize*8)
ct.Write(s, nil, msg, &Options{})
err = ct.Write(s, nil, msg, &Options{Last: true})
if err != errStreamDone {
t.Fatalf("write got %v, want io.EOF", err)
}
test.Assert(t, errors.Is(err, ContextErr(ctx.Err())), err)
expectedErr := status.Err(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
if _, err := s.Read(make([]byte, 8)); err.Error() != expectedErr.Error() {
t.Fatalf("Read got %v of type %T, want %v", err, err, expectedErr)
Expand Down

0 comments on commit 012de39

Please sign in to comment.