Skip to content

Commit

Permalink
Merge pull request #53 from rsocket/fix-issue-51
Browse files Browse the repository at this point in the history
fix #51: support deconstructed error.
  • Loading branch information
jjeffcaii authored May 23, 2020
2 parents 863d63f + dfcdbb8 commit 75d8d33
Show file tree
Hide file tree
Showing 28 changed files with 223 additions and 220 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func init() {
logger.SetLevel(logger.LevelInfo)
}

```
```

#### Dependencies
- [reactor-go](https://github.com/jjeffcaii/reactor-go)
Expand Down Expand Up @@ -297,5 +297,5 @@ func init() {
- [x] Cancel
- [x] Error
- [x] Flow Control: RequestN
- [ ] Flow Control: Lease
- [x] Flow Control: Lease
- [x] Load Balance
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ type (
Start(ctx context.Context) (Client, error)
// Start start a client socket with TLS.
// Here's an example:
// tc:=&tls.Config{
// tc := &tls.Config {
// InsecureSkipVerify: true,
//}
// }
StartTLS(ctx context.Context, tc *tls.Config) (Client, error)
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func Connect() ClientBuilder {
fragment: fragmentation.MaxFragment,
setup: &socket.SetupInfo{
Version: common.DefaultVersion,
KeepaliveInterval: common.DefaultKeepaliveInteval,
KeepaliveInterval: common.DefaultKeepaliveInterval,
KeepaliveLifetime: common.DefaultKeepaliveMaxLifetime,
DataMimeType: defaultMimeType,
MetadataMimeType: defaultMimeType,
Expand Down
3 changes: 1 addition & 2 deletions cmd/rsocket-cli/rsocket-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"time"

"github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/logger"
"github.com/rsocket/rsocket-go/rx"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -108,7 +107,7 @@ func newFlags(args *Runner) []cli.Flag {
&cli.StringFlag{
Name: "metadataFormat",
Usage: "Metadata Format",
Value: extension.ApplicationJSON.String(),
Value: "application/json",
Destination: &args.MetadataFormat,
},
&cli.StringFlag{
Expand Down
26 changes: 13 additions & 13 deletions cmd/rsocket-cli/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
return
}
setupPayload := payload.New(setupData, nil)
sendings := p.createPayload()
sendingPayloads := p.createPayload()
c, err := cb.
DataMimeType(p.DataFormat).
MetadataMimeType(p.MetadataFormat).
Expand All @@ -131,7 +131,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
}
var first payload.Payload
if !p.Channel {
first, err = sendings.BlockFirst(ctx)
first, err = sendingPayloads.BlockFirst(ctx)
if err != nil {
return
}
Expand All @@ -144,7 +144,7 @@ func (p *Runner) runClientMode(ctx context.Context) (err error) {
} else if p.Stream {
err = p.execRequestStream(ctx, c, first)
} else if p.Channel {
err = p.execRequestChannel(ctx, c, sendings)
err = p.execRequestChannel(ctx, c, sendingPayloads)
} else if p.MetadataPush {
err = p.execMetadataPush(ctx, c, first)
} else {
Expand All @@ -166,24 +166,24 @@ func (p *Runner) runServerMode(ctx context.Context) error {
}
ch := make(chan error)
go func() {
sendings := p.createPayload()
sendingPayloads := p.createPayload()
ch <- sb.
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
var options []rsocket.OptAbstractSocket
options = append(options, rsocket.RequestStream(func(msg payload.Payload) flux.Flux {
p.showPayload(msg)
return sendings
options = append(options, rsocket.RequestStream(func(message payload.Payload) flux.Flux {
p.showPayload(message)
return sendingPayloads
}))
options = append(options, rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
msgs.Subscribe(ctx, rx.OnNext(func(input payload.Payload) {
options = append(options, rsocket.RequestChannel(func(messages rx.Publisher) flux.Flux {
messages.Subscribe(ctx, rx.OnNext(func(input payload.Payload) {
p.showPayload(input)
}))
return sendings
return sendingPayloads
}))
options = append(options, rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
p.showPayload(msg)
return mono.Create(func(i context.Context, sink mono.Sink) {
first, err := sendings.BlockFirst(i)
first, err := sendingPayloads.BlockFirst(i)
if err != nil {
sink.Error(err)
return
Expand All @@ -207,14 +207,14 @@ func (p *Runner) runServerMode(ctx context.Context) error {
return <-ch
}

func (p *Runner) execMetadataPush(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) {
func (p *Runner) execMetadataPush(_ context.Context, c rsocket.Client, send payload.Payload) (err error) {
c.MetadataPush(send)
m, _ := send.MetadataUTF8()
logger.Infof("%s\n", m)
return
}

func (p *Runner) execFireAndForget(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) {
func (p *Runner) execFireAndForget(_ context.Context, c rsocket.Client, send payload.Payload) (err error) {
c.FireAndForget(send)
return
}
Expand Down
Empty file removed docs/.nojekyll
Empty file.
3 changes: 0 additions & 3 deletions docs/README.md

This file was deleted.

15 changes: 0 additions & 15 deletions docs/_coverpage.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/_media/logo.svg

This file was deleted.

4 changes: 0 additions & 4 deletions docs/_sidebar.md

This file was deleted.

22 changes: 0 additions & 22 deletions docs/index.html

This file was deleted.

2 changes: 1 addition & 1 deletion examples/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func responder() rsocket.RSocket {
// cdl.await();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new Errorf(e);
// throw new Error(e);
// }
//}
s := pl.DataUTF8()
Expand Down
44 changes: 22 additions & 22 deletions extension/mime.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
package extension

// MIME is MIME types in number.
// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md
type MIME int8

func (p MIME) String() string {
return mimeTypes[p]
}

var (
mimeTypes map[MIME]string
mimeTypesR map[string]MIME
_mimeTypes map[MIME]string
_mimeTypesReverse map[string]MIME
)

// ParseMIME parse a string to MIME.
func ParseMIME(str string) (mime MIME, ok bool) {
mime, ok = mimeTypesR[str]
if !ok {
mime = -1
}
return
}
// MIME is MIME types in number.
// Please see: https://github.com/rsocket/rsocket/blob/master/Extensions/WellKnownMimeTypes.md
type MIME int8

// All MIMEs
const (
Expand Down Expand Up @@ -74,7 +61,7 @@ const (
)

func init() {
mimeTypes = map[MIME]string{
_mimeTypes = map[MIME]string{
ApplicationAvro: "application/avro",
ApplicationCBOR: "application/cbor",
ApplicationGraphql: "application/graphql",
Expand Down Expand Up @@ -123,8 +110,21 @@ func init() {
MessageRouting: "message/x.rsocket.routing.v0",
MessageCompositeMetadata: "message/x.rsocket.composite-metadata.v0",
}
mimeTypesR = make(map[string]MIME, len(mimeTypes))
for k, v := range mimeTypes {
mimeTypesR[v] = k
_mimeTypesReverse = make(map[string]MIME, len(_mimeTypes))
for k, v := range _mimeTypes {
_mimeTypesReverse[v] = k
}
}

func (p MIME) String() string {
return _mimeTypes[p]
}

// ParseMIME parse a string to MIME.
func ParseMIME(str string) (mime MIME, ok bool) {
mime, ok = _mimeTypesReverse[str]
if !ok {
mime = -1
}
return
}
39 changes: 23 additions & 16 deletions internal/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,31 @@ import "errors"
// ErrorCode is code for RSocket error.
type ErrorCode uint32

var errorCodeMap = map[ErrorCode]string{
ErrorCodeInvalidSetup: "INVALID_SETUP",
ErrorCodeUnsupportedSetup: "UNSUPPORTED_SETUP",
ErrorCodeRejectedSetup: "REJECTED_SETUP",
ErrorCodeRejectedResume: "REJECTED_RESUME",
ErrorCodeConnectionError: "CONNECTION_ERROR",
ErrorCodeConnectionClose: "CONNECTION_CLOSE",
ErrorCodeApplicationError: "APPLICATION_ERROR",
ErrorCodeRejected: "REJECTED",
ErrorCodeCanceled: "CANCELED",
ErrorCodeInvalid: "INVALID",
}

func (p ErrorCode) String() string {
if s, ok := errorCodeMap[p]; ok {
return s
switch p {
case ErrorCodeInvalidSetup:
return "INVALID_SETUP"
case ErrorCodeUnsupportedSetup:
return "UNSUPPORTED_SETUP"
case ErrorCodeRejectedSetup:
return "REJECTED_SETUP"
case ErrorCodeRejectedResume:
return "REJECTED_RESUME"
case ErrorCodeConnectionError:
return "CONNECTION_ERROR"
case ErrorCodeConnectionClose:
return "CONNECTION_CLOSE"
case ErrorCodeApplicationError:
return "APPLICATION_ERROR"
case ErrorCodeRejected:
return "REJECTED"
case ErrorCodeCanceled:
return "CANCELED"
case ErrorCodeInvalid:
return "INVALID"
default:
return "UNKNOWN"
}
return "UNKNOWN"
}

const (
Expand Down
4 changes: 2 additions & 2 deletions internal/common/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
)

const (
// DefaultKeepaliveInteval is default keepalive interval duration.
DefaultKeepaliveInteval = 20 * time.Second
// DefaultKeepaliveInterval is default keepalive interval duration.
DefaultKeepaliveInterval = 20 * time.Second
// DefaultKeepaliveMaxLifetime is default keepalive max lifetime.
DefaultKeepaliveMaxLifetime = 90 * time.Second
)
2 changes: 1 addition & 1 deletion internal/common/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var DefaultVersion Version = [2]uint16{1, 0}

// Version define the version of protocol.
// It inclues major and minor version.
// It includes major and minor version.
type Version [2]uint16

// Bytes returns raw bytes of current version.
Expand Down
8 changes: 4 additions & 4 deletions internal/framing/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ type Frame interface {
SetBody(body *common.ByteBuff)
// Bytes encodes and returns frame in bytes.
Bytes() []byte
// IsResumable returns true if frame supports resume.
IsResumable() bool
// CanResume returns true if frame supports resume.
CanResume() bool
// Done marks current frame has been sent.
Done() (closed bool)
// DoneNotify notifies when frame done.
Expand Down Expand Up @@ -171,8 +171,8 @@ func (p *BaseFrame) DoneNotify() <-chan struct{} {
return p.done
}

// IsResumable returns true if frame supports resume.
func (p *BaseFrame) IsResumable() bool {
// CanResume returns true if frame supports resume.
func (p *BaseFrame) CanResume() bool {
switch p.header.Type() {
case FrameTypeRequestChannel, FrameTypeRequestStream, FrameTypeRequestResponse, FrameTypeRequestFNF, FrameTypeRequestN, FrameTypeCancel, FrameTypeError, FrameTypePayload:
return true
Expand Down
2 changes: 1 addition & 1 deletion internal/framing/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
HeaderLen = 6
)

// FrameHeader is the header fo a RSocke frame.
// FrameHeader is the header fo a RSocket frame.
// RSocket frames begin with a RSocket Frame Header.
// It includes StreamID, FrameType and Flags.
type FrameHeader [HeaderLen]byte
Expand Down
Loading

0 comments on commit 75d8d33

Please sign in to comment.