From 082a912f1d5a4b6cc3314e96c1d4e60beef2d800 Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 2 Feb 2024 20:04:23 +0100 Subject: [PATCH] Add channel name to websocket message --- cmd/api/bus/dispatcher.go | 3 - cmd/api/handler/websocket/channel.go | 6 +- cmd/api/handler/websocket/filters.go | 12 +- cmd/api/handler/websocket/messages.go | 28 +++- cmd/api/handler/websocket/processors.go | 12 +- cmd/api/handler/ws_test.go | 164 ++++++++++++++++++++++++ cmd/api/init.go | 2 +- cmd/api/markdown/websocket.md | 15 ++- 8 files changed, 220 insertions(+), 22 deletions(-) create mode 100644 cmd/api/handler/ws_test.go diff --git a/cmd/api/bus/dispatcher.go b/cmd/api/bus/dispatcher.go index ca07aae..d162e6f 100644 --- a/cmd/api/bus/dispatcher.go +++ b/cmd/api/bus/dispatcher.go @@ -18,7 +18,6 @@ import ( type Dispatcher struct { listener storage.Listener - state storage.IState blocks storage.IBlock mx *sync.RWMutex @@ -29,7 +28,6 @@ type Dispatcher struct { func NewDispatcher( factory storage.ListenerFactory, - state storage.IState, blocks storage.IBlock, ) (*Dispatcher, error) { if factory == nil { @@ -38,7 +36,6 @@ func NewDispatcher( listener := factory.CreateListener() return &Dispatcher{ listener: listener, - state: state, blocks: blocks, observers: make([]*Observer, 0), mx: new(sync.RWMutex), diff --git a/cmd/api/handler/websocket/channel.go b/cmd/api/handler/websocket/channel.go index 45a49d3..2c10852 100644 --- a/cmd/api/handler/websocket/channel.go +++ b/cmd/api/handler/websocket/channel.go @@ -9,15 +9,15 @@ import ( "github.com/pkg/errors" ) -type processor[I, M any] func(data I) M +type processor[I any, M INotification] func(data I) Notification[M] -type Channel[I, M any] struct { +type Channel[I any, M INotification] struct { clients *sdkSync.Map[uint64, client] processor processor[I, M] filters Filterable[M] } -func NewChannel[I, M any](processor processor[I, M], filters Filterable[M]) *Channel[I, M] { +func NewChannel[I any, M INotification](processor processor[I, M], filters Filterable[M]) *Channel[I, M] { return &Channel[I, M]{ clients: sdkSync.NewMap[uint64, client](), processor: processor, diff --git a/cmd/api/handler/websocket/filters.go b/cmd/api/handler/websocket/filters.go index b30a7ad..368f27c 100644 --- a/cmd/api/handler/websocket/filters.go +++ b/cmd/api/handler/websocket/filters.go @@ -7,14 +7,14 @@ import ( "github.com/celenium-io/astria-indexer/cmd/api/handler/responses" ) -type Filterable[M any] interface { - Filter(c client, msg M) bool +type Filterable[M INotification] interface { + Filter(c client, msg Notification[M]) bool } type HeadFilter struct{} -func (hf HeadFilter) Filter(c client, msg *responses.State) bool { - if msg == nil { +func (f HeadFilter) Filter(c client, msg Notification[*responses.State]) bool { + if msg.Body == nil { return false } fltrs := c.Filters() @@ -26,8 +26,8 @@ func (hf HeadFilter) Filter(c client, msg *responses.State) bool { type BlockFilter struct{} -func (hf BlockFilter) Filter(c client, msg *responses.Block) bool { - if msg == nil { +func (f BlockFilter) Filter(c client, msg Notification[*responses.Block]) bool { + if msg.Body == nil { return false } fltrs := c.Filters() diff --git a/cmd/api/handler/websocket/messages.go b/cmd/api/handler/websocket/messages.go index 2a002fe..f05bf17 100644 --- a/cmd/api/handler/websocket/messages.go +++ b/cmd/api/handler/websocket/messages.go @@ -3,7 +3,10 @@ package websocket -import "github.com/goccy/go-json" +import ( + "github.com/celenium-io/astria-indexer/cmd/api/handler/responses" + "github.com/goccy/go-json" +) // methods const ( @@ -35,3 +38,26 @@ type TransactionFilters struct { Status []string `json:"status,omitempty"` Actions []string `json:"action_type,omitempty"` } + +type INotification interface { + *responses.Block | *responses.State +} + +type Notification[T INotification] struct { + Channel string `json:"channel"` + Body T `json:"body"` +} + +func NewBlockNotification(block responses.Block) Notification[*responses.Block] { + return Notification[*responses.Block]{ + Channel: ChannelBlocks, + Body: &block, + } +} + +func NewStateNotification(state responses.State) Notification[*responses.State] { + return Notification[*responses.State]{ + Channel: ChannelHead, + Body: &state, + } +} diff --git a/cmd/api/handler/websocket/processors.go b/cmd/api/handler/websocket/processors.go index 9a75481..b2937e1 100644 --- a/cmd/api/handler/websocket/processors.go +++ b/cmd/api/handler/websocket/processors.go @@ -8,12 +8,12 @@ import ( "github.com/celenium-io/astria-indexer/internal/storage" ) -func headProcessor(state storage.State) *responses.State { - response := responses.NewState(state) - return &response +func blockProcessor(block storage.Block) Notification[*responses.Block] { + response := responses.NewBlock(block) + return NewBlockNotification(response) } -func blockProcessor(block storage.Block) *responses.Block { - response := responses.NewBlock(block) - return &response +func headProcessor(state storage.State) Notification[*responses.State] { + response := responses.NewState(state) + return NewStateNotification(response) } diff --git a/cmd/api/handler/ws_test.go b/cmd/api/handler/ws_test.go new file mode 100644 index 0000000..fb648a3 --- /dev/null +++ b/cmd/api/handler/ws_test.go @@ -0,0 +1,164 @@ +package handler + +import ( + "context" + "crypto/rand" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" + + "github.com/celenium-io/astria-indexer/pkg/types" + + "github.com/celenium-io/astria-indexer/cmd/api/bus" + "github.com/celenium-io/astria-indexer/cmd/api/handler/responses" + ws "github.com/celenium-io/astria-indexer/cmd/api/handler/websocket" + "github.com/celenium-io/astria-indexer/internal/storage" + "github.com/celenium-io/astria-indexer/internal/storage/mock" + "github.com/goccy/go-json" + "github.com/gorilla/websocket" + "github.com/labstack/echo/v4" + "github.com/lib/pq" + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestWebsocket(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + listenerFactory := mock.NewMockListenerFactory(ctrl) + listener := mock.NewMockListener(ctrl) + + listenerFactory.EXPECT().CreateListener().Return(listener).Times(1) + + headChannel := make(chan *pq.Notification, 10) + listener.EXPECT().Listen().Return(headChannel).AnyTimes() + listener.EXPECT().Subscribe(gomock.Any(), storage.ChannelHead).Return(nil).Times(1) + listener.EXPECT().Close().Return(nil).MaxTimes(1) + + ctx, cancel := context.WithCancel(context.Background()) + + blockMock := mock.NewMockIBlock(ctrl) + dispatcher, err := bus.NewDispatcher(listenerFactory, blockMock) + require.NoError(t, err) + dispatcher.Start(ctx) + observer := dispatcher.Observe(storage.ChannelHead, storage.ChannelBlock) + + for i := 0; i < 10; i++ { + hash := make([]byte, 32) + _, err := rand.Read(hash) + require.NoError(t, err) + + blockMock.EXPECT().ByIdWithRelations(ctx, uint64(i)).Return(storage.Block{ + Id: uint64(i), + Height: types.Level(i), + Time: time.Now(), + Hash: hash, + Stats: testBlock.Stats, + }, nil).MaxTimes(1) + } + + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + var id uint64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + id++ + + headChannel <- &pq.Notification{ + Channel: storage.ChannelBlock, + Extra: strconv.FormatUint(id, 10), + } + } + } + }() + manager := ws.NewManager(observer) + manager.Start(ctx) + + server := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + e := echo.New() + c := e.NewContext(r, w) + err := manager.Handle(c) + require.NoError(t, err, "handle") + <-ctx.Done() + }, + )) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/ws" + dialed, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + require.NoError(t, err, "dial") + + body, err := json.Marshal(ws.Subscribe{ + Channel: ws.ChannelBlocks, + }) + require.NoError(t, err, "marshal subscribe") + + err = dialed.WriteJSON(ws.Message{ + Method: ws.MethodSubscribe, + Body: body, + }) + require.NoError(t, err, "send subscribe message") + + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + body, err := json.Marshal(ws.Unsubscribe{ + Channel: ws.ChannelHead, + }) + require.NoError(t, err, "marshal unsubscribe") + + err = dialed.WriteJSON(ws.Message{ + Method: ws.MethodUnsubscribe, + Body: body, + }) + require.NoError(t, err, "send unsubscribe message") + + err = dialed.Close() + require.NoError(t, err, "closing connection") + + time.Sleep(time.Second) + cancel() + + err = manager.Close() + require.NoError(t, err, "closing manager") + + close(headChannel) + return + default: + err := dialed.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + + _, msg, err := dialed.ReadMessage() + require.NoError(t, err, err) + + var notification ws.Notification[*responses.Block] + err = json.Unmarshal(msg, ¬ification) + require.NoError(t, err, err) + + require.Equal(t, ws.ChannelBlocks, notification.Channel) + require.Greater(t, notification.Body.Id, uint64(0)) + require.Greater(t, notification.Body.Height, uint64(0)) + require.False(t, notification.Body.Time.IsZero()) + require.Len(t, notification.Body.Hash, 32) + + log.Info(). + Uint64("height", notification.Body.Height). + Time("block_time", notification.Body.Time). + Msg("new block") + } + } +} diff --git a/cmd/api/init.go b/cmd/api/init.go index 2ca01a4..2005472 100644 --- a/cmd/api/init.go +++ b/cmd/api/init.go @@ -228,7 +228,7 @@ func initEcho(cfg ApiConfig, env string) *echo.Echo { var dispatcher *bus.Dispatcher func initDispatcher(ctx context.Context, db postgres.Storage) { - d, err := bus.NewDispatcher(db, db.State, db.Blocks) + d, err := bus.NewDispatcher(db, db.Blocks) if err != nil { panic(err) } diff --git a/cmd/api/markdown/websocket.md b/cmd/api/markdown/websocket.md index 1bf2d4d..53a3f4b 100644 --- a/cmd/api/markdown/websocket.md +++ b/cmd/api/markdown/websocket.md @@ -1,5 +1,16 @@ ## Documentation for websocket API +### Notification + +The structure of notification is following in all channels: + +```json +{ + "channel": "channel_name", + "body": "" // depends on channel +} +``` + ### Subscribe To receive updates from websocket API send `subscribe` request to server. @@ -29,7 +40,7 @@ Now 2 channels are supported: } ``` -In that channel messages of `responses.State` type will be sent. +Notification body of `responses.State` type will be sent to the channel. * `blocks` - receive information about new blocks. Channel does not have any filters. Subscribe message should looks like: @@ -42,7 +53,7 @@ In that channel messages of `responses.State` type will be sent. } ``` -In that channel messages of `responses.Block` type will be sent. +Notification body of `responses.Block` type will be sent to the channel. ### Unsubscribe