Skip to content

Commit

Permalink
optimize: lockfree
Browse files Browse the repository at this point in the history
  • Loading branch information
felix.fengmin committed Nov 7, 2023
1 parent 92ff0af commit 54bef26
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 35 deletions.
43 changes: 34 additions & 9 deletions internal/loader/funcdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package loader

import (
`sync`
_ `unsafe`
`sync/atomic`
`unsafe`

`github.com/cloudwego/frugal/internal/utils`
)

const (
Expand All @@ -35,8 +37,8 @@ var lastmoduledatap *_ModuleData
func moduledataverify1(_ *_ModuleData)

var (
modLock sync.Mutex
modList []*_ModuleData
/* retains local reference of all modules to bypass gc */
modList = utils.ListNode{}
)

func toZigzag(v int) int {
Expand Down Expand Up @@ -73,9 +75,32 @@ func encodeVariant(v int) []byte {
}

func registerModule(mod *_ModuleData) {
modLock.Lock()
modList = append(modList, mod)
lastmoduledatap.next = mod
lastmoduledatap = mod
modLock.Unlock()
modList.Prepend(unsafe.Pointer(mod))
registerModuleLockFree(&lastmoduledatap, mod)
}

func registerModuleLockFree(tail **_ModuleData, mod *_ModuleData) {
for {
oldTail := loadModule(tail)
if casModule(tail, oldTail, mod) {
storeModule(&oldTail.next, mod)
break
}
}
}

func loadModule(p **_ModuleData) *_ModuleData {
return (*_ModuleData)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(p))))
}

func storeModule(p **_ModuleData, value *_ModuleData) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(p)), unsafe.Pointer(value))
}

func casModule(p **_ModuleData, oldValue *_ModuleData, newValue *_ModuleData) bool {
return atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(p)),
unsafe.Pointer(oldValue),
unsafe.Pointer(newValue),
)
}
17 changes: 6 additions & 11 deletions internal/loader/funcdata_go116_117.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package loader

import (
`sync`
`unsafe`

`github.com/cloudwego/frugal/internal/rt`
`github.com/cloudwego/frugal/internal/utils`
)

type _Func struct {
Expand Down Expand Up @@ -115,16 +115,11 @@ var modHeader = &_PCHeader {
}

var (
emptyByte byte
bucketList []*_FindFuncBucket
bucketLock sync.Mutex
)
emptyByte byte

func appendToBucketList(ftab *_FindFuncBucket) {
bucketLock.Lock()
bucketList = append(bucketList, ftab)
bucketLock.Unlock()
}
/* retain local reference of all buckets to bypass gc */
bucketList = &utils.ListNode{}
)

