Skip to content

Commit

Permalink
support websocket transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed May 15, 2019
1 parent 8948296 commit 4b29a47
Show file tree
Hide file tree
Showing 28 changed files with 640 additions and 194 deletions.
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![GoDoc](https://godoc.org/github.com/rsocket/rsocket-go?status.svg)](https://godoc.org/github.com/rsocket/rsocket-go)
[![Go Report Card](https://goreportcard.com/badge/github.com/rsocket/rsocket-go)](https://goreportcard.com/report/github.com/rsocket/rsocket-go)
[![License](https://img.shields.io/github/license/rsocket/rsocket-go.svg)](https://github.com/rsocket/rsocket-go/blob/master/LICENSE)
[![GitHub Release](https://img.shields.io/github/release/rsocket/rsocket-go.svg)](https://github.com/rsocket/rsocket-go/releases)
[![GitHub Release](https://img.shields.io/github/release-pre/rsocket/rsocket-go.svg)](https://github.com/rsocket/rsocket-go/releases)

rsocket-go is an implementation of the [RSocket](http://rsocket.io/) protocol in Go. It is still under development, APIs are unstable and maybe change at any time until release of v1.0.0. **Please do not use it in a production environment**.

Expand Down Expand Up @@ -51,17 +51,18 @@ package main

import (
"context"
"log"

"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"log"
)

func main() {
// Connect to server
client, err := rsocket.Connect().
SetupPayload(payload.NewString("Hello", "World")).
Transport("127.0.0.1:7878").
Transport("tcp://127.0.0.1:7878").
Start()
if err != nil {
panic(err)
Expand Down Expand Up @@ -156,6 +157,7 @@ package main

import (
"context"

"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
)
Expand Down Expand Up @@ -190,14 +192,15 @@ func main() {
`Flux` emits 0 to N elements, and then completes (successfully or with an error).
Here is tiny example:

``` go
```go
package main

import (
"context"
"time"

"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"time"
)

func main() {
Expand All @@ -208,8 +211,7 @@ func main() {
}
producer.Complete()
})
flux.
DoOnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
flux.DoOnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
// Handle and consume elements
// Do something here...
}).
Expand Down Expand Up @@ -245,13 +247,15 @@ flux.Subscribe(
- [ants](https://github.com/panjf2000/ants)
- [bytebufferpool](https://github.com/valyala/bytebufferpool)
- [testify](https://github.com/stretchr/testify)
- [websocket](https://github.com/gorilla/websocket)

### TODO

#### Transport
- [x] TCP
- [ ] Websocket
- [ ] HTTP/2
- [x] Websocket
- [ ] Aeron
- [ ] QUIC

#### Duplex Socket
- [x] MetadataPush
Expand Down
File renamed without changes.
File renamed without changes.
14 changes: 6 additions & 8 deletions example/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@ import (
"github.com/rsocket/rsocket-go/rx"
)

func init() {
func main() {
go func() {
log.Println(http.ListenAndServe(":4444", nil))
}()
}

func main() {
logger.SetLoggerLevel(logger.LogLevelInfo)
//logger.SetLoggerLevel(logger.LogLevelDebug)
err := createEchoServer("127.0.0.1", 7878)
//err := createEchoServer("ws://127.0.0.1:7878")
err := createEchoServer("tcp://127.0.0.1:7878")
panic(err)
}

func createEchoServer(host string, port int) error {
func createEchoServer(uri string) error {
responder := rsocket.NewAbstractSocket(
rsocket.MetadataPush(func(item payload.Payload) {
log.Println("GOT METADATA_PUSH:", item)
Expand Down Expand Up @@ -146,11 +144,11 @@ func createEchoServer(host string, port int) error {
// Subscribe(context.Background())

sendingSocket.OnClose(func() {
logger.Infof("***** socket disconnected *****\n")
log.Println("***** socket disconnected *****")
})

return responder
}).
Transport(fmt.Sprintf("%s:%d", host, port)).
Transport(uri).
Serve()
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package benchmark
package main_test

import (
"bytes"
Expand All @@ -18,42 +18,10 @@ import (
"github.com/stretchr/testify/assert"
)

func doOnce(host string, port int, totals int) {
wg := &sync.WaitGroup{}
wg.Add(totals)
data := []byte(strings.Repeat("A", 4096))
md := []byte("benchmark_test")
ctx := context.Background()
log.Printf("CONN: %s:%d\n", host, port)
clients := make([]rsocket.ClientSocket, totals)
now := time.Now()
for i := 0; i < totals; i++ {
clients[i] = createClient(host, port)
}
log.Println("SETUP:", time.Now().Sub(now))
now = time.Now()
for _, client := range clients {
client.RequestResponse(payload.New(data, md)).
DoFinally(func(ctx context.Context, sig rx.SignalType) {
wg.Done()
}).
SubscribeOn(rx.ElasticScheduler()).
Subscribe(ctx)
}
wg.Wait()
cost := time.Now().Sub(now)

log.Println("TOTALS:", totals)
log.Println("COST:", cost)
log.Printf("QPS: %.2f\n", float64(totals)/cost.Seconds())
time.Sleep(10 * time.Hour)
for _, client := range clients {
_ = client.Close()
}
}
const uri = "tcp://127.0.0.1:7878"

func TestClient_RequestResponse(t *testing.T) {
client := createClient("127.0.0.1", 7878)
client := createClient(uri)
defer func() {
_ = client.Close()
}()
Expand Down Expand Up @@ -84,10 +52,10 @@ func TestClient_RequestResponse(t *testing.T) {

func TestClients_RequestResponse(t *testing.T) {
log.Println("---------------")
doOnce("127.0.0.1", 7878, 10000)
doOnce(10000)
}

func createClient(host string, port int) rsocket.ClientSocket {
func createClient(uri string) rsocket.ClientSocket {
client, err := rsocket.Connect().
SetupPayload(payload.NewString("你好", "世界")).
Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
Expand All @@ -101,10 +69,43 @@ func createClient(host string, port int) rsocket.ClientSocket {
}),
)
}).
Transport(fmt.Sprintf("%s:%d", host, port)).
Transport(uri).
Start()
if err != nil {
panic(err)
}
return client
}

func doOnce(totals int) {
wg := &sync.WaitGroup{}
wg.Add(totals)
data := []byte(strings.Repeat("A", 4096))
md := []byte("benchmark_test")
ctx := context.Background()
clients := make([]rsocket.ClientSocket, totals)
now := time.Now()
for i := 0; i < totals; i++ {
clients[i] = createClient(uri)
}
log.Println("SETUP:", time.Now().Sub(now))
now = time.Now()
for _, client := range clients {
client.RequestResponse(payload.New(data, md)).
DoFinally(func(ctx context.Context, sig rx.SignalType) {
wg.Done()
}).
SubscribeOn(rx.ElasticScheduler()).
Subscribe(ctx)
}
wg.Wait()
cost := time.Now().Sub(now)

log.Println("TOTALS:", totals)
log.Println("COST:", cost)
log.Printf("QPS: %.2f\n", float64(totals)/cost.Seconds())
time.Sleep(10 * time.Hour)
for _, client := range clients {
_ = client.Close()
}
}
7 changes: 7 additions & 0 deletions framing/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ type Frame interface {
SetHeader(h FrameHeader)
// SetBody set frame body.
SetBody(body *common.ByteBuff)
// Bytes encodes and returns frame in bytes.
Bytes() []byte
}

// BaseFrame is basic frame implementation.
Expand Down Expand Up @@ -174,6 +176,11 @@ func (p *BaseFrame) WriteTo(w io.Writer) (n int64, err error) {
return
}

// Bytes returns frame in bytes.
func (p *BaseFrame) Bytes() []byte {
return append(p.header[:], p.body.Bytes()...)
}

// Release release resources of frame.
func (p *BaseFrame) Release() {
if p.body == nil {
Expand Down
10 changes: 3 additions & 7 deletions framing/frame_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@ func (p *FrameError) ErrorData() []byte {
// NewFrameError returns a new error frame.
func NewFrameError(streamID uint32, code common.ErrorCode, data []byte) *FrameError {
bf := common.BorrowByteBuffer()
b4 := common.BorrowByteBuffer()
defer common.ReturnByteBuffer(b4)
for range [4]struct{}{} {
_ = b4.WriteByte(0)
}
binary.BigEndian.PutUint32(b4.Bytes(), uint32(code))
_, _ = b4.WriteTo(bf)
var b4 [4]byte
binary.BigEndian.PutUint32(b4[:], uint32(code))
_, _ = bf.Write(b4[:])
_, _ = bf.Write(data)
return &FrameError{
&BaseFrame{
Expand Down
6 changes: 3 additions & 3 deletions framing/frame_keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func NewFrameKeepalive(position uint64, data []byte, respond bool) *FrameKeepali
fg |= FlagRespond
}
bf := common.BorrowByteBuffer()
var blank8 [8]byte
binary.BigEndian.PutUint64(blank8[:], position)
_, _ = bf.Write(blank8[:])
var b8 [8]byte
binary.BigEndian.PutUint64(b8[:], position)
_, _ = bf.Write(b8[:])
if len(data) > 0 {
_, _ = bf.Write(data)
}
Expand Down
7 changes: 3 additions & 4 deletions framing/frame_request_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ func (p *FrameRequestChannel) DataUTF8() string {
func NewFrameRequestChannel(sid uint32, n uint32, data, metadata []byte, flags ...FrameFlag) *FrameRequestChannel {
fg := newFlags(flags...)
bf := common.BorrowByteBuffer()
for range [4]struct{}{} {
_ = bf.WriteByte(0)
}
binary.BigEndian.PutUint32(bf.Bytes(), n)
var b4 [4]byte
binary.BigEndian.PutUint32(b4[:], n)
_, _ = bf.Write(b4[:])
if len(metadata) > 0 {
fg |= FlagMetadata
_ = bf.WriteUint24(len(metadata))
Expand Down
9 changes: 5 additions & 4 deletions framing/frame_request_n.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func (p *FrameRequestN) N() uint32 {
func NewFrameRequestN(sid, n uint32, flags ...FrameFlag) *FrameRequestN {
fg := newFlags(flags...)
bf := common.BorrowByteBuffer()
for i := 0; i < 4; i++ {
_ = bf.WriteByte(0)
}
binary.BigEndian.PutUint32(bf.Bytes(), n)

var b4 [4]byte
binary.BigEndian.PutUint32(b4[:], n)
_, _ = bf.Write(b4[:])

return &FrameRequestN{
&BaseFrame{
header: NewFrameHeader(sid, FrameTypeRequestN, fg),
Expand Down
18 changes: 7 additions & 11 deletions framing/frame_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,15 @@ func NewFrameSetup(version common.Version, timeBetweenKeepalive, maxLifetime tim
var fg FrameFlag
bf := common.BorrowByteBuffer()
_, _ = bf.Write(version.Bytes())
b4 := common.BorrowByteBuffer()
defer common.ReturnByteBuffer(b4)
for i := 0; i < 4; i++ {
_ = b4.WriteByte(0)
}
binary.BigEndian.PutUint32(b4.Bytes(), uint32(timeBetweenKeepalive.Nanoseconds()/1e6))
_, _ = b4.WriteTo(bf)
binary.BigEndian.PutUint32(b4.Bytes(), uint32(maxLifetime.Nanoseconds()/1e6))
_, _ = b4.WriteTo(bf)
var b4 [4]byte
binary.BigEndian.PutUint32(b4[:], uint32(timeBetweenKeepalive.Nanoseconds()/1e6))
_, _ = bf.Write(b4[:])
binary.BigEndian.PutUint32(b4[:], uint32(maxLifetime.Nanoseconds()/1e6))
_, _ = bf.Write(b4[:])
if len(token) > 0 {
fg |= FlagResume
binary.BigEndian.PutUint16(b4.Bytes(), uint16(len(token)))
_, _ = bf.Write(b4.Bytes()[:2])
binary.BigEndian.PutUint16(b4[:2], uint16(len(token)))
_, _ = bf.Write(b4[:2])
_, _ = bf.Write(token)
}
_ = bf.WriteByte(byte(len(mimeMetadata)))
Expand Down
26 changes: 26 additions & 0 deletions framing/frame_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package framing

import (
"encoding/hex"
"log"
"testing"

"github.com/rsocket/rsocket-go/common"
"github.com/stretchr/testify/assert"
)

func TestDecode_Payload(t *testing.T) {
//s := "000000012940000005776f726c6468656c6c6f" // go
s:="00000001296000000966726f6d5f6a617661706f6e67" //java
bs, err := hex.DecodeString(s)
assert.NoError(t, err, "bad bytes")
h := ParseFrameHeader(bs[:HeaderLen])
log.Println(h)
bf := common.BorrowByteBuffer()
_, _ = bf.Write(bs[HeaderLen:])
pl := &FramePayload{
BaseFrame: NewBaseFrame(h, bf),
}
defer pl.Release()
log.Println(pl)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/rsocket/rsocket-go
go 1.12

require (
github.com/gorilla/websocket v1.4.0
github.com/panjf2000/ants v1.0.0
github.com/stretchr/testify v1.3.0
github.com/valyala/bytebufferpool v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/panjf2000/ants v1.0.0 h1:MZBsUt8W6ktQfhIswUZpw17IJlXY6ly2+U5b9jxwad4=
github.com/panjf2000/ants v1.0.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
File renamed without changes.
Loading

0 comments on commit 4b29a47

Please sign in to comment.