Skip to content

Commit

Permalink
allow multiple listen-addresses for firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Nov 18, 2024
1 parent d04a155 commit 4b8bf06
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 58 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* `firehose-grpc-listen-addr` flag now accepts comma-separated addresses (allows listening as plaintext and snakeoil-ssl at the same time or on specific ip addresses)
* removed old `RegisterServiceExtension` implementation (not used anywhere anymore)

## v1.6.6

* Bump `substreams` and `dmetering` to latest version adding the `outputModuleHash` to metering sender.
Expand Down
29 changes: 6 additions & 23 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dauth"
dgrpcserver "github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
Expand All @@ -51,21 +50,14 @@ type Config struct {
ServerOptions []server.Option `json:"-"`
}

type RegisterServiceExtensionFunc func(server dgrpcserver.Server,
mergedBlocksStore dstore.Store,
forkedBlocksStore dstore.Store, // this can be nil here
forkableHub *hub.ForkableHub,
logger *zap.Logger)

type Modules struct {
// Required dependencies
Authenticator dauth.Authenticator
HeadTimeDriftMetric *dmetrics.HeadTimeDrift
HeadBlockNumberMetric *dmetrics.HeadBlockNum
TransformRegistry *transform.Registry
RegisterServiceExtension RegisterServiceExtensionFunc
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
Authenticator dauth.Authenticator
HeadTimeDriftMetric *dmetrics.HeadTimeDrift
HeadBlockNumberMetric *dmetrics.HeadBlockNum
TransformRegistry *transform.Registry
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
}

type App struct {
Expand Down Expand Up @@ -169,15 +161,6 @@ func (a *App) Run() error {
})
firehoseServer.OnTerminated(a.Shutdown)

if a.modules.RegisterServiceExtension != nil {
a.modules.RegisterServiceExtension(
firehoseServer.Server,
mergedBlocksStore,
forkedBlocksStore,
forkableHub,
a.logger)
}

go func() {
if withLive {
a.logger.Info("waiting until hub is real-time synced")
Expand Down
104 changes: 69 additions & 35 deletions firehose/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/url"
"strings"
"sync"
"time"

_ "github.com/mostynb/go-grpc-compression/zstd"
Expand All @@ -13,7 +14,6 @@ import (
dgrpcserver "github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/dgrpc/server/factory"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dmetrics"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
Expand All @@ -37,15 +37,17 @@ type Server struct {
initFunc func(context.Context, *pbfirehoseV2.Request) context.Context
postHookFunc func(context.Context, *pbfirehoseV2.Response)

dgrpcserver.Server
listenAddr string
healthListenAddr string
logger *zap.Logger
metrics dmetrics.Set
servers []*wrappedServer
logger *zap.Logger

rateLimiter rate.Limiter
}

type wrappedServer struct {
dgrpcserver.Server
listenAddr string
}

type Option func(*Server)

func WithLeakyBucketLimiter(size int, dripRate time.Duration) Option {
Expand Down Expand Up @@ -83,48 +85,60 @@ func New(
}

tracerProvider := otel.GetTracerProvider()
options := []dgrpcserver.Option{
dgrpcserver.WithLogger(logger),
dgrpcserver.WithHealthCheck(dgrpcserver.HealthCheckOverGRPC|dgrpcserver.HealthCheckOverHTTP, createHealthCheck(isReady)),
dgrpcserver.WithPostUnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))),
dgrpcserver.WithPostStreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))),
dgrpcserver.WithGRPCServerOptions(grpc.MaxRecvMsgSize(25 * 1024 * 1024)),
dgrpcserver.WithPostUnaryInterceptor(dauthgrpc.UnaryAuthChecker(authenticator, logger)),
dgrpcserver.WithPostStreamInterceptor(dauthgrpc.StreamAuthChecker(authenticator, logger)),
}

if serviceDiscoveryURL != nil {
options = append(options, dgrpcserver.WithServiceDiscoveryURL(serviceDiscoveryURL))
}
var servers []*wrappedServer
for _, addr := range strings.Split(listenAddr, ",") {
options := []dgrpcserver.Option{
dgrpcserver.WithLogger(logger),
dgrpcserver.WithHealthCheck(dgrpcserver.HealthCheckOverGRPC|dgrpcserver.HealthCheckOverHTTP, createHealthCheck(isReady)),
dgrpcserver.WithPostUnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))),
dgrpcserver.WithPostStreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))),
dgrpcserver.WithGRPCServerOptions(grpc.MaxRecvMsgSize(25 * 1024 * 1024)),
dgrpcserver.WithPostUnaryInterceptor(dauthgrpc.UnaryAuthChecker(authenticator, logger)),
dgrpcserver.WithPostStreamInterceptor(dauthgrpc.StreamAuthChecker(authenticator, logger)),
}

if strings.Contains(listenAddr, "*") {
options = append(options, dgrpcserver.WithInsecureServer())
} else {
options = append(options, dgrpcserver.WithPlainTextServer())
}
if serviceDiscoveryURL != nil {
options = append(options, dgrpcserver.WithServiceDiscoveryURL(serviceDiscoveryURL))
}

if strings.Contains(addr, "*") {
options = append(options, dgrpcserver.WithInsecureServer())
addr = strings.ReplaceAll(addr, "*", "")
} else {
options = append(options, dgrpcserver.WithPlainTextServer())
}

srv := factory.ServerFromOptions(options...)

grpcServer := factory.ServerFromOptions(options...)
servers = append(servers, &wrappedServer{
Server: srv,
listenAddr: addr,
})

}

s := &Server{
Server: grpcServer,
servers: servers,
transformRegistry: transformRegistry,
blockGetter: blockGetter,
streamFactory: streamFactory,
listenAddr: strings.ReplaceAll(listenAddr, "*", ""),
initFunc: initFunc,
postHookFunc: postHookFunc,
logger: logger,
}

logger.Info("registering grpc services")
grpcServer.RegisterService(func(gs grpc.ServiceRegistrar) {
if blockGetter != nil {
pbfirehoseV2.RegisterFetchServer(gs, s)
}
pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer)
pbfirehoseV2.RegisterStreamServer(gs, s)
pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose
})
for _, srv := range servers {
srv.RegisterService(func(gs grpc.ServiceRegistrar) {
if blockGetter != nil {
pbfirehoseV2.RegisterFetchServer(gs, s)
}
pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer)
pbfirehoseV2.RegisterStreamServer(gs, s)
pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose
})
}

for _, opt := range opts {
opt(s)
Expand All @@ -133,8 +147,28 @@ func New(
return s
}

func (s *Server) OnTerminated(f func(error)) {
for _, server := range s.servers {
server.OnTerminated(f)
}
}

func (s *Server) Shutdown(timeout time.Duration) {
for _, server := range s.servers {
server.Shutdown(timeout)
}
}

func (s *Server) Launch() {
s.Server.Launch(s.listenAddr)
wg := sync.WaitGroup{}
for _, server := range s.servers {
wg.Add(1)
go func() {
server.Launch(server.listenAddr)
wg.Done()
}()
}
wg.Wait()
}

func createHealthCheck(isReady func(ctx context.Context) bool) dgrpcserver.HealthCheck {
Expand Down

0 comments on commit 4b8bf06

Please sign in to comment.