func registerFunction(name string, pc uintptr, size uintptr, frame rt.Frame) {
var pbase uintptr
Expand Down Expand Up @@ -171,7 +166,7 @@ func registerFunction(name string, pc uintptr, size uintptr, frame rt.Frame) {
/* pin the find function bucket */
ftab := &ffunc[0]
pctab = append(pctab, 0)
appendToBucketList(ftab)
bucketList.Prepend(unsafe.Pointer(ftab))

/* function entry */
fn := _Func {
Expand Down
18 changes: 7 additions & 11 deletions internal/loader/funcdata_go118_121.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package loader

import (
`sync`
`unsafe`

`github.com/cloudwego/frugal/internal/rt`
`github.com/cloudwego/frugal/internal/utils`
)

type _FuncTab struct {
Expand Down Expand Up @@ -60,16 +60,11 @@ const minfunc = 16
const pcbucketsize = 256 * minfunc

var (
emptyByte byte
bucketList []*_FindFuncBucket
bucketLock sync.Mutex
)
emptyByte byte

func appendToBucketList(ftab *_FindFuncBucket) {
bucketLock.Lock()
bucketList = append(bucketList, ftab)
bucketLock.Unlock()
}
/* retain local reference of all buckets to bypass gc */
bucketList = &utils.ListNode{}
)

func registerFunction(name string, pc uintptr, size uintptr, frame rt.Frame) {
var pbase uintptr
Expand Down Expand Up @@ -111,7 +106,8 @@ func registerFunction(name string, pc uintptr, size uintptr, frame rt.Frame) {
/* pin the find function bucket */
ftab := &ffunc[0]
pctab = append(pctab, 0)
appendToBucketList(ftab)
bucketList.Prepend(unsafe.Pointer(ftab))


/* pin the pointer maps */
argptrs := frame.ArgPtrs.Pin()
Expand Down
48 changes: 48 additions & 0 deletions internal/loader/funcdata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 ByteDance Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loader

import (
"testing"
"sync"
)

func Test_registerModuleLockFree(t *testing.T) {
n, parallel := 1000, 8
head := _ModuleData{}
tail := &head
wg := sync.WaitGroup{}
wg.Add(parallel)
filler := func(n int) {
defer wg.Done()
for i := 0; i < n; i++ {
m := &_ModuleData{}
registerModuleLockFree(&tail, m)
}
}
for i := 0; i < parallel; i++ {
go filler(n)
}
wg.Wait()
i := 0
for p := head.next; p != nil; p = p.next {
i += 1
}
if i != parallel * n {
t.Errorf("got %v, expected %v", i, parallel * n)
}
}
6 changes: 5 additions & 1 deletion internal/loader/loader_amd64_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,18 @@ func TestLoader_StackMap(t *testing.T) {
require.NoError(t, asm.Assemble(src))
smb.AddField(true)
cc := asm.Code()
smb1 := new(rt.StackMapBuilder)
smb1.AddField(false)
fp := Loader(cc).Load("test_with_stackmap", rt.Frame {
SpTab: []rt.Stack {
{ Sp: 0, Nb: 4 },
{ Sp: 24, Nb: uintptr(len(cc) - 5) },
{ Sp: 0, Nb: 0 },
},
ArgSize : 0,
ArgPtrs : new(rt.StackMapBuilder).Build(),
ArgPtrs : smb1.Build(), // Build() should be called on non-empty Builder,
// otherwise mallocgc will allocate less than enough
// which leads to fatal error when run with -race
LocalPtrs : smb.Build(),
})
dumpfunction(*(*func())(unsafe.Pointer(&fp)))
Expand Down
47 changes: 47 additions & 0 deletions internal/utils/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2023 ByteDance Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
`sync/atomic`
`unsafe`
)

/* ListNode can be used to save references */
type ListNode struct{
value unsafe.Pointer
next *ListNode
}

/* Prepend creates a new node with value=p and adds it at the beginning of this list */
func (n *ListNode) Prepend(p unsafe.Pointer) {
for {
oldNext := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&n.next)))
newNode := &ListNode{
value: p,
next: (*ListNode)(oldNext),
}
success := atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&n.next)),
oldNext,
unsafe.Pointer(newNode),
)
if success {
break
}
}
}
48 changes: 48 additions & 0 deletions internal/utils/list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 ByteDance Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
"testing"
"sync"
"unsafe"
)

func Test_ListNode_Prepend(t *testing.T) {
n, parallel := 1000, 8
head := ListNode{}
wg := sync.WaitGroup{}
wg.Add(parallel)
filler := func(n int) {
defer wg.Done()
for i := 0; i < n; i++ {
v := i
head.Prepend((unsafe.Pointer)(&v))
}
}
for i := 0; i < parallel; i++ {
go filler(n)
}
wg.Wait()
i := 0
for p := head.next; p != nil; p = p.next {
i += 1
}
if i != parallel * n {
t.Errorf("got %v, expected %v", i, parallel * n)
}
}
9 changes: 6 additions & 3 deletions tests/allsize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var samples []Sample
var (
bytesCount = 16
stringCount = 16
listCount = 8
mapCount = 8
listCount = 8
mapCount = 8
)

func getSamples() []Sample {
Expand Down Expand Up @@ -575,7 +575,10 @@ func BenchmarkUnmarshalAllSize_Parallel_Frugal(b *testing.B) {
var v = reflect.New(rtype).Interface()
for pb.Next() {
objectmemclr(v)
_, err = frugal.DecodeObject(buf, v)
// new error object to avoid concurrent write to `err` which fails with -race
if _, errDecode := frugal.DecodeObject(buf, v); errDecode != nil {
b.Fatal(errDecode)
}
}
})
})
Expand Down

0 comments on commit 54bef26

Please sign in to comment.