Skip to content

Commit

Permalink
fix(monitor): modify the logic of atproto monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
polebug committed Dec 25, 2024
1 parent 4204580 commit 942c098
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 83 deletions.
4 changes: 2 additions & 2 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work
response.Data.Decentralized = append(response.Data.Decentralized, workerInfo)
zap.L().Debug("added decentralized worker info",
zap.String("network", workerInfo.Network.String()))
case network.ActivityPubProtocol:
case network.ActivityPubProtocol, network.ATProtocol:
response.Data.Federated = append(response.Data.Federated, workerInfo)
zap.L().Debug("added federated worker info",
zap.String("network", workerInfo.Network.String()))
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module)
}

switch module.Network.Protocol() {
case network.ActivityPubProtocol:
case network.ActivityPubProtocol, network.ATProtocol:
if federatedWorker, ok := module.Worker.(federated.Worker); ok {
workerInfo.Platform = federated.ToPlatformMap[federatedWorker].String()
workerInfo.Tags = federated.ToTagsMap[federatedWorker]
Expand Down
113 changes: 67 additions & 46 deletions internal/node/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ type Client interface {
LatestState(ctx context.Context) (uint64, uint64, error)
}

// activitypubClient is a client implementation for ActivityPub.
type activitypubClient struct {
httpClient httpx.Client
relayURLs []string
}

// set a default client
var _ Client = (*activitypubClient)(nil)

// ethereumClient is a client implementation for ethereum.
type ethereumClient struct {
ethereumClient ethereum.Client
Expand Down Expand Up @@ -166,43 +157,6 @@ func (c *farcasterClient) LatestState(_ context.Context) (uint64, uint64, error)
return uint64(time.Now().UnixMilli()), 0, nil
}

// getTargetBlockFromParam returns the target block number/height from the parameters.
func getTargetBlockFromParam(param *config.Parameters) uint64 {
if param == nil {
return 0
}

targetBlock, exists := (*param)["block_target"]
if !exists || targetBlock == nil {
return 0
}

targetBlockUint, err := convertToUint64(targetBlock)
if err != nil {
return 0
}

return targetBlockUint
}

// convertToUint64 a helper func which converts the value to uint64.
func convertToUint64(value interface{}) (uint64, error) {
switch v := value.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int:
return uint64(v), nil
case int64:
return uint64(v), nil
case uint:
return uint64(v), nil
case uint64:
return v, nil
default:
return 0, fmt.Errorf("unsupported type: %T", v)
}
}

// NewFarcasterClient returns a new farcaster client.
func NewFarcasterClient() (Client, error) {
return &farcasterClient{}, nil
Expand All @@ -216,6 +170,15 @@ func (c *activitypubClient) TargetState(_ *config.Parameters) (uint64, uint64) {
return 0, 0
}

// activitypubClient is a client implementation for ActivityPub.
type activitypubClient struct {
httpClient httpx.Client
relayURLs []string
}

// set a default client
var _ Client = (*activitypubClient)(nil)

// LatestState checks the health of the ActivityPub connection.
// Returns current timestamp if healthy, error otherwise.
func (c *activitypubClient) LatestState(ctx context.Context) (uint64, uint64, error) {
Expand Down Expand Up @@ -271,3 +234,61 @@ func NewActivityPubClient(network network.Network, param *config.Parameters) (Cl
relayURLs: relayURLList,
}, nil
}

// atprotoClient is a client implementation for atproto.
type atprotoClient struct{}

// make sure client implements Client
var _ Client = (*atprotoClient)(nil)

func (c *atprotoClient) CurrentState(state CheckpointState) (uint64, uint64) {
return uint64(state.AtprotoState.SubscribeTimestamp), 0
}

func (c *atprotoClient) TargetState(_ *config.Parameters) (uint64, uint64) {
return uint64(time.Now().Unix()), 0
}

func (c *atprotoClient) LatestState(_ context.Context) (uint64, uint64, error) {
return uint64(time.Now().Unix()), 0, nil
}

// NewAtprotoClient returns a new atproto client.
func NewAtprotoClient() (Client, error) { return &atprotoClient{}, nil }

// getTargetBlockFromParam returns the target block number/height from the parameters.
func getTargetBlockFromParam(param *config.Parameters) uint64 {
if param == nil {
return 0
}

targetBlock, exists := (*param)["block_target"]
if !exists || targetBlock == nil {
return 0
}

targetBlockUint, err := convertToUint64(targetBlock)
if err != nil {
return 0
}

return targetBlockUint
}

// convertToUint64 a helper func which converts the value to uint64.
func convertToUint64(value interface{}) (uint64, error) {
switch v := value.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int:
return uint64(v), nil
case int64:
return uint64(v), nil
case uint:
return uint64(v), nil
case uint64:
return v, nil
default:
return 0, fmt.Errorf("unsupported type: %T", v)
}
}
52 changes: 38 additions & 14 deletions internal/node/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/rss3-network/node/config"
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/internal/engine/protocol/atproto"
"github.com/rss3-network/node/internal/engine/protocol/farcaster"
workerx "github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/protocol-go/schema/network"
"go.uber.org/zap"
)

