From dfcdbb84523a4af262ceff08801b507deb5b171f Mon Sep 17 00:00:00 2001 From: Jeffsky Date: Sat, 23 May 2020 22:40:50 +0800 Subject: [PATCH] fix #51: support deconstructed error. --- README.md | 4 +- client.go | 6 +- cmd/rsocket-cli/rsocket-cli.go | 3 +- cmd/rsocket-cli/runner.go | 26 +++--- docs/.nojekyll | 0 docs/README.md | 3 - docs/_coverpage.md | 15 ---- docs/_media/logo.svg | 1 - docs/_sidebar.md | 4 - docs/index.html | 22 ----- examples/echo/echo.go | 2 +- extension/mime.go | 44 +++++----- internal/common/errors.go | 39 +++++---- internal/common/misc.go | 4 +- internal/common/version.go | 2 +- internal/framing/frame.go | 8 +- internal/framing/header.go | 2 +- internal/socket/duplex.go | 119 ++++++++++++++------------- internal/socket/socket.go | 50 +++++------ internal/transport/connection_tcp.go | 4 +- internal/transport/connection_ws.go | 2 +- internal/transport/transport.go | 5 +- internal/transport/uri.go | 6 +- lease/lease.go | 6 +- rsocket.go | 50 +++++++++-- rx/flux/flux.go | 6 +- rx/flux/proxy.go | 4 +- server.go | 6 +- 28 files changed, 223 insertions(+), 220 deletions(-) delete mode 100644 docs/.nojekyll delete mode 100644 docs/README.md delete mode 100644 docs/_coverpage.md delete mode 100644 docs/_media/logo.svg delete mode 100644 docs/_sidebar.md delete mode 100644 docs/index.html diff --git a/README.md b/README.md index 1f7d0c6..5822ad6 100644 --- a/README.md +++ b/README.md @@ -269,7 +269,7 @@ func init() { logger.SetLevel(logger.LevelInfo) } -``` +``` #### Dependencies - [reactor-go](https://github.com/jjeffcaii/reactor-go) @@ -297,5 +297,5 @@ func init() { - [x] Cancel - [x] Error - [x] Flow Control: RequestN - - [ ] Flow Control: Lease + - [x] Flow Control: Lease - [x] Load Balance diff --git a/client.go b/client.go index a45b8a6..d5217e6 100644 --- a/client.go +++ b/client.go @@ -38,9 +38,9 @@ type ( Start(ctx context.Context) (Client, error) // Start start a client socket with TLS. // Here's an example: - // tc:=&tls.Config{ + // tc := &tls.Config { // InsecureSkipVerify: true, - //} + // } StartTLS(ctx context.Context, tc *tls.Config) (Client, error) } @@ -87,7 +87,7 @@ func Connect() ClientBuilder { fragment: fragmentation.MaxFragment, setup: &socket.SetupInfo{ Version: common.DefaultVersion, - KeepaliveInterval: common.DefaultKeepaliveInteval, + KeepaliveInterval: common.DefaultKeepaliveInterval, KeepaliveLifetime: common.DefaultKeepaliveMaxLifetime, DataMimeType: defaultMimeType, MetadataMimeType: defaultMimeType, diff --git a/cmd/rsocket-cli/rsocket-cli.go b/cmd/rsocket-cli/rsocket-cli.go index f6c3fec..e5fade4 100644 --- a/cmd/rsocket-cli/rsocket-cli.go +++ b/cmd/rsocket-cli/rsocket-cli.go @@ -6,7 +6,6 @@ import ( "os" "time" - "github.com/rsocket/rsocket-go/extension" "github.com/rsocket/rsocket-go/logger" "github.com/rsocket/rsocket-go/rx" "github.com/urfave/cli/v2" @@ -108,7 +107,7 @@ func newFlags(args *Runner) []cli.Flag { &cli.StringFlag{ Name: "metadataFormat", Usage: "Metadata Format", - Value: extension.ApplicationJSON.String(), + Value: "application/json", Destination: &args.MetadataFormat, }, &cli.StringFlag{ diff --git a/cmd/rsocket-cli/runner.go b/cmd/rsocket-cli/runner.go index 82aa7d1..eb73e16 100644 --- a/cmd/rsocket-cli/runner.go +++ b/cmd/rsocket-cli/runner.go @@ -111,7 +111,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) { return } setupPayload := payload.New(setupData, nil) - sendings := p.createPayload() + sendingPayloads := p.createPayload() c, err := cb. DataMimeType(p.DataFormat). MetadataMimeType(p.MetadataFormat). @@ -131,7 +131,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) { } var first payload.Payload if !p.Channel { - first, err = sendings.BlockFirst(ctx) + first, err = sendingPayloads.BlockFirst(ctx) if err != nil { return } @@ -144,7 +144,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) { } else if p.Stream { err = p.execRequestStream(ctx, c, first) } else if p.Channel { - err = p.execRequestChannel(ctx, c, sendings) + err = p.execRequestChannel(ctx, c, sendingPayloads) } else if p.MetadataPush { err = p.execMetadataPush(ctx, c, first) } else { @@ -166,24 +166,24 @@ func (p *Runner) runServerMode(ctx context.Context) error { } ch := make(chan error) go func() { - sendings := p.createPayload() + sendingPayloads := p.createPayload() ch <- sb. Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) { var options []rsocket.OptAbstractSocket - options = append(options, rsocket.RequestStream(func(msg payload.Payload) flux.Flux { - p.showPayload(msg) - return sendings + options = append(options, rsocket.RequestStream(func(message payload.Payload) flux.Flux { + p.showPayload(message) + return sendingPayloads })) - options = append(options, rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux { - msgs.Subscribe(ctx, rx.OnNext(func(input payload.Payload) { + options = append(options, rsocket.RequestChannel(func(messages rx.Publisher) flux.Flux { + messages.Subscribe(ctx, rx.OnNext(func(input payload.Payload) { p.showPayload(input) })) - return sendings + return sendingPayloads })) options = append(options, rsocket.RequestResponse(func(msg payload.Payload) mono.Mono { p.showPayload(msg) return mono.Create(func(i context.Context, sink mono.Sink) { - first, err := sendings.BlockFirst(i) + first, err := sendingPayloads.BlockFirst(i) if err != nil { sink.Error(err) return @@ -207,14 +207,14 @@ func (p *Runner) runServerMode(ctx context.Context) error { return <-ch } -func (p *Runner) execMetadataPush(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { +func (p *Runner) execMetadataPush(_ context.Context, c rsocket.Client, send payload.Payload) (err error) { c.MetadataPush(send) m, _ := send.MetadataUTF8() logger.Infof("%s\n", m) return } -func (p *Runner) execFireAndForget(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { +func (p *Runner) execFireAndForget(_ context.Context, c rsocket.Client, send payload.Payload) (err error) { c.FireAndForget(send) return } diff --git a/docs/.nojekyll b/docs/.nojekyll deleted file mode 100644 index e69de29..0000000 diff --git a/docs/README.md b/docs/README.md deleted file mode 100644 index 611e341..0000000 --- a/docs/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# rsocket-go - -> An awesome project. diff --git a/docs/_coverpage.md b/docs/_coverpage.md deleted file mode 100644 index e5926d4..0000000 --- a/docs/_coverpage.md +++ /dev/null @@ -1,15 +0,0 @@ - - - -alt text - -# rsocket-go alpha - -> A RSocket protocol implementation in Golang. - -- Design For Golang. -- Thin reactive-streams implementation. -- Simulate Java SDK API. - -[GitHub](https://github.com/rsocket/rsocket-go/) -[Get Started](#rsocket-go) diff --git a/docs/_media/logo.svg b/docs/_media/logo.svg deleted file mode 100644 index a1ee8c3..0000000 --- a/docs/_media/logo.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/docs/_sidebar.md b/docs/_sidebar.md deleted file mode 100644 index 0969335..0000000 --- a/docs/_sidebar.md +++ /dev/null @@ -1,4 +0,0 @@ - - -* [Getting started](/) -* [Guide](guide.md) diff --git a/docs/index.html b/docs/index.html deleted file mode 100644 index b36a94e..0000000 --- a/docs/index.html +++ /dev/null @@ -1,22 +0,0 @@ - - - - - Document - - - - - - -
- - - - diff --git a/examples/echo/echo.go b/examples/echo/echo.go index 7efe695..1d8ed6f 100644 --- a/examples/echo/echo.go +++ b/examples/echo/echo.go @@ -110,7 +110,7 @@ func responder() rsocket.RSocket { // cdl.await(); // } catch (InterruptedException e) { // Thread.currentThread().interrupt(); - // throw new Errorf(e); + // throw new Error(e); // } //} s := pl.DataUTF8() diff --git a/extension/mime.go b/extension/mime.go index bab36d3..9258335 100644 --- a/extension/mime.go +++ b/extension/mime.go @@ -1,26 +1,13 @@ package extension -// MIME is MIME types in number. -// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md -type MIME int8 - -func (p MIME) String() string { - return mimeTypes[p] -} - var ( - mimeTypes map[MIME]string - mimeTypesR map[string]MIME + _mimeTypes map[MIME]string + _mimeTypesReverse map[string]MIME ) -// ParseMIME parse a string to MIME. -func ParseMIME(str string) (mime MIME, ok bool) { - mime, ok = mimeTypesR[str] - if !ok { - mime = -1 - } - return -} +// MIME is MIME types in number. +// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md +type MIME int8 // All MIMEs const ( @@ -74,7 +61,7 @@ const ( ) func init() { - mimeTypes = map[MIME]string{ + _mimeTypes = map[MIME]string{ ApplicationAvro: "application/avro", ApplicationCBOR: "application/cbor", ApplicationGraphql: "application/graphql", @@ -123,8 +110,21 @@ func init() { MessageRouting: "message/x.rsocket.routing.v0", MessageCompositeMetadata: "message/x.rsocket.composite-metadata.v0", } - mimeTypesR = make(map[string]MIME, len(mimeTypes)) - for k, v := range mimeTypes { - mimeTypesR[v] = k + _mimeTypesReverse = make(map[string]MIME, len(_mimeTypes)) + for k, v := range _mimeTypes { + _mimeTypesReverse[v] = k + } +} + +func (p MIME) String() string { + return _mimeTypes[p] +} + +// ParseMIME parse a string to MIME. +func ParseMIME(str string) (mime MIME, ok bool) { + mime, ok = _mimeTypesReverse[str] + if !ok { + mime = -1 } + return } diff --git a/internal/common/errors.go b/internal/common/errors.go index de9e90a..f5afbdb 100644 --- a/internal/common/errors.go +++ b/internal/common/errors.go @@ -5,24 +5,31 @@ import "errors" // ErrorCode is code for RSocket error. type ErrorCode uint32 -var errorCodeMap = map[ErrorCode]string{ - ErrorCodeInvalidSetup: "INVALID_SETUP", - ErrorCodeUnsupportedSetup: "UNSUPPORTED_SETUP", - ErrorCodeRejectedSetup: "REJECTED_SETUP", - ErrorCodeRejectedResume: "REJECTED_RESUME", - ErrorCodeConnectionError: "CONNECTION_ERROR", - ErrorCodeConnectionClose: "CONNECTION_CLOSE", - ErrorCodeApplicationError: "APPLICATION_ERROR", - ErrorCodeRejected: "REJECTED", - ErrorCodeCanceled: "CANCELED", - ErrorCodeInvalid: "INVALID", -} - func (p ErrorCode) String() string { - if s, ok := errorCodeMap[p]; ok { - return s + switch p { + case ErrorCodeInvalidSetup: + return "INVALID_SETUP" + case ErrorCodeUnsupportedSetup: + return "UNSUPPORTED_SETUP" + case ErrorCodeRejectedSetup: + return "REJECTED_SETUP" + case ErrorCodeRejectedResume: + return "REJECTED_RESUME" + case ErrorCodeConnectionError: + return "CONNECTION_ERROR" + case ErrorCodeConnectionClose: + return "CONNECTION_CLOSE" + case ErrorCodeApplicationError: + return "APPLICATION_ERROR" + case ErrorCodeRejected: + return "REJECTED" + case ErrorCodeCanceled: + return "CANCELED" + case ErrorCodeInvalid: + return "INVALID" + default: + return "UNKNOWN" } - return "UNKNOWN" } const ( diff --git a/internal/common/misc.go b/internal/common/misc.go index 910fc1b..3492838 100644 --- a/internal/common/misc.go +++ b/internal/common/misc.go @@ -5,8 +5,8 @@ import ( ) const ( - // DefaultKeepaliveInteval is default keepalive interval duration. - DefaultKeepaliveInteval = 20 * time.Second + // DefaultKeepaliveInterval is default keepalive interval duration. + DefaultKeepaliveInterval = 20 * time.Second // DefaultKeepaliveMaxLifetime is default keepalive max lifetime. DefaultKeepaliveMaxLifetime = 90 * time.Second ) diff --git a/internal/common/version.go b/internal/common/version.go index 2d30ef2..9f83d81 100644 --- a/internal/common/version.go +++ b/internal/common/version.go @@ -10,7 +10,7 @@ import ( var DefaultVersion Version = [2]uint16{1, 0} // Version define the version of protocol. -// It inclues major and minor version. +// It includes major and minor version. type Version [2]uint16 // Bytes returns raw bytes of current version. diff --git a/internal/framing/frame.go b/internal/framing/frame.go index 797cdfc..3cb297e 100644 --- a/internal/framing/frame.go +++ b/internal/framing/frame.go @@ -140,8 +140,8 @@ type Frame interface { SetBody(body *common.ByteBuff) // Bytes encodes and returns frame in bytes. Bytes() []byte - // IsResumable returns true if frame supports resume. - IsResumable() bool + // CanResume returns true if frame supports resume. + CanResume() bool // Done marks current frame has been sent. Done() (closed bool) // DoneNotify notifies when frame done. @@ -171,8 +171,8 @@ func (p *BaseFrame) DoneNotify() <-chan struct{} { return p.done } -// IsResumable returns true if frame supports resume. -func (p *BaseFrame) IsResumable() bool { +// CanResume returns true if frame supports resume. +func (p *BaseFrame) CanResume() bool { switch p.header.Type() { case FrameTypeRequestChannel, FrameTypeRequestStream, FrameTypeRequestResponse, FrameTypeRequestFNF, FrameTypeRequestN, FrameTypeCancel, FrameTypeError, FrameTypePayload: return true diff --git a/internal/framing/header.go b/internal/framing/header.go index 4835e7b..925f0e1 100644 --- a/internal/framing/header.go +++ b/internal/framing/header.go @@ -11,7 +11,7 @@ const ( HeaderLen = 6 ) -// FrameHeader is the header fo a RSocke frame. +// FrameHeader is the header fo a RSocket frame. // RSocket frames begin with a RSocket Frame Header. // It includes StreamID, FrameType and Flags. type FrameHeader [HeaderLen]byte diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index 45c6eae..6ca6682 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -856,71 +856,74 @@ func (p *DuplexRSocket) sendPayload( }) } -func (p *DuplexRSocket) drainWithKeepalive(leaseChan <-chan lease.Lease) (ok bool) { +func (p *DuplexRSocket) drainWithKeepaliveAndLease(leaseChan <-chan lease.Lease) (ok bool) { if len(p.outs) > 0 { p.drain(nil) } var out framing.Frame - - if leaseChan == nil { - select { - case <-p.keepaliver.C(): - ok = true - out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true) - if p.tp != nil { - err := p.tp.Send(out, true) - if err != nil { - logger.Errorf("send keepalive frame failed: %s\n", err.Error()) - } - } - case out, ok = <-p.outs: - if !ok { - return - } - if p.tp == nil { - p.outsPriority = append(p.outsPriority, out) - } else if err := p.tp.Send(out, true); err != nil { - logger.Errorf("send frame failed: %s\n", err.Error()) - p.outsPriority = append(p.outsPriority, out) + select { + case <-p.keepaliver.C(): + ok = true + out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true) + if p.tp != nil { + err := p.tp.Send(out, true) + if err != nil { + logger.Errorf("send keepalive frame failed: %s\n", err.Error()) } } - } else { - select { - case <-p.keepaliver.C(): - ok = true - out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true) - if p.tp != nil { - err := p.tp.Send(out, true) - if err != nil { - logger.Errorf("send keepalive frame failed: %s\n", err.Error()) - } - } - case ls, success := <-leaseChan: - ok = success - if !ok { - return - } - out = framing.NewFrameLease(ls.TimeToLive, ls.NumberOfRequests, ls.Metadata) - if p.tp == nil { - p.outsPriority = append(p.outsPriority, out) - } else if err := p.tp.Send(out, true); err != nil { - logger.Errorf("send frame failed: %s\n", err.Error()) - p.outsPriority = append(p.outsPriority, out) - } - case out, ok = <-p.outs: - if !ok { - return - } - if p.tp == nil { - p.outsPriority = append(p.outsPriority, out) - } else if err := p.tp.Send(out, true); err != nil { - logger.Errorf("send frame failed: %s\n", err.Error()) - p.outsPriority = append(p.outsPriority, out) - } + case ls, success := <-leaseChan: + ok = success + if !ok { + return + } + out = framing.NewFrameLease(ls.TimeToLive, ls.NumberOfRequests, ls.Metadata) + if p.tp == nil { + p.outsPriority = append(p.outsPriority, out) + } else if err := p.tp.Send(out, true); err != nil { + logger.Errorf("send frame failed: %s\n", err.Error()) + p.outsPriority = append(p.outsPriority, out) + } + case out, ok = <-p.outs: + if !ok { + return } + if p.tp == nil { + p.outsPriority = append(p.outsPriority, out) + } else if err := p.tp.Send(out, true); err != nil { + logger.Errorf("send frame failed: %s\n", err.Error()) + p.outsPriority = append(p.outsPriority, out) + } + } + return +} +func (p *DuplexRSocket) drainWithKeepalive() (ok bool) { + if len(p.outs) > 0 { + p.drain(nil) } + var out framing.Frame + select { + case <-p.keepaliver.C(): + ok = true + out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true) + if p.tp != nil { + err := p.tp.Send(out, true) + if err != nil { + logger.Errorf("send keepalive frame failed: %s\n", err.Error()) + } + } + case out, ok = <-p.outs: + if !ok { + return + } + if p.tp == nil { + p.outsPriority = append(p.outsPriority, out) + } else if err := p.tp.Send(out, true); err != nil { + logger.Errorf("send frame failed: %s\n", err.Error()) + p.outsPriority = append(p.outsPriority, out) + } + } return } @@ -1021,8 +1024,12 @@ func (p *DuplexRSocket) loopWriteWithKeepaliver(ctx context.Context, leaseChan < } default: } + p.drainOutBack() - if !p.drainWithKeepalive(leaseChan) { + if leaseChan == nil && !p.drainWithKeepalive() { + break + } + if leaseChan != nil && !p.drainWithKeepaliveAndLease(leaseChan) { break } } diff --git a/internal/socket/socket.go b/internal/socket/socket.go index ef1c31b..829c515 100644 --- a/internal/socket/socket.go +++ b/internal/socket/socket.go @@ -33,15 +33,15 @@ type Closeable interface { // Responder is a contract providing different interaction models for RSocket protocol. type Responder interface { // FireAndForget is a single one-way message. - FireAndForget(msg payload.Payload) + FireAndForget(message payload.Payload) // MetadataPush sends asynchronous Metadata frame. - MetadataPush(msg payload.Payload) + MetadataPush(message payload.Payload) // RequestResponse request single response. - RequestResponse(msg payload.Payload) mono.Mono + RequestResponse(message payload.Payload) mono.Mono // RequestStream request a completable stream. - RequestStream(msg payload.Payload) flux.Flux + RequestStream(message payload.Payload) flux.Flux // RequestChannel request a completable stream in both directions. - RequestChannel(msgs rx.Publisher) flux.Flux + RequestChannel(messages rx.Publisher) flux.Flux } // ClientSocket represents a client-side socket. @@ -78,45 +78,45 @@ type AbstractRSocket struct { } // MetadataPush starts a request of MetadataPush. -func (p AbstractRSocket) MetadataPush(msg payload.Payload) { +func (p AbstractRSocket) MetadataPush(message payload.Payload) { if p.MP == nil { logger.Errorf("%s\n", errUnsupportedMetadataPush) return } - p.MP(msg) + p.MP(message) } // FireAndForget starts a request of FireAndForget. -func (p AbstractRSocket) FireAndForget(msg payload.Payload) { +func (p AbstractRSocket) FireAndForget(message payload.Payload) { if p.FF == nil { logger.Errorf("%s\n", errUnsupportedFireAndForget) return } - p.FF(msg) + p.FF(message) } // RequestResponse starts a request of RequestResponse. -func (p AbstractRSocket) RequestResponse(msg payload.Payload) mono.Mono { +func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono { if p.RR == nil { return mono.Error(errUnsupportedRequestResponse) } - return p.RR(msg) + return p.RR(message) } // RequestStream starts a request of RequestStream. -func (p AbstractRSocket) RequestStream(msg payload.Payload) flux.Flux { +func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux { if p.RS == nil { return flux.Error(errUnsupportedRequestStream) } - return p.RS(msg) + return p.RS(message) } // RequestChannel starts a request of RequestChannel. -func (p AbstractRSocket) RequestChannel(msgs rx.Publisher) flux.Flux { +func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux { if p.RC == nil { return flux.Error(errUnsupportedRequestChannel) } - return p.RC(msgs) + return p.RC(messages) } type baseSocket struct { @@ -135,36 +135,36 @@ func (p *baseSocket) refreshLease(ttl time.Duration, n int64) { } } -func (p *baseSocket) FireAndForget(msg payload.Payload) { +func (p *baseSocket) FireAndForget(message payload.Payload) { if err := p.reqLease.allow(); err != nil { logger.Warnf("request FireAndForget failed: %v\n", err) } - p.socket.FireAndForget(msg) + p.socket.FireAndForget(message) } -func (p *baseSocket) MetadataPush(msg payload.Payload) { - p.socket.MetadataPush(msg) +func (p *baseSocket) MetadataPush(message payload.Payload) { + p.socket.MetadataPush(message) } -func (p *baseSocket) RequestResponse(msg payload.Payload) mono.Mono { +func (p *baseSocket) RequestResponse(message payload.Payload) mono.Mono { if err := p.reqLease.allow(); err != nil { return mono.Error(err) } - return p.socket.RequestResponse(msg) + return p.socket.RequestResponse(message) } -func (p *baseSocket) RequestStream(msg payload.Payload) flux.Flux { +func (p *baseSocket) RequestStream(message payload.Payload) flux.Flux { if err := p.reqLease.allow(); err != nil { return flux.Error(err) } - return p.socket.RequestStream(msg) + return p.socket.RequestStream(message) } -func (p *baseSocket) RequestChannel(msgs rx.Publisher) flux.Flux { +func (p *baseSocket) RequestChannel(messages rx.Publisher) flux.Flux { if err := p.reqLease.allow(); err != nil { return flux.Error(err) } - return p.socket.RequestChannel(msgs) + return p.socket.RequestChannel(messages) } func (p *baseSocket) OnClose(fn func(error)) { diff --git a/internal/transport/connection_tcp.go b/internal/transport/connection_tcp.go index 9f076b6..e045278 100644 --- a/internal/transport/connection_tcp.go +++ b/internal/transport/connection_tcp.go @@ -44,7 +44,7 @@ func (p *tcpConn) Read() (f framing.Frame, err error) { return } base := framing.NewBaseFrame(h, bf) - if p.counter != nil && base.IsResumable() { + if p.counter != nil && base.CanResume() { p.counter.incrReadBytes(base.Len()) } f, err = framing.NewFromBase(base) @@ -73,7 +73,7 @@ func (p *tcpConn) Flush() (err error) { func (p *tcpConn) Write(frame framing.Frame) (err error) { size := frame.Len() - if p.counter != nil && frame.IsResumable() { + if p.counter != nil && frame.CanResume() { p.counter.incrWriteBytes(size) } _, err = common.NewUint24(size).WriteTo(p.writer) diff --git a/internal/transport/connection_ws.go b/internal/transport/connection_ws.go index 9a03aaf..1bbd2d5 100644 --- a/internal/transport/connection_ws.go +++ b/internal/transport/connection_ws.go @@ -31,7 +31,7 @@ func (p *wsConnection) Read() (f framing.Frame, err error) { return } if t != websocket.BinaryMessage { - logger.Warnf("omit non-binary messsage %d\n", t) + logger.Warnf("omit non-binary message %d\n", t) return p.Read() } // validate min length diff --git a/internal/transport/transport.go b/internal/transport/transport.go index c89f064..58690f2 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -56,9 +56,6 @@ type Transport struct { hKeepalive FrameHandler } -func (p *Transport) SetRcvLease(ttl time.Duration, n uint32) { -} - // HandleDisaster registers handler when receiving frame of DISASTER Error with zero StreamID. func (p *Transport) HandleDisaster(handler FrameHandler) { p.hError0 = handler @@ -237,7 +234,7 @@ func (p *Transport) HandleKeepalive(handler FrameHandler) { } // DeliveryFrame delivery incoming frames. -func (p *Transport) DeliveryFrame(ctx context.Context, frame framing.Frame) (err error) { +func (p *Transport) DeliveryFrame(_ context.Context, frame framing.Frame) (err error) { header := frame.Header() t := header.Type() sid := header.StreamID() diff --git a/internal/transport/uri.go b/internal/transport/uri.go index 88882a4..d7dd366 100644 --- a/internal/transport/uri.go +++ b/internal/transport/uri.go @@ -86,10 +86,10 @@ func (p *URI) pp() *url.URL { } // ParseURI parse URI string and returns a URI. -func ParseURI(rawurl string) (*URI, error) { - u, err := url.Parse(rawurl) +func ParseURI(rawUrl string) (*URI, error) { + u, err := url.Parse(rawUrl) if err != nil { - return nil, errors.Wrapf(err, "parse url failed: %s", rawurl) + return nil, errors.Wrapf(err, "parse url failed: %s", rawUrl) } return (*URI)(u), nil } diff --git a/lease/lease.go b/lease/lease.go index fbdc81f..ef5b06d 100644 --- a/lease/lease.go +++ b/lease/lease.go @@ -8,9 +8,9 @@ import ( ) var ( - ErrLeaseNotRcv = errors.New("lease was not received yet") - ErrLeaseExpired = errors.New("lease expired") - ErrLeaseNoMoreRequests = errors.New("no more lease") + ErrLeaseNotRcv = errors.New("rsocket: lease was not received yet") + ErrLeaseExpired = errors.New("rsocket: lease expired") + ErrLeaseNoMoreRequests = errors.New("rsocket: no more lease") ) type Leases interface { diff --git a/rsocket.go b/rsocket.go index 4dae749..5ecf0e4 100644 --- a/rsocket.go +++ b/rsocket.go @@ -1,6 +1,7 @@ package rsocket import ( + "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/internal/socket" "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" @@ -8,22 +9,59 @@ import ( "github.com/rsocket/rsocket-go/rx/mono" ) +const ( + // ErrorCodeInvalidSetup means the setup frame is invalid for the server. + ErrorCodeInvalidSetup = common.ErrorCodeInvalidSetup + // ErrorCodeUnsupportedSetup means some (or all) of the parameters specified by the client are unsupported by the server. + ErrorCodeUnsupportedSetup = common.ErrorCodeUnsupportedSetup + // ErrorCodeRejectedSetup means server rejected the setup, it can specify the reason in the payload. + ErrorCodeRejectedSetup = common.ErrorCodeRejectedSetup + // ErrorCodeRejectedResume means server rejected the resume, it can specify the reason in the payload. + ErrorCodeRejectedResume = common.ErrorCodeRejectedResume + // ErrorCodeConnectionError means the connection is being terminated. + ErrorCodeConnectionError = common.ErrorCodeConnectionError + // ErrorCodeConnectionClose means the connection is being terminated. + ErrorCodeConnectionClose = common.ErrorCodeConnectionClose + // ErrorCodeApplicationError means application layer logic generating a Reactive Streams onError event. + ErrorCodeApplicationError = common.ErrorCodeApplicationError + // ErrorCodeRejected means Responder reject it. + ErrorCodeRejected = common.ErrorCodeRejected + // ErrorCodeCanceled means the Responder canceled the request but may have started processing it (similar to REJECTED but doesn't guarantee lack of side-effects). + ErrorCodeCanceled = common.ErrorCodeCanceled + // ErrorCodeInvalid means the request is invalid. + ErrorCodeInvalid = common.ErrorCodeInvalid +) + +type ( + // ErrorCode is code for RSocket error. + ErrorCode = common.ErrorCode + + // Error provides a method of accessing code and data. + Error interface { + error + // ErrorCode returns error code. + ErrorCode() ErrorCode + // ErrorData returns error data bytes. + ErrorData() []byte + } +) + type ( - // ServerAcceptor is alias for server accepter. + // ServerAcceptor is alias for server acceptor. ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) (RSocket, error) // RSocket is a contract providing different interaction models for RSocket protocol. RSocket interface { // FireAndForget is a single one-way message. - FireAndForget(msg payload.Payload) + FireAndForget(message payload.Payload) // MetadataPush sends asynchronous Metadata frame. - MetadataPush(msg payload.Payload) + MetadataPush(message payload.Payload) // RequestResponse request single response. - RequestResponse(msg payload.Payload) mono.Mono + RequestResponse(message payload.Payload) mono.Mono // RequestStream request a completable stream. - RequestStream(msg payload.Payload) flux.Flux + RequestStream(message payload.Payload) flux.Flux // RequestChannel request a completable stream in both directions. - RequestChannel(msgs rx.Publisher) flux.Flux + RequestChannel(messages rx.Publisher) flux.Flux } // CloseableRSocket is a RSocket which support more events. diff --git a/rx/flux/flux.go b/rx/flux/flux.go index 6e5aca3..91607e6 100644 --- a/rx/flux/flux.go +++ b/rx/flux/flux.go @@ -44,7 +44,7 @@ type Flux interface { // DoOnSubscribe add behavior triggered when the Flux is done being subscribed. DoOnSubscribe(rx.FnOnSubscribe) Flux // Map transform the items emitted by this Flux by applying a synchronous function to each item. - Map(fn func(in payload.Payload) payload.Payload) Flux + Map(func(payload.Payload) payload.Payload) Flux // SwitchOnFirst transform the current Flux once it emits its first element, making a conditional transformation possible. SwitchOnFirst(FnSwitchOnFirst) Flux // SubscribeOn run subscribe, onSubscribe and request on a specified scheduler. @@ -52,10 +52,10 @@ type Flux interface { // Raw returns Native Flux in reactor-go. Raw() flux.Flux // BlockFirst subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. - // Returns that value, error if Flux completes errror, or nil if the Flux completes empty. + // Returns that value, error if Flux completes error, or nil if the Flux completes empty. BlockFirst(context.Context) (payload.Payload, error) // BlockLast subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. - // Returns that value, error if Flux completes errror, or nil if the Flux completes empty. + // Returns that value, error if Flux completes error, or nil if the Flux completes empty. BlockLast(context.Context) (payload.Payload, error) // ToChan subscribe to this Flux and puts items into a chan. // It also puts errors into another chan. diff --git a/rx/flux/proxy.go b/rx/flux/proxy.go index b17532a..e7b3553 100644 --- a/rx/flux/proxy.go +++ b/rx/flux/proxy.go @@ -21,11 +21,11 @@ func (p proxy) Raw() flux.Flux { } func (p proxy) mustProcessor() flux.Processor { - proc, ok := p.Flux.(flux.Processor) + processor, ok := p.Flux.(flux.Processor) if !ok { panic(errors.New("require flux.Processor")) } - return proc + return processor } func (p proxy) Next(v payload.Payload) { diff --git a/server.go b/server.go index 93accae..1a05b5a 100644 --- a/server.go +++ b/server.go @@ -348,11 +348,11 @@ L: func (p *server) destroySessions() { for p.sm.Len() > 0 { - session := p.sm.Pop() - if err := session.Close(); err != nil { + nextSession := p.sm.Pop() + if err := nextSession.Close(); err != nil { logger.Warnf("kill session failed: %s\n", err) } else if logger.IsDebugEnabled() { - logger.Debugf("kill session success: %s\n", session) + logger.Debugf("kill session success: %s\n", nextSession) } } }