Skip to content

Commit

Permalink
decouple callback from event
Browse files Browse the repository at this point in the history
  • Loading branch information
jyyi1 committed Jan 18, 2025
1 parent 4a2c58d commit 3ced93e
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 126 deletions.
97 changes: 97 additions & 0 deletions client/go/outline/callback/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2025 The Outline Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package callback provides a thread-safe mechanism for managing and invoking callbacks.
package callback

import (
"fmt"
"log/slog"
"sync"
)

// Token can be used to uniquely identify a registered callback.
type Token string

// Callback is an interface that can be implemented to receive callbacks.
type Callback interface {
OnCall(data string)
}

var (
mu sync.RWMutex
callbacks = make(map[uint32]Callback)
nextCbID uint32 = 1
)

// New registers a new callback and returns a unique callback token.
func New(c Callback) Token {
mu.Lock()
defer mu.Unlock()

id := nextCbID
nextCbID++
callbacks[id] = c
slog.Debug("callback created", "id", id)
return getTokenByID(id)
}

// Delete removes a callback identified by the token.
//
// Calling this function is safe even if the callback has not been registered.
func Delete(token Token) {
mu.Lock()
defer mu.Unlock()

if id, err := getIDByToken(token); err == nil {
delete(callbacks, id)
slog.Debug("callback deleted", "id", id)
} else {
slog.Warn("invalid callback token", "err", err, "token", token)
}
}

// Call executes a callback identified by the token.
//
// Calling this function is safe even if the callback has not been registered.
func Call(token Token, data string) {
id, err := getIDByToken(token)
if err != nil {
slog.Warn("invalid callback token", "err", err, "token", token)
return
}

mu.RLock()
cb, ok := callbacks[id]
mu.RUnlock()

if !ok {
slog.Warn("callback not yet created", "id", id, "token", token)
return
}
slog.Debug("invoking callback", "id", id, "data", data)
cb.OnCall(data)
}

// getTokenByID creates a string-based callback token from a number-based internal ID.
func getTokenByID(id uint32) Token {
return Token(fmt.Sprintf("cbid-%d", id))
}

// getIDByToken parses a number-based internal ID from a string-based callback token.
func getIDByToken(token Token) (uint32, error) {
var id uint32
_, err := fmt.Sscanf(string(token), "cbid-%d", &id)
return id, err
}
183 changes: 183 additions & 0 deletions client/go/outline/callback/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2025 The Outline Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package callback

import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
)

func Test_New(t *testing.T) {
curID := nextCbID
token := New(&testCallback{})
require.Equal(t, curID+1, nextCbID)
require.Contains(t, callbacks, curID)

require.NotEmpty(t, token)
require.Equal(t, fmt.Sprintf("cbid-%d", curID), string(token))

id, err := getIDByToken(token)
require.NoError(t, err)
require.Contains(t, callbacks, id)
require.Equal(t, id, curID)
}

func Test_Delete(t *testing.T) {
curID := nextCbID
token := New(&testCallback{})
require.Contains(t, callbacks, curID)

Delete(token)
require.NotContains(t, callbacks, curID)
require.Equal(t, curID+1, nextCbID)

Delete("invalid-token")
require.NotContains(t, callbacks, curID)
require.Equal(t, curID+1, nextCbID)

Delete("cbid-99999999")
require.NotContains(t, callbacks, curID)
require.Equal(t, curID+1, nextCbID)
}

func Test_Call(t *testing.T) {
c := &testCallback{}
token := New(c)
c.requireEqual(t, 0, "")

Call(token, "arg1")
c.requireEqual(t, 1, "arg1")

Call("invalid-token", "arg1")
c.requireEqual(t, 1, "arg1") // No change

Call(token, "arg2")
c.requireEqual(t, 2, "arg2")

Call("cbid-99999999", "arg3")
c.requireEqual(t, 2, "arg2") // No change
}

func Test_ConcurrentCreate(t *testing.T) {
const numTokens = 1000

curID := nextCbID
originalLen := len(callbacks)
var wg sync.WaitGroup

tokens := make([]Token, numTokens)
wg.Add(numTokens)
for i := 0; i < numTokens; i++ {
go func(i int) {
defer wg.Done()
tokens[i] = New(&testCallback{})
require.NotEmpty(t, tokens[i])
require.Regexp(t, `^cbid-\d+$`, tokens[i])
}(i)
}
wg.Wait()

require.Len(t, callbacks, originalLen+numTokens)
require.Equal(t, curID+numTokens, nextCbID)
tokenSet := make(map[Token]bool)
for _, token := range tokens {
require.False(t, tokenSet[token], "Duplicate token found: %s", token)
tokenSet[token] = true

id, err := getIDByToken(token)
require.NoError(t, err)
require.Contains(t, callbacks, id)
}
}

func Test_ConcurrentCall(t *testing.T) {
const numInvocations = 1000

curID := nextCbID
originalLen := len(callbacks)

c := &testCallback{}
token := New(c)

var wg sync.WaitGroup
wg.Add(numInvocations)
for i := 0; i < numInvocations; i++ {
go func(i int) {
defer wg.Done()
Call(token, fmt.Sprintf("data-%d", i))
}(i)
}
wg.Wait()

require.Equal(t, int32(numInvocations), c.cnt.Load())
require.Regexp(t, `^data-\d+$`, c.lastData.Load())

require.Len(t, callbacks, originalLen+1)
require.Equal(t, curID+1, nextCbID)
}

