From 942c09865acdbf009dc1131d61b48a9398693615 Mon Sep 17 00:00:00 2001 From: polebug Date: Wed, 25 Dec 2024 20:38:40 +0800 Subject: [PATCH 1/2] fix(monitor): modify the logic of atproto monitor --- .../node/component/info/handler_worker.go | 4 +- internal/node/monitor/client.go | 113 +++++++++++------- internal/node/monitor/monitor.go | 52 +++++--- internal/node/monitor/monitor_test.go | 56 +++++---- internal/node/monitor/server.go | 2 + 5 files changed, 144 insertions(+), 83 deletions(-) diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index cbc47012f..dbffa84c1 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -144,7 +144,7 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work response.Data.Decentralized = append(response.Data.Decentralized, workerInfo) zap.L().Debug("added decentralized worker info", zap.String("network", workerInfo.Network.String())) - case network.ActivityPubProtocol: + case network.ActivityPubProtocol, network.ATProtocol: response.Data.Federated = append(response.Data.Federated, workerInfo) zap.L().Debug("added federated worker info", zap.String("network", workerInfo.Network.String())) @@ -200,7 +200,7 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) } switch module.Network.Protocol() { - case network.ActivityPubProtocol: + case network.ActivityPubProtocol, network.ATProtocol: if federatedWorker, ok := module.Worker.(federated.Worker); ok { workerInfo.Platform = federated.ToPlatformMap[federatedWorker].String() workerInfo.Tags = federated.ToTagsMap[federatedWorker] diff --git a/internal/node/monitor/client.go b/internal/node/monitor/client.go index 15cf2ca85..be9e03e23 100644 --- a/internal/node/monitor/client.go +++ b/internal/node/monitor/client.go @@ -26,15 +26,6 @@ type Client interface { LatestState(ctx context.Context) (uint64, uint64, error) } -// activitypubClient is a client implementation for ActivityPub. -type activitypubClient struct { - httpClient httpx.Client - relayURLs []string -} - -// set a default client -var _ Client = (*activitypubClient)(nil) - // ethereumClient is a client implementation for ethereum. type ethereumClient struct { ethereumClient ethereum.Client @@ -166,43 +157,6 @@ func (c *farcasterClient) LatestState(_ context.Context) (uint64, uint64, error) return uint64(time.Now().UnixMilli()), 0, nil } -// getTargetBlockFromParam returns the target block number/height from the parameters. -func getTargetBlockFromParam(param *config.Parameters) uint64 { - if param == nil { - return 0 - } - - targetBlock, exists := (*param)["block_target"] - if !exists || targetBlock == nil { - return 0 - } - - targetBlockUint, err := convertToUint64(targetBlock) - if err != nil { - return 0 - } - - return targetBlockUint -} - -// convertToUint64 a helper func which converts the value to uint64. -func convertToUint64(value interface{}) (uint64, error) { - switch v := value.(type) { - case string: - return strconv.ParseUint(v, 10, 64) - case int: - return uint64(v), nil - case int64: - return uint64(v), nil - case uint: - return uint64(v), nil - case uint64: - return v, nil - default: - return 0, fmt.Errorf("unsupported type: %T", v) - } -} - // NewFarcasterClient returns a new farcaster client. func NewFarcasterClient() (Client, error) { return &farcasterClient{}, nil @@ -216,6 +170,15 @@ func (c *activitypubClient) TargetState(_ *config.Parameters) (uint64, uint64) { return 0, 0 } +// activitypubClient is a client implementation for ActivityPub. +type activitypubClient struct { + httpClient httpx.Client + relayURLs []string +} + +// set a default client +var _ Client = (*activitypubClient)(nil) + // LatestState checks the health of the ActivityPub connection. // Returns current timestamp if healthy, error otherwise. func (c *activitypubClient) LatestState(ctx context.Context) (uint64, uint64, error) { @@ -271,3 +234,61 @@ func NewActivityPubClient(network network.Network, param *config.Parameters) (Cl relayURLs: relayURLList, }, nil } + +// atprotoClient is a client implementation for atproto. +type atprotoClient struct{} + +// make sure client implements Client +var _ Client = (*atprotoClient)(nil) + +func (c *atprotoClient) CurrentState(state CheckpointState) (uint64, uint64) { + return uint64(state.AtprotoState.SubscribeTimestamp), 0 +} + +func (c *atprotoClient) TargetState(_ *config.Parameters) (uint64, uint64) { + return uint64(time.Now().Unix()), 0 +} + +func (c *atprotoClient) LatestState(_ context.Context) (uint64, uint64, error) { + return uint64(time.Now().Unix()), 0, nil +} + +// NewAtprotoClient returns a new atproto client. +func NewAtprotoClient() (Client, error) { return &atprotoClient{}, nil } + +// getTargetBlockFromParam returns the target block number/height from the parameters. +func getTargetBlockFromParam(param *config.Parameters) uint64 { + if param == nil { + return 0 + } + + targetBlock, exists := (*param)["block_target"] + if !exists || targetBlock == nil { + return 0 + } + + targetBlockUint, err := convertToUint64(targetBlock) + if err != nil { + return 0 + } + + return targetBlockUint +} + +// convertToUint64 a helper func which converts the value to uint64. +func convertToUint64(value interface{}) (uint64, error) { + switch v := value.(type) { + case string: + return strconv.ParseUint(v, 10, 64) + case int: + return uint64(v), nil + case int64: + return uint64(v), nil + case uint: + return uint64(v), nil + case uint64: + return v, nil + default: + return 0, fmt.Errorf("unsupported type: %T", v) + } +} diff --git a/internal/node/monitor/monitor.go b/internal/node/monitor/monitor.go index b76112a52..326addb3d 100644 --- a/internal/node/monitor/monitor.go +++ b/internal/node/monitor/monitor.go @@ -5,22 +5,29 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/rss3-network/node/config" "github.com/rss3-network/node/config/parameter" + "github.com/rss3-network/node/internal/engine/protocol/atproto" + "github.com/rss3-network/node/internal/engine/protocol/farcaster" workerx "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" "github.com/rss3-network/protocol-go/schema/network" "go.uber.org/zap" ) +type AtprotoState atproto.State + +type FarcastState farcaster.State + type CheckpointState struct { - BlockHeight uint64 `json:"block_height"` - BlockTimestamp uint64 `json:"block_timestamp"` - BlockNumber uint64 `json:"block_number"` - EventID uint64 `json:"event_id"` - CastsBackfill bool `json:"casts_backfill"` - ReactionBackfill bool `json:"reaction_backfill"` + BlockHeight uint64 `json:"block_height"` + BlockTimestamp uint64 `json:"block_timestamp"` + BlockNumber uint64 `json:"block_number"` + + AtprotoState + FarcastState } type WorkerProgress struct { @@ -109,25 +116,42 @@ func (m *Monitor) processDecentralizedWorker(ctx context.Context, w *config.Modu // processFederatedWorker processes the federated worker status. func (m *Monitor) processFederatedWorker(ctx context.Context, w *config.Module) error { // get checkpoint info from database - indexCount, _, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name()) + indexCount, workerState, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name()) if err != nil { zap.L().Error("get checkpoint info", zap.Error(err)) return err } - if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil { - return fmt.Errorf("update worker progress: %w", err) - } - client, ok := m.clients[w.Network] if !ok { return fmt.Errorf("client not exist") } - // Check health with timeout context targetStatus := workerx.StatusReady - if _, _, err := client.LatestState(ctx); err != nil { - targetStatus = workerx.StatusUnhealthy + + switch w.Network { + case network.Mastodon: + if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil { + return fmt.Errorf("update worker progress: %w", err) + } + + if _, _, err = client.LatestState(ctx); err != nil { + targetStatus = workerx.StatusUnhealthy + } + case network.Bluesky: + if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(uint64(workerState.SubscribeTimestamp), 0, uint64(time.Now().Unix()), indexCount)); err != nil { + return fmt.Errorf("update worker progress: %w", err) + } + + if workerState.SubscribeTimestamp == 0 { + targetStatus = workerx.StatusUnhealthy + } + + if time.Unix(workerState.SubscribeTimestamp, 0).Add(time.Hour * 1).Before(time.Now()) { + targetStatus = workerx.StatusIndexing + } + default: + return fmt.Errorf("unsupported network") } return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) diff --git a/internal/node/monitor/monitor_test.go b/internal/node/monitor/monitor_test.go index 2190a40ec..391aacf3b 100644 --- a/internal/node/monitor/monitor_test.go +++ b/internal/node/monitor/monitor_test.go @@ -761,9 +761,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1, initialStatus: worker.StatusReady, @@ -787,9 +789,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1, initialStatus: worker.StatusReady, @@ -813,9 +817,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, lastState: 1714972833273, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1, @@ -840,9 +846,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, lastState: 1714972833273 - 1, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1, @@ -867,9 +875,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, lastState: 1714972833273 - 1, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1, @@ -894,9 +904,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, lastState: 1714972833273 - 1, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1, @@ -921,9 +933,11 @@ func TestMonitor(t *testing.T) { }, }, currentState: monitor.CheckpointState{ - EventID: 432183841886217, - CastsBackfill: true, - ReactionBackfill: true, + FarcastState: monitor.FarcastState{ + EventID: 432183841886217, + CastsBackfill: true, + ReactionsBackfill: true, + }, }, lastState: 0, latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1, diff --git a/internal/node/monitor/server.go b/internal/node/monitor/server.go index 6050ee58f..96b0cfd6a 100644 --- a/internal/node/monitor/server.go +++ b/internal/node/monitor/server.go @@ -111,6 +111,8 @@ func initNetworkClient(m *config.Module) (Client, error) { client, err = NewEthereumClient(m.Endpoint) case network.NearProtocol: client, err = NewNearClient(m.Endpoint) + case network.ATProtocol: + client, err = NewAtprotoClient() default: return nil, fmt.Errorf("unsupported network protocol: %s", m.Network) } From 97f7231db507910e760ef1b0216296d5a451175e Mon Sep 17 00:00:00 2001 From: polebug Date: Thu, 26 Dec 2024 12:42:56 +0800 Subject: [PATCH 2/2] chore --- internal/engine/protocol/atproto/data_source.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/engine/protocol/atproto/data_source.go b/internal/engine/protocol/atproto/data_source.go index d17b5d711..08b77bd4c 100644 --- a/internal/engine/protocol/atproto/data_source.go +++ b/internal/engine/protocol/atproto/data_source.go @@ -71,8 +71,10 @@ func (s *dataSource) Start(ctx context.Context, tasksChan chan<- *engine.Tasks, func (s *dataSource) pollSubscribeRepos(ctx context.Context, tasksChan chan<- *engine.Tasks) error { uri := bluesky.BskySubscribeURI - if lo.IsEmpty(s.state.SubscribeCursor) { + if lo.IsNotEmpty(s.state.SubscribeCursor) { uri = fmt.Sprintf("%s?cursor=%d", uri, s.state.SubscribeCursor) + } else { + uri = fmt.Sprintf("%s?cursor=1", uri) } conn, _, err := websocket.DefaultDialer.Dial(uri, nil)