Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Traffic generator V2 #1055

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo
}
return 0, a.cumulativePayment, nil
}

return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available")
}

Expand Down
1 change: 1 addition & 0 deletions api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error setting payment state for accountant: %w", err)
}

return nil
}

Expand Down
18 changes: 10 additions & 8 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ group "all" {
"churner",
"dataapi",
"traffic-generator",
"traffic-generator-v2",
"controller",
"relay"
]
Expand Down Expand Up @@ -84,6 +85,7 @@ group "internal-release" {
"churner-internal",
"dataapi-internal",
"traffic-generator-internal",
"traffic-generator-v2-internal",
"controller-internal",
"relay-internal"
]
Expand Down Expand Up @@ -201,19 +203,19 @@ target "traffic-generator-internal" {
]
}

target "traffic-generator2" {
target "traffic-generator-v2" {
context = "."
dockerfile = "./trafficgenerator2.Dockerfile"
dockerfile = "./trafficgenerator-v2.Dockerfile"
target = "generator2"
tags = ["${REGISTRY}/${REPO}/traffic-generator2:${BUILD_TAG}"]
tags = ["${REGISTRY}/${REPO}/traffic-generator-v2:${BUILD_TAG}"]
}

target "traffic-generator2-internal" {
inherits = ["traffic-generator2"]
target "traffic-generator-v2-internal" {
inherits = ["traffic-generator-v2"]
tags = [
"${REGISTRY}/eigenda-traffic-generator2:${BUILD_TAG}",
"${REGISTRY}/eigenda-traffic-generator2:${GIT_SHA}",
"${REGISTRY}/eigenda-traffic-generator2:sha-${GIT_SHORT_SHA}"
"${REGISTRY}/eigenda-traffic-generator-v2:${BUILD_TAG}",
"${REGISTRY}/eigenda-traffic-generator-v2:${GIT_SHA}",
"${REGISTRY}/eigenda-traffic-generator-v2:sha-${GIT_SHORT_SHA}"
]
}

Expand Down
23 changes: 22 additions & 1 deletion tools/traffic/cmd2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/Layr-Labs/eigenda/tools/traffic"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
Expand Down Expand Up @@ -40,5 +42,24 @@ func trafficGeneratorMain(ctx *cli.Context) error {
panic(fmt.Sprintf("failed to create new traffic generator\n%s", err))
}

return generator.Start()
// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

// Run the generator in a goroutine
errChan := make(chan error, 1)
go func() {
errChan <- generator.Start()
}()

// Wait for either an error or a signal
select {
case err := <-errChan:
return err
case sig := <-sigChan:
fmt.Printf("\nReceived signal %v, shutting down...\n", sig)
// Call Stop() method for graceful shutdown
generator.Stop()
return nil
}
}
41 changes: 12 additions & 29 deletions tools/traffic/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package config

import (
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/retriever"
"time"

"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/urfave/cli"
)

Expand All @@ -19,17 +17,14 @@ type Config struct {
LoggingConfig common.LoggerConfig

// Configuration for the disperser client.
DisperserClientConfig *clients.Config
DisperserClientConfig *clients.DisperserClientConfig

// Configuration for the retriever client.
RetrievalClientConfig *retriever.Config
// Signer private key
SignerPrivateKey string

// Configuration for the graph.
TheGraphConfig *thegraph.Config

// Configuration for the EigenDA client.
EigenDAClientConfig *clients.EigenDAClientConfig

// Configures the traffic generator workers.
WorkerConfig WorkerConfig

Expand All @@ -47,6 +42,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
return nil, err
}
customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name)
if len(customQuorums) == 0 {
return nil, errors.New("no custom quorum numbers provided")
}

customQuorumsUint8 := make([]uint8, len(customQuorums))
for i, q := range customQuorums {
if q < 0 || q > 255 {
Expand All @@ -55,30 +54,21 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
customQuorumsUint8[i] = uint8(q)
}

retrieverConfig := retriever.ReadRetrieverConfig(ctx)

config := &Config{
DisperserClientConfig: &clients.Config{
DisperserClientConfig: &clients.DisperserClientConfig{
Hostname: ctx.GlobalString(HostnameFlag.Name),
Port: ctx.GlobalString(GrpcPortFlag.Name),
Timeout: ctx.Duration(TimeoutFlag.Name),
UseSecureGrpcFlag: ctx.GlobalBool(UseSecureGrpcFlag.Name),
},

RetrievalClientConfig: retrieverConfig,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),

TheGraphConfig: &thegraph.Config{
Endpoint: ctx.String(TheGraphUrlFlag.Name),
PullInterval: ctx.Duration(TheGraphPullIntervalFlag.Name),
MaxRetries: ctx.Int(TheGraphRetriesFlag.Name),
},

EigenDAClientConfig: &clients.EigenDAClientConfig{
RPC: fmt.Sprintf("%s:%s", ctx.GlobalString(HostnameFlag.Name), ctx.GlobalString(GrpcPortFlag.Name)),
SignerPrivateKeyHex: ctx.String(SignerPrivateKeyFlag.Name),
DisableTLS: ctx.GlobalBool(DisableTLSFlag.Name),
},

LoggingConfig: *loggerConfig,

MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name),
Expand All @@ -103,19 +93,12 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),

EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr,
SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name),
CustomQuorums: customQuorumsUint8,
CustomQuorums: customQuorumsUint8,

MetricsBlacklist: ctx.StringSlice(MetricsBlacklistFlag.Name),
MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name),
},
}

err = config.EigenDAClientConfig.CheckAndSetDefaults()
if err != nil {
return nil, err
}

return config, nil
}
14 changes: 3 additions & 11 deletions tools/traffic/config/flags.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package config

import (
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/Layr-Labs/eigenda/retriever/flags"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -133,7 +129,7 @@ var (
TheGraphUrlFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "the-graph-url"),
Usage: "URL of the subgraph instance.",
Required: true,
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_URL"),
}
TheGraphPullIntervalFlag = cli.DurationFlag{
Expand Down Expand Up @@ -225,7 +221,6 @@ var (
var requiredFlags = []cli.Flag{
HostnameFlag,
GrpcPortFlag,
TheGraphUrlFlag,
}

var optionalFlags = []cli.Flag{
Expand All @@ -243,6 +238,7 @@ var optionalFlags = []cli.Flag{
RequiredDownloadsFlag,
DisableTLSFlag,
MetricsHTTPPortFlag,
TheGraphUrlFlag,
TheGraphPullIntervalFlag,
TheGraphRetriesFlag,
VerifierIntervalFlag,
Expand All @@ -261,10 +257,6 @@ var Flags []cli.Flag

func init() {
Flags = append(requiredFlags, optionalFlags...)
Flags = append(Flags, flags.RetrieverFlags(envPrefix)...)
Flags = append(Flags, kzg.CLIFlags(envPrefix)...)
Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...)
Flags = append(Flags, geth.EthClientFlags(envPrefix)...)
Flags = append(Flags, indexer.CLIFlags(envPrefix)...)
Flags = append(Flags, thegraph.CLIFlags(envPrefix)...)
}
3 changes: 1 addition & 2 deletions tools/traffic/config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type WorkerConfig struct {

// The address of the EigenDA service manager smart contract, in hex.
EigenDAServiceManager string
// The private key to use for signing requests.
SignerPrivateKey string

// Custom quorum numbers to use for the traffic generator.
CustomQuorums []uint8

Expand Down
Loading
Loading