Skip to content

Commit

Permalink
imporve: use sync map to store handlers (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii authored Jun 22, 2022
1 parent 0d1b1c9 commit 11ad065
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 306 deletions.
154 changes: 0 additions & 154 deletions internal/map32/map32.go

This file was deleted.

114 changes: 0 additions & 114 deletions internal/map32/map32_test.go

This file was deleted.

15 changes: 0 additions & 15 deletions internal/map32/types.go.go

This file was deleted.

33 changes: 10 additions & 23 deletions internal/socket/duplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/rsocket/rsocket-go/internal/bytesconv"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/internal/fragmentation"
"github.com/rsocket/rsocket-go/internal/map32"
"github.com/rsocket/rsocket-go/internal/misc"
"github.com/rsocket/rsocket-go/internal/queue"
"github.com/rsocket/rsocket-go/lease"
Expand Down Expand Up @@ -60,10 +59,10 @@ type DuplexConnection struct {
sndQueue chan core.WriteableFrame
sndBacklog []core.WriteableFrame
responder Responder
messages map32.Map32 // key=streamID, value=callback
messages sync.Map // key=streamID, value=callback
sids StreamID
mtu int
fragments map32.Map32 // key=streamID, value=Joiner
fragments sync.Map // key=streamID, value=Joiner
writeDone chan struct{}
keepaliver *Keepaliver
cond sync.Cond
Expand Down Expand Up @@ -162,11 +161,10 @@ func (dc *DuplexConnection) destroyTransport() {
}

func (dc *DuplexConnection) destroyHandler(err error) {
defer dc.messages.Destroy()
// TODO: optimize callback map
var callbacks []callback
dc.messages.Range(func(sid uint32, v interface{}) bool {
callbacks = append(callbacks, v.(callback))
dc.messages.Range(func(_, value interface{}) bool {
callbacks = append(callbacks, value.(callback))
return true
})
for _, next := range callbacks {
Expand All @@ -175,11 +173,10 @@ func (dc *DuplexConnection) destroyHandler(err error) {
}

func (dc *DuplexConnection) destroyFragment() {
dc.fragments.Range(func(u uint32, i interface{}) bool {
dc.fragments.Range(func(_, i interface{}) bool {
common.TryRelease(i)
return true
})
dc.fragments.Destroy()
}

func (dc *DuplexConnection) destroySndQueue() {
Expand Down Expand Up @@ -1360,21 +1357,11 @@ func newDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Schedul
leases: leases,
sndQueue: make(chan core.WriteableFrame, _outChanSize),
mtu: mtu,
messages: map32.New(map32.WithCap(32), map32.WithHasher(func(key uint32, _ int) int {
var n int
if key&1 == 0 {
n = int(key) >> 1
} else {
n = (int(key)-1)>>1 + 1
}
return n & 31
})),
sids: sids,
fragments: map32.New(map32.WithCap(1)),
counter: core.NewTrafficCounter(),
keepaliver: ka,
closed: atomic.NewBool(false),
ready: atomic.NewBool(false),
sids: sids,
counter: core.NewTrafficCounter(),
keepaliver: ka,
closed: atomic.NewBool(false),
ready: atomic.NewBool(false),
}

c.cond.L = &c.locker
Expand Down

0 comments on commit 11ad065

Please sign in to comment.