type AtprotoState atproto.State

type FarcastState farcaster.State

type CheckpointState struct {
BlockHeight uint64 `json:"block_height"`
BlockTimestamp uint64 `json:"block_timestamp"`
BlockNumber uint64 `json:"block_number"`
EventID uint64 `json:"event_id"`
CastsBackfill bool `json:"casts_backfill"`
ReactionBackfill bool `json:"reaction_backfill"`
BlockHeight uint64 `json:"block_height"`
BlockTimestamp uint64 `json:"block_timestamp"`
BlockNumber uint64 `json:"block_number"`

AtprotoState
FarcastState
}

type WorkerProgress struct {
Expand Down Expand Up @@ -109,25 +116,42 @@ func (m *Monitor) processDecentralizedWorker(ctx context.Context, w *config.Modu
// processFederatedWorker processes the federated worker status.
func (m *Monitor) processFederatedWorker(ctx context.Context, w *config.Module) error {
// get checkpoint info from database
indexCount, _, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name())
indexCount, workerState, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name())
if err != nil {
zap.L().Error("get checkpoint info", zap.Error(err))
return err
}

if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

client, ok := m.clients[w.Network]
if !ok {
return fmt.Errorf("client not exist")
}

// Check health with timeout context
targetStatus := workerx.StatusReady
if _, _, err := client.LatestState(ctx); err != nil {
targetStatus = workerx.StatusUnhealthy

switch w.Network {
case network.Mastodon:
if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

if _, _, err = client.LatestState(ctx); err != nil {
targetStatus = workerx.StatusUnhealthy
}
case network.Bluesky:
if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(uint64(workerState.SubscribeTimestamp), 0, uint64(time.Now().Unix()), indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

if workerState.SubscribeTimestamp == 0 {
targetStatus = workerx.StatusUnhealthy
}

if time.Unix(workerState.SubscribeTimestamp, 0).Add(time.Hour * 1).Before(time.Now()) {
targetStatus = workerx.StatusIndexing
}
default:
return fmt.Errorf("unsupported network")
}

return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String())
Expand Down
56 changes: 35 additions & 21 deletions internal/node/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
initialStatus: worker.StatusReady,
Expand All @@ -787,9 +789,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
initialStatus: worker.StatusReady,
Expand All @@ -813,9 +817,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
Expand All @@ -840,9 +846,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand All @@ -867,9 +875,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand All @@ -894,9 +904,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
Expand All @@ -921,9 +933,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 0,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand Down
2 changes: 2 additions & 0 deletions internal/node/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func initNetworkClient(m *config.Module) (Client, error) {
client, err = NewEthereumClient(m.Endpoint)
case network.NearProtocol:
client, err = NewNearClient(m.Endpoint)
case network.ATProtocol:
client, err = NewAtprotoClient()
default:
return nil, fmt.Errorf("unsupported network protocol: %s", m.Network)
}
Expand Down

0 comments on commit 942c098

Please sign in to comment.