Skip to content

Commit

Permalink
Query - ensure finalizer is only called after a native call returns
Browse files Browse the repository at this point in the history
  • Loading branch information
vaind committed Mar 17, 2020
1 parent de02d9a commit 459ae65
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
24 changes: 24 additions & 0 deletions objectbox/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func queryFinalizer(query *Query) {

// The native query object in the ObjectBox core is not tied with other resources.
// Thus timing of the Close call is independent from other resources.
// Warning: it's important the object is kept around until a native call returns, e.g. using `runtime.KeepAlive(query)`.
func (query *Query) installFinalizer() {
runtime.SetFinalizer(query, queryFinalizer)
}
Expand All @@ -80,6 +81,8 @@ func (query *Query) errorClosed() error {

// Find returns all objects matching the query
func (query *Query) Find() (objects interface{}, err error) {
defer runtime.KeepAlive(query)

if query.cQuery == nil {
return 0, query.errorClosed()
}
Expand Down Expand Up @@ -113,6 +116,8 @@ func (query *Query) Limit(limit uint64) *Query {

// FindIds returns IDs of all objects matching the query
func (query *Query) FindIds() ([]uint64, error) {
defer runtime.KeepAlive(query)

if query.cQuery == nil {
return nil, query.errorClosed()
}
Expand All @@ -137,6 +142,7 @@ func (query *Query) Count() (uint64, error) {
if err := cCall(func() C.obx_err { return C.obx_query_count(query.cQuery, &cResult) }); err != nil {
return 0, err
}
runtime.KeepAlive(query)
return uint64(cResult), nil
}

Expand All @@ -155,6 +161,8 @@ func (query *Query) Remove() (count uint64, err error) {
if err := cCall(func() C.obx_err { return C.obx_query_remove(query.cQuery, &cResult) }); err != nil {
return 0, err
}

runtime.KeepAlive(query)
return uint64(cResult), nil
}

Expand All @@ -163,9 +171,11 @@ func (query *Query) DescribeParams() (string, error) {
if query.cQuery == nil {
return "", query.errorClosed()
}

// no need to free, it's handled by the cQuery internally
cResult := C.obx_query_describe_params(query.cQuery)

runtime.KeepAlive(query)
return C.GoString(cResult), nil
}

Expand Down Expand Up @@ -205,6 +215,8 @@ type propertyOrAlias interface {

// SetStringParams changes query parameter values on the given property
func (query *Query) SetStringParams(identifier propertyOrAlias, values ...string) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand Down Expand Up @@ -236,6 +248,8 @@ func (query *Query) SetStringParams(identifier propertyOrAlias, values ...string

// SetStringParamsIn changes query parameter values on the given property
func (query *Query) SetStringParamsIn(identifier propertyOrAlias, values ...string) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand Down Expand Up @@ -263,6 +277,8 @@ func (query *Query) SetStringParamsIn(identifier propertyOrAlias, values ...stri

// SetInt64Params changes query parameter values on the given property
func (query *Query) SetInt64Params(identifier propertyOrAlias, values ...int64) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand Down Expand Up @@ -299,6 +315,8 @@ func (query *Query) SetInt64Params(identifier propertyOrAlias, values ...int64)

// SetInt64ParamsIn changes query parameter values on the given property
func (query *Query) SetInt64ParamsIn(identifier propertyOrAlias, values ...int64) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand All @@ -323,6 +341,8 @@ func (query *Query) SetInt64ParamsIn(identifier propertyOrAlias, values ...int64

// SetInt32ParamsIn changes query parameter values on the given property
func (query *Query) SetInt32ParamsIn(identifier propertyOrAlias, values ...int32) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand All @@ -347,6 +367,8 @@ func (query *Query) SetInt32ParamsIn(identifier propertyOrAlias, values ...int32

// SetFloat64Params changes query parameter values on the given property
func (query *Query) SetFloat64Params(identifier propertyOrAlias, values ...float64) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand Down Expand Up @@ -384,6 +406,8 @@ func (query *Query) SetFloat64Params(identifier propertyOrAlias, values ...float

// SetBytesParams changes query parameter values on the given property
func (query *Query) SetBytesParams(identifier propertyOrAlias, values ...[]byte) error {
defer runtime.KeepAlive(query)

if err := query.checkIdentifier(identifier); err != nil {
return err
}
Expand Down
81 changes: 77 additions & 4 deletions test/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ func concurrentInsert(t *testing.T, count, concurrency int, putAsync bool) {

assert.NoErr(t, env.ObjectBox.AwaitAsyncCompletion())

t.Log("validating counts")
if len(errors) != 0 {
t.Errorf("encountered %d errors", len(errors))
for err := range errors {
t.Log(err)
t.Errorf("encountered %d errors:", len(errors))
for i := 0; i < len(errors); i++ {
t.Log(" ", <-errors)
}
}

t.Log("validating counts")
assert.Eq(t, 0, len(errors))
assert.Eq(t, count, len(ids))

Expand All @@ -121,3 +122,75 @@ func concurrentInsert(t *testing.T, count, concurrency int, putAsync bool) {
}
}
}

// TestConcurrentQuery checks concurrently running queries.
// Previously there was an issue with finalizers, with query being closed during the native call.
func TestConcurrentQuery(t *testing.T) {
env := iot.NewTestEnv()
defer env.Close()

box := iot.BoxForEvent(env.ObjectBox)

err := box.RemoveAll()
assert.NoErr(t, err)

var objects = 10000
var queries = 500
var concurrency = 4

if testing.Short() || strings.Contains(strings.ToLower(runtime.GOARCH), "arm") {
objects = 5000
queries = 200
concurrency = 2
}

assert.NoErr(t, env.ObjectBox.RunInWriteTx(func() error {
for i := objects; i > 0; i-- {
if _, e := box.Put(&iot.Event{
Device: "my device",
}); e != nil {
return e
}
}
return nil
}))

// prepare channels and launch the goroutines
errors := make(chan error, queries)

t.Logf("launching %d routines to execute %d queries each, over %d objects", concurrency, queries, objects)

var wg sync.WaitGroup
wg.Add(concurrency)
for i := concurrency; i > 0; i-- {
go func() {
defer wg.Done()
for j := queries; j > 0; j-- {
var e error
if j%2 == 0 {
_, e = box.Query(iot.Event_.Id.GreaterThan(0)).Find()
} else {
_, e = box.Query(iot.Event_.Id.GreaterThan(0)).FindIds()
}
if e != nil {
errors <- e
break
}
}
}()
}

// collect and check results after everything is done
t.Log("waiting for all goroutines to finish")
wg.Wait()

assert.NoErr(t, env.ObjectBox.AwaitAsyncCompletion())

if len(errors) != 0 {
t.Errorf("encountered %d errors:", len(errors))
for i := 0; i < len(errors); i++ {
t.Log(" ", <-errors)
}
}
assert.Eq(t, 0, len(errors))
}

0 comments on commit 459ae65

Please sign in to comment.