func Test_ConcurrentDelete(t *testing.T) {
const (
numTokens = 50
numDeletes = 1000
)

curID := nextCbID
originalLen := len(callbacks)

tokens := make([]Token, numTokens)
for i := 0; i < numTokens; i++ {
tokens[i] = New(&testCallback{})
}
require.Len(t, callbacks, originalLen+numTokens)
require.Equal(t, curID+numTokens, nextCbID)

var wg sync.WaitGroup
wg.Add(numDeletes)
for i := 0; i < numDeletes; i++ {
go func(i int) {
defer wg.Done()
Delete(tokens[i%numTokens])
}(i)
}
wg.Wait()

require.Len(t, callbacks, originalLen)
require.Equal(t, curID+numTokens, nextCbID)
}

// testCallback is a mock implementation of callback.Callback for testing.
type testCallback struct {
cnt atomic.Int32
lastData atomic.Value
}

func (tc *testCallback) OnCall(data string) {
tc.cnt.Add(1)
tc.lastData.Store(data)
}

func (tc *testCallback) requireEqual(t *testing.T, cnt int32, data string) {
require.Equal(t, cnt, tc.cnt.Load())
if cnt == 0 {
require.Nil(t, tc.lastData.Load())
} else {
require.Equal(t, data, tc.lastData.Load())
}
}
66 changes: 25 additions & 41 deletions client/go/outline/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,79 +12,63 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package event provides a thread-safe event system for Outline.
// It allows components to subscribe to events and be notified when they occur.
//
// This package is also designed for cross-language invocations between Go and TypeScript.
// All data structures related to an event should be designed to be compatible between both languages.
// Package event provides a thread-safe mechanism for managing and triggering events.
package event

import (
"log/slog"
"slices"
"sync"

"github.com/Jigsaw-Code/outline-apps/client/go/outline/callback"
)

// EventName is a type alias for string that represents the name of an event.
// Using a dedicated type improves type safety when working with event names.
type EventName string

// Listener is the interface that must be implemented by the object that wants to subscribe to events.
type Listener interface {
// When an event is triggered, Handle is called with the event data as well as the optional parameter
// that was passed during [Subscribe].
Handle(eventData, param string)
}

// listenerInfo holds the listener callback and the optional parameter provided during [Subscribe].
type listenerInfo struct {
cb Listener
param string
}

var (
mu sync.RWMutex // Protects the listeners map
listeners = make(map[EventName]listenerInfo) // A map containing all event listeners
mu sync.RWMutex
listeners = make(map[EventName][]callback.Token)
)

// Subscribe registers a [Listener] for a given [EventName].
//
// This function overwrites any existing listeners for the specified [EventName].
// AddListener adds a [callback.Callback] to a given [EventName].
//
// The provided [Listener] will be called when the event is invoked, along with the event data and the supplied param.
func Subscribe(evt EventName, cb Listener, param string) {
if evt == "" || cb == nil {
slog.Warn("empty event or listener is ignored")
// The provided callback will be called when the event is invoked, along with the event data.
func AddListener(evt EventName, cb callback.Token) {
if evt == "" || cb == "" {
slog.Warn("empty event or callback are ignored")
return
}
mu.Lock()
defer mu.Unlock()

listeners[evt] = listenerInfo{cb, param}
slog.Debug("successfully subscribed to event", "event", evt, "param", param)
listeners[evt] = append(listeners[evt], cb)
slog.Debug("successfully subscribed to event", "event", evt, "callback", cb)
}

// Unsubscribe removes the listener for the specified [EventName].
// RemoveListener removes a [callback.Callback] from the specified [EventName].
//
// Calling this function is safe even if the event has not been registered.
func Unsubscribe(evt EventName) {
func RemoveListener(evt EventName, cb callback.Token) {
mu.Lock()
defer mu.Unlock()

delete(listeners, evt)
slog.Debug("successfully ubsubscribed from event", "event", evt)
if cbs, ok := listeners[evt]; ok {
listeners[evt] = slices.DeleteFunc(cbs, func(t callback.Token) bool { return t == cb })
}
slog.Debug("successfully ubsubscribed from event", "event", evt, "callback", cb)
}

// Raise triggers the specified [EventName] with the given data.
// Fire triggers the specified [EventName], invoking all associated callbacks with the given data.
//
// This function will do nothing if no listener is registered.
func Raise(evt EventName, data string) {
// Calling this function is safe even if the event has not been registered.
func Fire(evt EventName, data string) {
mu.RLock()
defer mu.RUnlock()

if l, ok := listeners[evt]; ok {
slog.Debug("firing event", "event", evt, "data", data, "param", l.param)
l.cb.Handle(data, l.param)
} else {
slog.Debug("event fired but no handlers are found", "event", evt, "data", data)
slog.Debug("firing event", "event", evt, "data", data)
for _, cb := range listeners[evt] {
callback.Call(cb, data)
}
}
Loading

0 comments on commit 3ced93e

Please sign in to comment.