Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(monitor): modify the logic of atproto monitor #687

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/engine/protocol/atproto/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ func (s *dataSource) Start(ctx context.Context, tasksChan chan<- *engine.Tasks,
func (s *dataSource) pollSubscribeRepos(ctx context.Context, tasksChan chan<- *engine.Tasks) error {
uri := bluesky.BskySubscribeURI

if lo.IsEmpty(s.state.SubscribeCursor) {
if lo.IsNotEmpty(s.state.SubscribeCursor) {
uri = fmt.Sprintf("%s?cursor=%d", uri, s.state.SubscribeCursor)
} else {
uri = fmt.Sprintf("%s?cursor=1", uri)
}

conn, _, err := websocket.DefaultDialer.Dial(uri, nil)
Expand Down
4 changes: 2 additions & 2 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work
response.Data.Decentralized = append(response.Data.Decentralized, workerInfo)
zap.L().Debug("added decentralized worker info",
zap.String("network", workerInfo.Network.String()))
case network.ActivityPubProtocol:
case network.ActivityPubProtocol, network.ATProtocol:
response.Data.Federated = append(response.Data.Federated, workerInfo)
zap.L().Debug("added federated worker info",
zap.String("network", workerInfo.Network.String()))
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module)
}

switch module.Network.Protocol() {
case network.ActivityPubProtocol:
case network.ActivityPubProtocol, network.ATProtocol:
if federatedWorker, ok := module.Worker.(federated.Worker); ok {
workerInfo.Platform = federated.ToPlatformMap[federatedWorker].String()
workerInfo.Tags = federated.ToTagsMap[federatedWorker]
Expand Down
113 changes: 67 additions & 46 deletions internal/node/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ type Client interface {
LatestState(ctx context.Context) (uint64, uint64, error)
}

// activitypubClient is a client implementation for ActivityPub.
type activitypubClient struct {
httpClient httpx.Client
relayURLs []string
}

// set a default client
var _ Client = (*activitypubClient)(nil)

// ethereumClient is a client implementation for ethereum.
type ethereumClient struct {
ethereumClient ethereum.Client
Expand Down Expand Up @@ -166,43 +157,6 @@ func (c *farcasterClient) LatestState(_ context.Context) (uint64, uint64, error)
return uint64(time.Now().UnixMilli()), 0, nil
}

// getTargetBlockFromParam returns the target block number/height from the parameters.
func getTargetBlockFromParam(param *config.Parameters) uint64 {
if param == nil {
return 0
}

targetBlock, exists := (*param)["block_target"]
if !exists || targetBlock == nil {
return 0
}

targetBlockUint, err := convertToUint64(targetBlock)
if err != nil {
return 0
}

return targetBlockUint
}

// convertToUint64 a helper func which converts the value to uint64.
func convertToUint64(value interface{}) (uint64, error) {
switch v := value.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int:
return uint64(v), nil
case int64:
return uint64(v), nil
case uint:
return uint64(v), nil
case uint64:
return v, nil
default:
return 0, fmt.Errorf("unsupported type: %T", v)
}
}

// NewFarcasterClient returns a new farcaster client.
func NewFarcasterClient() (Client, error) {
return &farcasterClient{}, nil
Expand All @@ -216,6 +170,15 @@ func (c *activitypubClient) TargetState(_ *config.Parameters) (uint64, uint64) {
return 0, 0
}

// activitypubClient is a client implementation for ActivityPub.
type activitypubClient struct {
httpClient httpx.Client
relayURLs []string
}

// set a default client
var _ Client = (*activitypubClient)(nil)

// LatestState checks the health of the ActivityPub connection.
// Returns current timestamp if healthy, error otherwise.
func (c *activitypubClient) LatestState(ctx context.Context) (uint64, uint64, error) {
Expand Down Expand Up @@ -271,3 +234,61 @@ func NewActivityPubClient(network network.Network, param *config.Parameters) (Cl
relayURLs: relayURLList,
}, nil
}

// atprotoClient is a client implementation for atproto.
type atprotoClient struct{}

// make sure client implements Client
var _ Client = (*atprotoClient)(nil)

func (c *atprotoClient) CurrentState(state CheckpointState) (uint64, uint64) {
return uint64(state.AtprotoState.SubscribeTimestamp), 0
}

func (c *atprotoClient) TargetState(_ *config.Parameters) (uint64, uint64) {
return uint64(time.Now().Unix()), 0
}

func (c *atprotoClient) LatestState(_ context.Context) (uint64, uint64, error) {
return uint64(time.Now().Unix()), 0, nil
}

// NewAtprotoClient returns a new atproto client.
func NewAtprotoClient() (Client, error) { return &atprotoClient{}, nil }

// getTargetBlockFromParam returns the target block number/height from the parameters.
func getTargetBlockFromParam(param *config.Parameters) uint64 {
if param == nil {
return 0
}

targetBlock, exists := (*param)["block_target"]
if !exists || targetBlock == nil {
return 0
}

targetBlockUint, err := convertToUint64(targetBlock)
if err != nil {
return 0
}

return targetBlockUint
}

// convertToUint64 a helper func which converts the value to uint64.
func convertToUint64(value interface{}) (uint64, error) {
switch v := value.(type) {
case string:
return strconv.ParseUint(v, 10, 64)
case int:
return uint64(v), nil
case int64:
return uint64(v), nil
case uint:
return uint64(v), nil
case uint64:
return v, nil
default:
return 0, fmt.Errorf("unsupported type: %T", v)
}
}
52 changes: 38 additions & 14 deletions internal/node/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/rss3-network/node/config"
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/internal/engine/protocol/atproto"
"github.com/rss3-network/node/internal/engine/protocol/farcaster"
workerx "github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/protocol-go/schema/network"
"go.uber.org/zap"
)

type AtprotoState atproto.State

type FarcastState farcaster.State

type CheckpointState struct {
BlockHeight uint64 `json:"block_height"`
BlockTimestamp uint64 `json:"block_timestamp"`
BlockNumber uint64 `json:"block_number"`
EventID uint64 `json:"event_id"`
CastsBackfill bool `json:"casts_backfill"`
ReactionBackfill bool `json:"reaction_backfill"`
BlockHeight uint64 `json:"block_height"`
BlockTimestamp uint64 `json:"block_timestamp"`
BlockNumber uint64 `json:"block_number"`

AtprotoState
FarcastState
}

type WorkerProgress struct {
Expand Down Expand Up @@ -109,25 +116,42 @@ func (m *Monitor) processDecentralizedWorker(ctx context.Context, w *config.Modu
// processFederatedWorker processes the federated worker status.
func (m *Monitor) processFederatedWorker(ctx context.Context, w *config.Module) error {
// get checkpoint info from database
indexCount, _, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name())
indexCount, workerState, err := m.getCheckpointState(ctx, w.ID, w.Network, w.Worker.Name())
if err != nil {
zap.L().Error("get checkpoint info", zap.Error(err))
return err
}

if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

client, ok := m.clients[w.Network]
if !ok {
return fmt.Errorf("client not exist")
}

// Check health with timeout context
targetStatus := workerx.StatusReady
if _, _, err := client.LatestState(ctx); err != nil {
targetStatus = workerx.StatusUnhealthy

switch w.Network {
case network.Mastodon:
if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(0, 0, 0, indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

if _, _, err = client.LatestState(ctx); err != nil {
targetStatus = workerx.StatusUnhealthy
}
case network.Bluesky:
if err = m.UpdateWorkerProgress(ctx, w.ID, ConstructWorkerProgress(uint64(workerState.SubscribeTimestamp), 0, uint64(time.Now().Unix()), indexCount)); err != nil {
return fmt.Errorf("update worker progress: %w", err)
}

if workerState.SubscribeTimestamp == 0 {
targetStatus = workerx.StatusUnhealthy
}

if time.Unix(workerState.SubscribeTimestamp, 0).Add(time.Hour * 1).Before(time.Now()) {
targetStatus = workerx.StatusIndexing
}
default:
return fmt.Errorf("unsupported network")
}

return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String())
Expand Down
56 changes: 35 additions & 21 deletions internal/node/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
initialStatus: worker.StatusReady,
Expand All @@ -787,9 +789,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
initialStatus: worker.StatusReady,
Expand All @@ -813,9 +817,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
Expand All @@ -840,9 +846,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand All @@ -867,9 +875,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand All @@ -894,9 +904,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 1714972833273 - 1,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] - 1,
Expand All @@ -921,9 +933,11 @@ func TestMonitor(t *testing.T) {
},
},
currentState: monitor.CheckpointState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionBackfill: true,
FarcastState: monitor.FarcastState{
EventID: 432183841886217,
CastsBackfill: true,
ReactionsBackfill: true,
},
},
lastState: 0,
latestState: 1714972833273 + parameter.CurrentNetworkTolerance[network.Farcaster] + 1,
Expand Down
2 changes: 2 additions & 0 deletions internal/node/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func initNetworkClient(m *config.Module) (Client, error) {
client, err = NewEthereumClient(m.Endpoint)
case network.NearProtocol:
client, err = NewNearClient(m.Endpoint)
case network.ATProtocol:
client, err = NewAtprotoClient()
default:
return nil, fmt.Errorf("unsupported network protocol: %s", m.Network)
}
Expand Down
Loading