diff --git a/README.md b/README.md
index 1f7d0c6..5822ad6 100644
--- a/README.md
+++ b/README.md
@@ -269,7 +269,7 @@ func init() {
logger.SetLevel(logger.LevelInfo)
}
-```
+```
#### Dependencies
- [reactor-go](https://github.com/jjeffcaii/reactor-go)
@@ -297,5 +297,5 @@ func init() {
- [x] Cancel
- [x] Error
- [x] Flow Control: RequestN
- - [ ] Flow Control: Lease
+ - [x] Flow Control: Lease
- [x] Load Balance
diff --git a/client.go b/client.go
index a45b8a6..d5217e6 100644
--- a/client.go
+++ b/client.go
@@ -38,9 +38,9 @@ type (
Start(ctx context.Context) (Client, error)
// Start start a client socket with TLS.
// Here's an example:
- // tc:=&tls.Config{
+ // tc := &tls.Config {
// InsecureSkipVerify: true,
- //}
+ // }
StartTLS(ctx context.Context, tc *tls.Config) (Client, error)
}
@@ -87,7 +87,7 @@ func Connect() ClientBuilder {
fragment: fragmentation.MaxFragment,
setup: &socket.SetupInfo{
Version: common.DefaultVersion,
- KeepaliveInterval: common.DefaultKeepaliveInteval,
+ KeepaliveInterval: common.DefaultKeepaliveInterval,
KeepaliveLifetime: common.DefaultKeepaliveMaxLifetime,
DataMimeType: defaultMimeType,
MetadataMimeType: defaultMimeType,
diff --git a/cmd/rsocket-cli/rsocket-cli.go b/cmd/rsocket-cli/rsocket-cli.go
index f6c3fec..e5fade4 100644
--- a/cmd/rsocket-cli/rsocket-cli.go
+++ b/cmd/rsocket-cli/rsocket-cli.go
@@ -6,7 +6,6 @@ import (
"os"
"time"
- "github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/logger"
"github.com/rsocket/rsocket-go/rx"
"github.com/urfave/cli/v2"
@@ -108,7 +107,7 @@ func newFlags(args *Runner) []cli.Flag {
&cli.StringFlag{
Name: "metadataFormat",
Usage: "Metadata Format",
- Value: extension.ApplicationJSON.String(),
+ Value: "application/json",
Destination: &args.MetadataFormat,
},
&cli.StringFlag{
diff --git a/cmd/rsocket-cli/runner.go b/cmd/rsocket-cli/runner.go
index 82aa7d1..eb73e16 100644
--- a/cmd/rsocket-cli/runner.go
+++ b/cmd/rsocket-cli/runner.go
@@ -111,7 +111,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
return
}
setupPayload := payload.New(setupData, nil)
- sendings := p.createPayload()
+ sendingPayloads := p.createPayload()
c, err := cb.
DataMimeType(p.DataFormat).
MetadataMimeType(p.MetadataFormat).
@@ -131,7 +131,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
}
var first payload.Payload
if !p.Channel {
- first, err = sendings.BlockFirst(ctx)
+ first, err = sendingPayloads.BlockFirst(ctx)
if err != nil {
return
}
@@ -144,7 +144,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
} else if p.Stream {
err = p.execRequestStream(ctx, c, first)
} else if p.Channel {
- err = p.execRequestChannel(ctx, c, sendings)
+ err = p.execRequestChannel(ctx, c, sendingPayloads)
} else if p.MetadataPush {
err = p.execMetadataPush(ctx, c, first)
} else {
@@ -166,24 +166,24 @@ func (p *Runner) runServerMode(ctx context.Context) error {
}
ch := make(chan error)
go func() {
- sendings := p.createPayload()
+ sendingPayloads := p.createPayload()
ch <- sb.
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
var options []rsocket.OptAbstractSocket
- options = append(options, rsocket.RequestStream(func(msg payload.Payload) flux.Flux {
- p.showPayload(msg)
- return sendings
+ options = append(options, rsocket.RequestStream(func(message payload.Payload) flux.Flux {
+ p.showPayload(message)
+ return sendingPayloads
}))
- options = append(options, rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
- msgs.Subscribe(ctx, rx.OnNext(func(input payload.Payload) {
+ options = append(options, rsocket.RequestChannel(func(messages rx.Publisher) flux.Flux {
+ messages.Subscribe(ctx, rx.OnNext(func(input payload.Payload) {
p.showPayload(input)
}))
- return sendings
+ return sendingPayloads
}))
options = append(options, rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
p.showPayload(msg)
return mono.Create(func(i context.Context, sink mono.Sink) {
- first, err := sendings.BlockFirst(i)
+ first, err := sendingPayloads.BlockFirst(i)
if err != nil {
sink.Error(err)
return
@@ -207,14 +207,14 @@ func (p *Runner) runServerMode(ctx context.Context) error {
return <-ch
}
-func (p *Runner) execMetadataPush(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) {
+func (p *Runner) execMetadataPush(_ context.Context, c rsocket.Client, send payload.Payload) (err error) {
c.MetadataPush(send)
m, _ := send.MetadataUTF8()
logger.Infof("%s\n", m)
return
}
-func (p *Runner) execFireAndForget(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) {
+func (p *Runner) execFireAndForget(_ context.Context, c rsocket.Client, send payload.Payload) (err error) {
c.FireAndForget(send)
return
}
diff --git a/docs/.nojekyll b/docs/.nojekyll
deleted file mode 100644
index e69de29..0000000
diff --git a/docs/README.md b/docs/README.md
deleted file mode 100644
index 611e341..0000000
--- a/docs/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# rsocket-go
-
-> An awesome project.
diff --git a/docs/_coverpage.md b/docs/_coverpage.md
deleted file mode 100644
index e5926d4..0000000
--- a/docs/_coverpage.md
+++ /dev/null
@@ -1,15 +0,0 @@
-
-
-
-
-
-# rsocket-go alpha
-
-> A RSocket protocol implementation in Golang.
-
-- Design For Golang.
-- Thin reactive-streams implementation.
-- Simulate Java SDK API.
-
-[GitHub](https://github.com/rsocket/rsocket-go/)
-[Get Started](#rsocket-go)
diff --git a/docs/_media/logo.svg b/docs/_media/logo.svg
deleted file mode 100644
index a1ee8c3..0000000
--- a/docs/_media/logo.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/docs/_sidebar.md b/docs/_sidebar.md
deleted file mode 100644
index 0969335..0000000
--- a/docs/_sidebar.md
+++ /dev/null
@@ -1,4 +0,0 @@
-
-
-* [Getting started](/)
-* [Guide](guide.md)
diff --git a/docs/index.html b/docs/index.html
deleted file mode 100644
index b36a94e..0000000
--- a/docs/index.html
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-
-
- Document
-
-
-
-
-
-
-
-
-
-
-
diff --git a/examples/echo/echo.go b/examples/echo/echo.go
index 7efe695..1d8ed6f 100644
--- a/examples/echo/echo.go
+++ b/examples/echo/echo.go
@@ -110,7 +110,7 @@ func responder() rsocket.RSocket {
// cdl.await();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
- // throw new Errorf(e);
+ // throw new Error(e);
// }
//}
s := pl.DataUTF8()
diff --git a/extension/mime.go b/extension/mime.go
index bab36d3..9258335 100644
--- a/extension/mime.go
+++ b/extension/mime.go
@@ -1,26 +1,13 @@
package extension
-// MIME is MIME types in number.
-// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md
-type MIME int8
-
-func (p MIME) String() string {
- return mimeTypes[p]
-}
-
var (
- mimeTypes map[MIME]string
- mimeTypesR map[string]MIME
+ _mimeTypes map[MIME]string
+ _mimeTypesReverse map[string]MIME
)
-// ParseMIME parse a string to MIME.
-func ParseMIME(str string) (mime MIME, ok bool) {
- mime, ok = mimeTypesR[str]
- if !ok {
- mime = -1
- }
- return
-}
+// MIME is MIME types in number.
+// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md
+type MIME int8
// All MIMEs
const (
@@ -74,7 +61,7 @@ const (
)
func init() {
- mimeTypes = map[MIME]string{
+ _mimeTypes = map[MIME]string{
ApplicationAvro: "application/avro",
ApplicationCBOR: "application/cbor",
ApplicationGraphql: "application/graphql",
@@ -123,8 +110,21 @@ func init() {
MessageRouting: "message/x.rsocket.routing.v0",
MessageCompositeMetadata: "message/x.rsocket.composite-metadata.v0",
}
- mimeTypesR = make(map[string]MIME, len(mimeTypes))
- for k, v := range mimeTypes {
- mimeTypesR[v] = k
+ _mimeTypesReverse = make(map[string]MIME, len(_mimeTypes))
+ for k, v := range _mimeTypes {
+ _mimeTypesReverse[v] = k
+ }
+}
+
+func (p MIME) String() string {
+ return _mimeTypes[p]
+}
+
+// ParseMIME parse a string to MIME.
+func ParseMIME(str string) (mime MIME, ok bool) {
+ mime, ok = _mimeTypesReverse[str]
+ if !ok {
+ mime = -1
}
+ return
}
diff --git a/internal/common/errors.go b/internal/common/errors.go
index de9e90a..f5afbdb 100644
--- a/internal/common/errors.go
+++ b/internal/common/errors.go
@@ -5,24 +5,31 @@ import "errors"
// ErrorCode is code for RSocket error.
type ErrorCode uint32
-var errorCodeMap = map[ErrorCode]string{
- ErrorCodeInvalidSetup: "INVALID_SETUP",
- ErrorCodeUnsupportedSetup: "UNSUPPORTED_SETUP",
- ErrorCodeRejectedSetup: "REJECTED_SETUP",
- ErrorCodeRejectedResume: "REJECTED_RESUME",
- ErrorCodeConnectionError: "CONNECTION_ERROR",
- ErrorCodeConnectionClose: "CONNECTION_CLOSE",
- ErrorCodeApplicationError: "APPLICATION_ERROR",
- ErrorCodeRejected: "REJECTED",
- ErrorCodeCanceled: "CANCELED",
- ErrorCodeInvalid: "INVALID",
-}
-
func (p ErrorCode) String() string {
- if s, ok := errorCodeMap[p]; ok {
- return s
+ switch p {
+ case ErrorCodeInvalidSetup:
+ return "INVALID_SETUP"
+ case ErrorCodeUnsupportedSetup:
+ return "UNSUPPORTED_SETUP"
+ case ErrorCodeRejectedSetup:
+ return "REJECTED_SETUP"
+ case ErrorCodeRejectedResume:
+ return "REJECTED_RESUME"
+ case ErrorCodeConnectionError:
+ return "CONNECTION_ERROR"
+ case ErrorCodeConnectionClose:
+ return "CONNECTION_CLOSE"
+ case ErrorCodeApplicationError:
+ return "APPLICATION_ERROR"
+ case ErrorCodeRejected:
+ return "REJECTED"
+ case ErrorCodeCanceled:
+ return "CANCELED"
+ case ErrorCodeInvalid:
+ return "INVALID"
+ default:
+ return "UNKNOWN"
}
- return "UNKNOWN"
}
const (
diff --git a/internal/common/misc.go b/internal/common/misc.go
index 910fc1b..3492838 100644
--- a/internal/common/misc.go
+++ b/internal/common/misc.go
@@ -5,8 +5,8 @@ import (
)
const (
- // DefaultKeepaliveInteval is default keepalive interval duration.
- DefaultKeepaliveInteval = 20 * time.Second
+ // DefaultKeepaliveInterval is default keepalive interval duration.
+ DefaultKeepaliveInterval = 20 * time.Second
// DefaultKeepaliveMaxLifetime is default keepalive max lifetime.
DefaultKeepaliveMaxLifetime = 90 * time.Second
)
diff --git a/internal/common/version.go b/internal/common/version.go
index 2d30ef2..9f83d81 100644
--- a/internal/common/version.go
+++ b/internal/common/version.go
@@ -10,7 +10,7 @@ import (
var DefaultVersion Version = [2]uint16{1, 0}
// Version define the version of protocol.
-// It inclues major and minor version.
+// It includes major and minor version.
type Version [2]uint16
// Bytes returns raw bytes of current version.
diff --git a/internal/framing/frame.go b/internal/framing/frame.go
index 797cdfc..3cb297e 100644
--- a/internal/framing/frame.go
+++ b/internal/framing/frame.go
@@ -140,8 +140,8 @@ type Frame interface {
SetBody(body *common.ByteBuff)
// Bytes encodes and returns frame in bytes.
Bytes() []byte
- // IsResumable returns true if frame supports resume.
- IsResumable() bool
+ // CanResume returns true if frame supports resume.
+ CanResume() bool
// Done marks current frame has been sent.
Done() (closed bool)
// DoneNotify notifies when frame done.
@@ -171,8 +171,8 @@ func (p *BaseFrame) DoneNotify() <-chan struct{} {
return p.done
}
-// IsResumable returns true if frame supports resume.
-func (p *BaseFrame) IsResumable() bool {
+// CanResume returns true if frame supports resume.
+func (p *BaseFrame) CanResume() bool {
switch p.header.Type() {
case FrameTypeRequestChannel, FrameTypeRequestStream, FrameTypeRequestResponse, FrameTypeRequestFNF, FrameTypeRequestN, FrameTypeCancel, FrameTypeError, FrameTypePayload:
return true
diff --git a/internal/framing/header.go b/internal/framing/header.go
index 4835e7b..925f0e1 100644
--- a/internal/framing/header.go
+++ b/internal/framing/header.go
@@ -11,7 +11,7 @@ const (
HeaderLen = 6
)
-// FrameHeader is the header fo a RSocke frame.
+// FrameHeader is the header fo a RSocket frame.
// RSocket frames begin with a RSocket Frame Header.
// It includes StreamID, FrameType and Flags.
type FrameHeader [HeaderLen]byte
diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go
index 45c6eae..6ca6682 100644
--- a/internal/socket/duplex.go
+++ b/internal/socket/duplex.go
@@ -856,71 +856,74 @@ func (p *DuplexRSocket) sendPayload(
})
}
-func (p *DuplexRSocket) drainWithKeepalive(leaseChan <-chan lease.Lease) (ok bool) {
+func (p *DuplexRSocket) drainWithKeepaliveAndLease(leaseChan <-chan lease.Lease) (ok bool) {
if len(p.outs) > 0 {
p.drain(nil)
}
var out framing.Frame
-
- if leaseChan == nil {
- select {
- case <-p.keepaliver.C():
- ok = true
- out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true)
- if p.tp != nil {
- err := p.tp.Send(out, true)
- if err != nil {
- logger.Errorf("send keepalive frame failed: %s\n", err.Error())
- }
- }
- case out, ok = <-p.outs:
- if !ok {
- return
- }
- if p.tp == nil {
- p.outsPriority = append(p.outsPriority, out)
- } else if err := p.tp.Send(out, true); err != nil {
- logger.Errorf("send frame failed: %s\n", err.Error())
- p.outsPriority = append(p.outsPriority, out)
+ select {
+ case <-p.keepaliver.C():
+ ok = true
+ out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true)
+ if p.tp != nil {
+ err := p.tp.Send(out, true)
+ if err != nil {
+ logger.Errorf("send keepalive frame failed: %s\n", err.Error())
}
}
- } else {
- select {
- case <-p.keepaliver.C():
- ok = true
- out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true)
- if p.tp != nil {
- err := p.tp.Send(out, true)
- if err != nil {
- logger.Errorf("send keepalive frame failed: %s\n", err.Error())
- }
- }
- case ls, success := <-leaseChan:
- ok = success
- if !ok {
- return
- }
- out = framing.NewFrameLease(ls.TimeToLive, ls.NumberOfRequests, ls.Metadata)
- if p.tp == nil {
- p.outsPriority = append(p.outsPriority, out)
- } else if err := p.tp.Send(out, true); err != nil {
- logger.Errorf("send frame failed: %s\n", err.Error())
- p.outsPriority = append(p.outsPriority, out)
- }
- case out, ok = <-p.outs:
- if !ok {
- return
- }
- if p.tp == nil {
- p.outsPriority = append(p.outsPriority, out)
- } else if err := p.tp.Send(out, true); err != nil {
- logger.Errorf("send frame failed: %s\n", err.Error())
- p.outsPriority = append(p.outsPriority, out)
- }
+ case ls, success := <-leaseChan:
+ ok = success
+ if !ok {
+ return
+ }
+ out = framing.NewFrameLease(ls.TimeToLive, ls.NumberOfRequests, ls.Metadata)
+ if p.tp == nil {
+ p.outsPriority = append(p.outsPriority, out)
+ } else if err := p.tp.Send(out, true); err != nil {
+ logger.Errorf("send frame failed: %s\n", err.Error())
+ p.outsPriority = append(p.outsPriority, out)
+ }
+ case out, ok = <-p.outs:
+ if !ok {
+ return
}
+ if p.tp == nil {
+ p.outsPriority = append(p.outsPriority, out)
+ } else if err := p.tp.Send(out, true); err != nil {
+ logger.Errorf("send frame failed: %s\n", err.Error())
+ p.outsPriority = append(p.outsPriority, out)
+ }
+ }
+ return
+}
+func (p *DuplexRSocket) drainWithKeepalive() (ok bool) {
+ if len(p.outs) > 0 {
+ p.drain(nil)
}
+ var out framing.Frame
+ select {
+ case <-p.keepaliver.C():
+ ok = true
+ out = framing.NewFrameKeepalive(p.counter.ReadBytes(), nil, true)
+ if p.tp != nil {
+ err := p.tp.Send(out, true)
+ if err != nil {
+ logger.Errorf("send keepalive frame failed: %s\n", err.Error())
+ }
+ }
+ case out, ok = <-p.outs:
+ if !ok {
+ return
+ }
+ if p.tp == nil {
+ p.outsPriority = append(p.outsPriority, out)
+ } else if err := p.tp.Send(out, true); err != nil {
+ logger.Errorf("send frame failed: %s\n", err.Error())
+ p.outsPriority = append(p.outsPriority, out)
+ }
+ }
return
}
@@ -1021,8 +1024,12 @@ func (p *DuplexRSocket) loopWriteWithKeepaliver(ctx context.Context, leaseChan <
}
default:
}
+
p.drainOutBack()
- if !p.drainWithKeepalive(leaseChan) {
+ if leaseChan == nil && !p.drainWithKeepalive() {
+ break
+ }
+ if leaseChan != nil && !p.drainWithKeepaliveAndLease(leaseChan) {
break
}
}
diff --git a/internal/socket/socket.go b/internal/socket/socket.go
index ef1c31b..829c515 100644
--- a/internal/socket/socket.go
+++ b/internal/socket/socket.go
@@ -33,15 +33,15 @@ type Closeable interface {
// Responder is a contract providing different interaction models for RSocket protocol.
type Responder interface {
// FireAndForget is a single one-way message.
- FireAndForget(msg payload.Payload)
+ FireAndForget(message payload.Payload)
// MetadataPush sends asynchronous Metadata frame.
- MetadataPush(msg payload.Payload)
+ MetadataPush(message payload.Payload)
// RequestResponse request single response.
- RequestResponse(msg payload.Payload) mono.Mono
+ RequestResponse(message payload.Payload) mono.Mono
// RequestStream request a completable stream.
- RequestStream(msg payload.Payload) flux.Flux
+ RequestStream(message payload.Payload) flux.Flux
// RequestChannel request a completable stream in both directions.
- RequestChannel(msgs rx.Publisher) flux.Flux
+ RequestChannel(messages rx.Publisher) flux.Flux
}
// ClientSocket represents a client-side socket.
@@ -78,45 +78,45 @@ type AbstractRSocket struct {
}
// MetadataPush starts a request of MetadataPush.
-func (p AbstractRSocket) MetadataPush(msg payload.Payload) {
+func (p AbstractRSocket) MetadataPush(message payload.Payload) {
if p.MP == nil {
logger.Errorf("%s\n", errUnsupportedMetadataPush)
return
}
- p.MP(msg)
+ p.MP(message)
}
// FireAndForget starts a request of FireAndForget.
-func (p AbstractRSocket) FireAndForget(msg payload.Payload) {
+func (p AbstractRSocket) FireAndForget(message payload.Payload) {
if p.FF == nil {
logger.Errorf("%s\n", errUnsupportedFireAndForget)
return
}
- p.FF(msg)
+ p.FF(message)
}
// RequestResponse starts a request of RequestResponse.
-func (p AbstractRSocket) RequestResponse(msg payload.Payload) mono.Mono {
+func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono {
if p.RR == nil {
return mono.Error(errUnsupportedRequestResponse)
}
- return p.RR(msg)
+ return p.RR(message)
}
// RequestStream starts a request of RequestStream.
-func (p AbstractRSocket) RequestStream(msg payload.Payload) flux.Flux {
+func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux {
if p.RS == nil {
return flux.Error(errUnsupportedRequestStream)
}
- return p.RS(msg)
+ return p.RS(message)
}
// RequestChannel starts a request of RequestChannel.
-func (p AbstractRSocket) RequestChannel(msgs rx.Publisher) flux.Flux {
+func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux {
if p.RC == nil {
return flux.Error(errUnsupportedRequestChannel)
}
- return p.RC(msgs)
+ return p.RC(messages)
}
type baseSocket struct {
@@ -135,36 +135,36 @@ func (p *baseSocket) refreshLease(ttl time.Duration, n int64) {
}
}
-func (p *baseSocket) FireAndForget(msg payload.Payload) {
+func (p *baseSocket) FireAndForget(message payload.Payload) {
if err := p.reqLease.allow(); err != nil {
logger.Warnf("request FireAndForget failed: %v\n", err)
}
- p.socket.FireAndForget(msg)
+ p.socket.FireAndForget(message)
}
-func (p *baseSocket) MetadataPush(msg payload.Payload) {
- p.socket.MetadataPush(msg)
+func (p *baseSocket) MetadataPush(message payload.Payload) {
+ p.socket.MetadataPush(message)
}
-func (p *baseSocket) RequestResponse(msg payload.Payload) mono.Mono {
+func (p *baseSocket) RequestResponse(message payload.Payload) mono.Mono {
if err := p.reqLease.allow(); err != nil {
return mono.Error(err)
}
- return p.socket.RequestResponse(msg)
+ return p.socket.RequestResponse(message)
}
-func (p *baseSocket) RequestStream(msg payload.Payload) flux.Flux {
+func (p *baseSocket) RequestStream(message payload.Payload) flux.Flux {
if err := p.reqLease.allow(); err != nil {
return flux.Error(err)
}
- return p.socket.RequestStream(msg)
+ return p.socket.RequestStream(message)
}
-func (p *baseSocket) RequestChannel(msgs rx.Publisher) flux.Flux {
+func (p *baseSocket) RequestChannel(messages rx.Publisher) flux.Flux {
if err := p.reqLease.allow(); err != nil {
return flux.Error(err)
}
- return p.socket.RequestChannel(msgs)
+ return p.socket.RequestChannel(messages)
}
func (p *baseSocket) OnClose(fn func(error)) {
diff --git a/internal/transport/connection_tcp.go b/internal/transport/connection_tcp.go
index 9f076b6..e045278 100644
--- a/internal/transport/connection_tcp.go
+++ b/internal/transport/connection_tcp.go
@@ -44,7 +44,7 @@ func (p *tcpConn) Read() (f framing.Frame, err error) {
return
}
base := framing.NewBaseFrame(h, bf)
- if p.counter != nil && base.IsResumable() {
+ if p.counter != nil && base.CanResume() {
p.counter.incrReadBytes(base.Len())
}
f, err = framing.NewFromBase(base)
@@ -73,7 +73,7 @@ func (p *tcpConn) Flush() (err error) {
func (p *tcpConn) Write(frame framing.Frame) (err error) {
size := frame.Len()
- if p.counter != nil && frame.IsResumable() {
+ if p.counter != nil && frame.CanResume() {
p.counter.incrWriteBytes(size)
}
_, err = common.NewUint24(size).WriteTo(p.writer)
diff --git a/internal/transport/connection_ws.go b/internal/transport/connection_ws.go
index 9a03aaf..1bbd2d5 100644
--- a/internal/transport/connection_ws.go
+++ b/internal/transport/connection_ws.go
@@ -31,7 +31,7 @@ func (p *wsConnection) Read() (f framing.Frame, err error) {
return
}
if t != websocket.BinaryMessage {
- logger.Warnf("omit non-binary messsage %d\n", t)
+ logger.Warnf("omit non-binary message %d\n", t)
return p.Read()
}
// validate min length
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index c89f064..58690f2 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -56,9 +56,6 @@ type Transport struct {
hKeepalive FrameHandler
}
-func (p *Transport) SetRcvLease(ttl time.Duration, n uint32) {
-}
-
// HandleDisaster registers handler when receiving frame of DISASTER Error with zero StreamID.
func (p *Transport) HandleDisaster(handler FrameHandler) {
p.hError0 = handler
@@ -237,7 +234,7 @@ func (p *Transport) HandleKeepalive(handler FrameHandler) {
}
// DeliveryFrame delivery incoming frames.
-func (p *Transport) DeliveryFrame(ctx context.Context, frame framing.Frame) (err error) {
+func (p *Transport) DeliveryFrame(_ context.Context, frame framing.Frame) (err error) {
header := frame.Header()
t := header.Type()
sid := header.StreamID()
diff --git a/internal/transport/uri.go b/internal/transport/uri.go
index 88882a4..d7dd366 100644
--- a/internal/transport/uri.go
+++ b/internal/transport/uri.go
@@ -86,10 +86,10 @@ func (p *URI) pp() *url.URL {
}
// ParseURI parse URI string and returns a URI.
-func ParseURI(rawurl string) (*URI, error) {
- u, err := url.Parse(rawurl)
+func ParseURI(rawUrl string) (*URI, error) {
+ u, err := url.Parse(rawUrl)
if err != nil {
- return nil, errors.Wrapf(err, "parse url failed: %s", rawurl)
+ return nil, errors.Wrapf(err, "parse url failed: %s", rawUrl)
}
return (*URI)(u), nil
}
diff --git a/lease/lease.go b/lease/lease.go
index fbdc81f..ef5b06d 100644
--- a/lease/lease.go
+++ b/lease/lease.go
@@ -8,9 +8,9 @@ import (
)
var (
- ErrLeaseNotRcv = errors.New("lease was not received yet")
- ErrLeaseExpired = errors.New("lease expired")
- ErrLeaseNoMoreRequests = errors.New("no more lease")
+ ErrLeaseNotRcv = errors.New("rsocket: lease was not received yet")
+ ErrLeaseExpired = errors.New("rsocket: lease expired")
+ ErrLeaseNoMoreRequests = errors.New("rsocket: no more lease")
)
type Leases interface {
diff --git a/rsocket.go b/rsocket.go
index 4dae749..5ecf0e4 100644
--- a/rsocket.go
+++ b/rsocket.go
@@ -1,6 +1,7 @@
package rsocket
import (
+ "github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/internal/socket"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
@@ -8,22 +9,59 @@ import (
"github.com/rsocket/rsocket-go/rx/mono"
)
+const (
+ // ErrorCodeInvalidSetup means the setup frame is invalid for the server.
+ ErrorCodeInvalidSetup = common.ErrorCodeInvalidSetup
+ // ErrorCodeUnsupportedSetup means some (or all) of the parameters specified by the client are unsupported by the server.
+ ErrorCodeUnsupportedSetup = common.ErrorCodeUnsupportedSetup
+ // ErrorCodeRejectedSetup means server rejected the setup, it can specify the reason in the payload.
+ ErrorCodeRejectedSetup = common.ErrorCodeRejectedSetup
+ // ErrorCodeRejectedResume means server rejected the resume, it can specify the reason in the payload.
+ ErrorCodeRejectedResume = common.ErrorCodeRejectedResume
+ // ErrorCodeConnectionError means the connection is being terminated.
+ ErrorCodeConnectionError = common.ErrorCodeConnectionError
+ // ErrorCodeConnectionClose means the connection is being terminated.
+ ErrorCodeConnectionClose = common.ErrorCodeConnectionClose
+ // ErrorCodeApplicationError means application layer logic generating a Reactive Streams onError event.
+ ErrorCodeApplicationError = common.ErrorCodeApplicationError
+ // ErrorCodeRejected means Responder reject it.
+ ErrorCodeRejected = common.ErrorCodeRejected
+ // ErrorCodeCanceled means the Responder canceled the request but may have started processing it (similar to REJECTED but doesn't guarantee lack of side-effects).
+ ErrorCodeCanceled = common.ErrorCodeCanceled
+ // ErrorCodeInvalid means the request is invalid.
+ ErrorCodeInvalid = common.ErrorCodeInvalid
+)
+
+type (
+ // ErrorCode is code for RSocket error.
+ ErrorCode = common.ErrorCode
+
+ // Error provides a method of accessing code and data.
+ Error interface {
+ error
+ // ErrorCode returns error code.
+ ErrorCode() ErrorCode
+ // ErrorData returns error data bytes.
+ ErrorData() []byte
+ }
+)
+
type (
- // ServerAcceptor is alias for server accepter.
+ // ServerAcceptor is alias for server acceptor.
ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) (RSocket, error)
// RSocket is a contract providing different interaction models for RSocket protocol.
RSocket interface {
// FireAndForget is a single one-way message.
- FireAndForget(msg payload.Payload)
+ FireAndForget(message payload.Payload)
// MetadataPush sends asynchronous Metadata frame.
- MetadataPush(msg payload.Payload)
+ MetadataPush(message payload.Payload)
// RequestResponse request single response.
- RequestResponse(msg payload.Payload) mono.Mono
+ RequestResponse(message payload.Payload) mono.Mono
// RequestStream request a completable stream.
- RequestStream(msg payload.Payload) flux.Flux
+ RequestStream(message payload.Payload) flux.Flux
// RequestChannel request a completable stream in both directions.
- RequestChannel(msgs rx.Publisher) flux.Flux
+ RequestChannel(messages rx.Publisher) flux.Flux
}
// CloseableRSocket is a RSocket which support more events.
diff --git a/rx/flux/flux.go b/rx/flux/flux.go
index 6e5aca3..91607e6 100644
--- a/rx/flux/flux.go
+++ b/rx/flux/flux.go
@@ -44,7 +44,7 @@ type Flux interface {
// DoOnSubscribe add behavior triggered when the Flux is done being subscribed.
DoOnSubscribe(rx.FnOnSubscribe) Flux
// Map transform the items emitted by this Flux by applying a synchronous function to each item.
- Map(fn func(in payload.Payload) payload.Payload) Flux
+ Map(func(payload.Payload) payload.Payload) Flux
// SwitchOnFirst transform the current Flux once it emits its first element, making a conditional transformation possible.
SwitchOnFirst(FnSwitchOnFirst) Flux
// SubscribeOn run subscribe, onSubscribe and request on a specified scheduler.
@@ -52,10 +52,10 @@ type Flux interface {
// Raw returns Native Flux in reactor-go.
Raw() flux.Flux
// BlockFirst subscribe to this Flux and block indefinitely until the upstream signals its first value or completes.
- // Returns that value, error if Flux completes errror, or nil if the Flux completes empty.
+ // Returns that value, error if Flux completes error, or nil if the Flux completes empty.
BlockFirst(context.Context) (payload.Payload, error)
// BlockLast subscribe to this Flux and block indefinitely until the upstream signals its last value or completes.
- // Returns that value, error if Flux completes errror, or nil if the Flux completes empty.
+ // Returns that value, error if Flux completes error, or nil if the Flux completes empty.
BlockLast(context.Context) (payload.Payload, error)
// ToChan subscribe to this Flux and puts items into a chan.
// It also puts errors into another chan.
diff --git a/rx/flux/proxy.go b/rx/flux/proxy.go
index b17532a..e7b3553 100644
--- a/rx/flux/proxy.go
+++ b/rx/flux/proxy.go
@@ -21,11 +21,11 @@ func (p proxy) Raw() flux.Flux {
}
func (p proxy) mustProcessor() flux.Processor {
- proc, ok := p.Flux.(flux.Processor)
+ processor, ok := p.Flux.(flux.Processor)
if !ok {
panic(errors.New("require flux.Processor"))
}
- return proc
+ return processor
}
func (p proxy) Next(v payload.Payload) {
diff --git a/server.go b/server.go
index 93accae..1a05b5a 100644
--- a/server.go
+++ b/server.go
@@ -348,11 +348,11 @@ L:
func (p *server) destroySessions() {
for p.sm.Len() > 0 {
- session := p.sm.Pop()
- if err := session.Close(); err != nil {
+ nextSession := p.sm.Pop()
+ if err := nextSession.Close(); err != nil {
logger.Warnf("kill session failed: %s\n", err)
} else if logger.IsDebugEnabled() {
- logger.Debugf("kill session success: %s\n", session)
+ logger.Debugf("kill session success: %s\n", nextSession)
}
}
}