Skip to content

Commit

Permalink
feat: rss module config
Browse files Browse the repository at this point in the history
  • Loading branch information
brucexc committed Aug 19, 2024
1 parent b634873 commit f7e07a1
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 120 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (f *File) LoadModulesEndpoint() error {
}
}

assignEndpoint(f.Component.RSS)
assignEndpoint([]*Module{f.Component.RSS})
assignEndpoint(f.Component.Decentralized)
assignEndpoint(f.Component.Federated)

Expand All @@ -85,7 +85,7 @@ type Server struct {
}

type Component struct {
RSS []*Module `mapstructure:"rss" validate:"dive"`
RSS *Module `mapstructure:"rss"`
Federated []*Module `mapstructure:"federated" validate:"dive"`
Decentralized []*Module `mapstructure:"decentralized" validate:"dive"`
}
Expand Down
71 changes: 33 additions & 38 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ observability:
endpoint: localhost:4318
component:
rss:
- network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
network: rss
worker: rsshub
endpoint: https://rsshub.app/
parameters:
authentication:
username: user
password: pass
access_key: abc
access_code: def
decentralized:
- network: ethereum
worker: core
Expand Down Expand Up @@ -135,7 +135,7 @@ component:
}
},
"component": {
"rss": [
"rss":
{
"network": "rss",
"worker": "rsshub",
Expand All @@ -148,8 +148,7 @@ component:
"access_code": "def"
}
}
}
],
},
"decentralized": [
{
"network": "ethereum",
Expand Down Expand Up @@ -215,7 +214,7 @@ enable = true
insecure = true
endpoint = "localhost:4318"
[[component.rss]]
[component.rss]
network = "rss"
worker = "rsshub"
endpoint = "https://rsshub.app/"
Expand Down Expand Up @@ -270,21 +269,19 @@ var configFileExpected = &File{
},
},
Component: &Component{
RSS: []*Module{
{
Network: network.RSS,
EndpointID: "https://rsshub.app/",
Endpoint: Endpoint{
URL: "https://rsshub.app/",
},
Worker: rss.RSSHub,
Parameters: &Parameters{
"authentication": map[string]any{
"access_code": "def",
"access_key": "abc",
"password": "pass",
"username": "user",
},
RSS: &Module{
Network: network.RSS,
EndpointID: "https://rsshub.app/",
Endpoint: Endpoint{
URL: "https://rsshub.app/",
},
Worker: rss.RSSHub,
Parameters: &Parameters{
"authentication": map[string]any{
"access_code": "def",
"access_key": "abc",
"password": "pass",
"username": "user",
},
},
},
Expand Down Expand Up @@ -542,20 +539,18 @@ func AssertConfig(t *testing.T, expect, got *File) {
})

t.Run("decentralized", func(t *testing.T) {
for i, rss := range expect.Component.RSS {
func(_except, got *Module) {
t.Run(fmt.Sprintf("rss-%d", i), func(t *testing.T) {
t.Parallel()
assert.Equal(t, _except, got)
})
}(rss, got.Component.RSS[i])
}
func(_expect, got *Module) {
t.Run("rss", func(t *testing.T) {
t.Parallel()
assert.Equal(t, _expect, got)
})
}(expect.Component.RSS, got.Component.RSS)

for i, indexer := range got.Component.Decentralized {
func(_except, got *Module) {
func(_expect, got *Module) {
t.Run(fmt.Sprintf("%s-%s", indexer.Network, indexer.Worker), func(t *testing.T) {
t.Parallel()
AssertIndexer(t, _except, got)
AssertIndexer(t, _expect, got)
})
}(configFileExpected.Component.Decentralized[i], indexer)
}
Expand Down
16 changes: 8 additions & 8 deletions deploy/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ endpoints:

# `component` is used to split different types of networks.
component:
# `rss` network type includes workers indexing data in RSS format.
# `rss` network type includes the worker indexing data in RSS format.
rss:
- id: rss-rsshub
network: rss
worker: rsshub
endpoint: https://your.rsshub.com/
parameters:
authentication:
access_key:
id: rss-rsshub
network: rss
worker: rsshub
endpoint: https://your.rsshub.com/
parameters:
authentication:
access_key:
# `decentralized` network type includes workers indexing data from decentralized networks such as blockchain networks, Arweave, etc.
decentralized:
# Each configuration here initializes a worker.
Expand Down
16 changes: 8 additions & 8 deletions deploy/sample/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ endpoints:
url: https://rpc.rss3.io

component:
# rss:
# - id: rss-rsshub
# network: rss
# worker: rsshub
# endpoint: https://your.rsshub.com/
# parameters:
# authentication:
# access_key:
rss:
id: rss-rsshub
network: rss
worker: rsshub
endpoint: https://your.rsshub.com/
parameters:
authentication:
access_key:
decentralized:
- id: vsl-core
network: vsl
Expand Down
6 changes: 3 additions & 3 deletions internal/node/component/info/handler_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ func (c *Component) buildVersion() Version {

// getNodeWorkerCoverage returns the worker coverage.
func (c *Component) getNodeWorkerCoverage() []string {
workerCoverage := make([]string, 0, len(c.config.Component.Decentralized)+len(c.config.Component.RSS)+len(c.config.Component.Federated))
workerCoverage := make([]string, 0, len(c.config.Component.Decentralized)+lo.Ternary(c.config.Component.RSS != nil, 1, 0)+len(c.config.Component.Federated))

// append all workers
for _, worker := range c.config.Component.Decentralized {
workerCoverage = append(workerCoverage, worker.Worker.Name())
}

for _, worker := range c.config.Component.RSS {
workerCoverage = append(workerCoverage, worker.Worker.Name())
if c.config.Component.RSS != nil {
workerCoverage = append(workerCoverage, c.config.Component.RSS.Worker.Name())
}

for _, worker := range c.config.Component.Federated {
Expand Down
34 changes: 16 additions & 18 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rss3-network/node/schema/worker/rss"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/rss3-network/protocol-go/schema/tag"
"github.com/samber/lo"
)

type WorkerResponse struct {
Expand All @@ -23,7 +24,7 @@ type WorkerResponse struct {

type ComponentInfo struct {
Decentralized []*WorkerInfo `json:"decentralized"`
RSS []*WorkerInfo `json:"rss"`
RSS *WorkerInfo `json:"rss"`
Federated []*WorkerInfo `json:"federated"`
}

Expand All @@ -41,7 +42,7 @@ type WorkerInfo struct {
func (c *Component) GetWorkersStatus(ctx echo.Context) error {
go c.CollectTrace(ctx.Request().Context(), ctx.Request().RequestURI, "status")

workerCount := len(c.config.Component.Decentralized) + len(c.config.Component.RSS) + len(c.config.Component.Federated)
workerCount := len(c.config.Component.Decentralized) + lo.Ternary(c.config.Component.RSS != nil, 1, 0) + len(c.config.Component.Federated)
workerInfoChan := make(chan *WorkerInfo, workerCount)

var response *WorkerResponse
Expand All @@ -52,23 +53,20 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {

// Build the worker response.
response = c.buildWorkerResponse(workerInfoChan)
} else if len(c.config.Component.RSS) > 0 {
} else if c.config.Component.RSS != nil {
m := c.config.Component.RSS

response = &WorkerResponse{
Data: ComponentInfo{
RSS: make([]*WorkerInfo, 0, len(c.config.Component.RSS)),
RSS: &WorkerInfo{
WorkerID: m.ID,
Network: m.Network,
Worker: m.Worker,
Tags: []tag.Tag{tag.RSS},
Platform: decentralized.PlatformUnknown,
Status: worker.StatusReady},
},
}

for _, m := range c.config.Component.RSS {
response.Data.RSS = append(response.Data.RSS, &WorkerInfo{
WorkerID: m.ID,
Network: m.Network,
Worker: m.Worker,
Tags: []tag.Tag{tag.RSS},
Platform: decentralized.PlatformUnknown,
Status: worker.StatusReady,
})
}
}

return ctx.JSON(http.StatusOK, response)
Expand All @@ -88,7 +86,7 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- *
}(w)
}

modules := append(append(c.config.Component.Decentralized, c.config.Component.RSS...), c.config.Component.Federated...)
modules := append(append(c.config.Component.Decentralized, c.config.Component.RSS), c.config.Component.Federated...)

for _, m := range modules {
fetchWorkerInfo(m, c.fetchWorkerInfo)
Expand All @@ -105,15 +103,15 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work
response := &WorkerResponse{
Data: ComponentInfo{
Decentralized: []*WorkerInfo{},
RSS: []*WorkerInfo{},
RSS: &WorkerInfo{},
Federated: []*WorkerInfo{},
},
}

for workerInfo := range workerInfoChan {
switch workerInfo.Network.Source() {
case network.RSSSource:
response.Data.RSS = append(response.Data.RSS, workerInfo)
response.Data.RSS = workerInfo
case network.EthereumSource, network.FarcasterSource, network.ArweaveSource:
response.Data.Decentralized = append(response.Data.Decentralized, workerInfo)
default:
Expand Down
24 changes: 10 additions & 14 deletions internal/node/component/rss/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,16 @@ func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File)
panic(err)
}

for _, conf := range config.Component.RSS {
if conf.Network == network.RSS {
c.rsshub = &configx{
id: conf.ID,
network: conf.Network,
worker: conf.Worker,
endpoint: conf.Endpoint.URL,
}

if err := c.setAccessKey(conf); err != nil {
c.rsshub = nil
}

break
if config.Component.RSS != nil && config.Component.RSS.Network == network.RSS {
c.rsshub = &configx{
id: config.Component.RSS.ID,
network: config.Component.RSS.Network,
worker: config.Component.RSS.Worker,
endpoint: config.Component.RSS.Endpoint.URL,
}

if err := c.setAccessKey(config.Component.RSS); err != nil {
c.rsshub = nil
}
}

Expand Down
3 changes: 2 additions & 1 deletion internal/node/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func NewRssClient(endpoint string, param *config.Parameters) (Client, error) {
}

// used for health checks
base.Path = path.Join(base.Path, "abc")
// https://rsshub.app/api/reference#tag/namespace
base.Path = path.Join(base.Path, "api/namespace")

option, err := rss.NewOption(param)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions internal/node/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
workerx "github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
"go.uber.org/zap"
)

Expand All @@ -34,7 +35,7 @@ type WorkerProgress struct {
func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error {
var wg sync.WaitGroup

errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.RSS)+len(m.config.Component.Federated))
errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0)+len(m.config.Component.Federated))

processWorker := func(w *config.Module, processFunc func(context.Context, *config.Module) error) {
wg.Add(1)
Expand All @@ -52,8 +53,8 @@ func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error {
processWorker(w, m.processDecentralizedWorker)
}

for _, w := range m.config.Component.RSS {
processWorker(w, m.processRSSWorker)
if m.config.Component.RSS != nil {
processWorker(m.config.Component.RSS, m.processRSSWorker)
}

go func() {
Expand Down
9 changes: 5 additions & 4 deletions internal/node/monitor/monitor_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/schema/worker/decentralized"
"github.com/samber/lo"
"go.uber.org/zap"
)

func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState CheckpointState, targetWorkerState, latestState uint64) error {
var wg sync.WaitGroup

errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.RSS))
errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0))

for _, w := range m.config.Component.Decentralized {
wg.Add(1)
Expand All @@ -28,16 +29,16 @@ func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState Chec
}(w)
}

for _, w := range m.config.Component.RSS {
if m.config.Component.RSS != nil {
wg.Add(1)

go func(w *config.Module) {
defer wg.Done()

if err := m.processRSSWorker(ctx, w); err != nil {
if err := m.processMockWorker(ctx, w, currentState, targetWorkerState, latestState); err != nil {
errChan <- err
}
}(w)
}(m.config.Component.RSS)
}

go func() {
Expand Down
Loading

0 comments on commit f7e07a1

Please sign in to comment.