diff --git a/CHANGELOG.md b/CHANGELOG.md index e75e46e..18225ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* `firehose-grpc-listen-addr` flag now accepts comma-separated addresses (allows listening as plaintext and snakeoil-ssl at the same time or on specific ip addresses) +* removed old `RegisterServiceExtension` implementation (not used anywhere anymore) + ## v1.6.6 * Bump `substreams` and `dmetering` to latest version adding the `outputModuleHash` to metering sender. diff --git a/firehose/app/firehose/app.go b/firehose/app/firehose/app.go index 7474a78..20dae47 100644 --- a/firehose/app/firehose/app.go +++ b/firehose/app/firehose/app.go @@ -26,7 +26,6 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/bstream/transform" "github.com/streamingfast/dauth" - dgrpcserver "github.com/streamingfast/dgrpc/server" "github.com/streamingfast/dmetrics" "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" @@ -51,21 +50,14 @@ type Config struct { ServerOptions []server.Option `json:"-"` } -type RegisterServiceExtensionFunc func(server dgrpcserver.Server, - mergedBlocksStore dstore.Store, - forkedBlocksStore dstore.Store, // this can be nil here - forkableHub *hub.ForkableHub, - logger *zap.Logger) - type Modules struct { // Required dependencies - Authenticator dauth.Authenticator - HeadTimeDriftMetric *dmetrics.HeadTimeDrift - HeadBlockNumberMetric *dmetrics.HeadBlockNum - TransformRegistry *transform.Registry - RegisterServiceExtension RegisterServiceExtensionFunc - CheckPendingShutdown func() bool - InfoServer *info.InfoServer + Authenticator dauth.Authenticator + HeadTimeDriftMetric *dmetrics.HeadTimeDrift + HeadBlockNumberMetric *dmetrics.HeadBlockNum + TransformRegistry *transform.Registry + CheckPendingShutdown func() bool + InfoServer *info.InfoServer } type App struct { @@ -169,15 +161,6 @@ func (a *App) Run() error { }) firehoseServer.OnTerminated(a.Shutdown) - if a.modules.RegisterServiceExtension != nil { - a.modules.RegisterServiceExtension( - firehoseServer.Server, - mergedBlocksStore, - forkedBlocksStore, - forkableHub, - a.logger) - } - go func() { if withLive { a.logger.Info("waiting until hub is real-time synced") diff --git a/firehose/server/server.go b/firehose/server/server.go index 66acb03..22a3291 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -4,6 +4,7 @@ import ( "context" "net/url" "strings" + "sync" "time" _ "github.com/mostynb/go-grpc-compression/zstd" @@ -13,7 +14,6 @@ import ( dgrpcserver "github.com/streamingfast/dgrpc/server" "github.com/streamingfast/dgrpc/server/factory" "github.com/streamingfast/dmetering" - "github.com/streamingfast/dmetrics" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" "github.com/streamingfast/firehose-core/firehose/info" @@ -37,15 +37,17 @@ type Server struct { initFunc func(context.Context, *pbfirehoseV2.Request) context.Context postHookFunc func(context.Context, *pbfirehoseV2.Response) - dgrpcserver.Server - listenAddr string - healthListenAddr string - logger *zap.Logger - metrics dmetrics.Set + servers []*wrappedServer + logger *zap.Logger rateLimiter rate.Limiter } +type wrappedServer struct { + dgrpcserver.Server + listenAddr string +} + type Option func(*Server) func WithLeakyBucketLimiter(size int, dripRate time.Duration) Option { @@ -83,48 +85,60 @@ func New( } tracerProvider := otel.GetTracerProvider() - options := []dgrpcserver.Option{ - dgrpcserver.WithLogger(logger), - dgrpcserver.WithHealthCheck(dgrpcserver.HealthCheckOverGRPC|dgrpcserver.HealthCheckOverHTTP, createHealthCheck(isReady)), - dgrpcserver.WithPostUnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))), - dgrpcserver.WithPostStreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))), - dgrpcserver.WithGRPCServerOptions(grpc.MaxRecvMsgSize(25 * 1024 * 1024)), - dgrpcserver.WithPostUnaryInterceptor(dauthgrpc.UnaryAuthChecker(authenticator, logger)), - dgrpcserver.WithPostStreamInterceptor(dauthgrpc.StreamAuthChecker(authenticator, logger)), - } - if serviceDiscoveryURL != nil { - options = append(options, dgrpcserver.WithServiceDiscoveryURL(serviceDiscoveryURL)) - } + var servers []*wrappedServer + for _, addr := range strings.Split(listenAddr, ",") { + options := []dgrpcserver.Option{ + dgrpcserver.WithLogger(logger), + dgrpcserver.WithHealthCheck(dgrpcserver.HealthCheckOverGRPC|dgrpcserver.HealthCheckOverHTTP, createHealthCheck(isReady)), + dgrpcserver.WithPostUnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))), + dgrpcserver.WithPostStreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(tracerProvider))), + dgrpcserver.WithGRPCServerOptions(grpc.MaxRecvMsgSize(25 * 1024 * 1024)), + dgrpcserver.WithPostUnaryInterceptor(dauthgrpc.UnaryAuthChecker(authenticator, logger)), + dgrpcserver.WithPostStreamInterceptor(dauthgrpc.StreamAuthChecker(authenticator, logger)), + } - if strings.Contains(listenAddr, "*") { - options = append(options, dgrpcserver.WithInsecureServer()) - } else { - options = append(options, dgrpcserver.WithPlainTextServer()) - } + if serviceDiscoveryURL != nil { + options = append(options, dgrpcserver.WithServiceDiscoveryURL(serviceDiscoveryURL)) + } + + if strings.Contains(addr, "*") { + options = append(options, dgrpcserver.WithInsecureServer()) + addr = strings.ReplaceAll(addr, "*", "") + } else { + options = append(options, dgrpcserver.WithPlainTextServer()) + } + + srv := factory.ServerFromOptions(options...) - grpcServer := factory.ServerFromOptions(options...) + servers = append(servers, &wrappedServer{ + Server: srv, + listenAddr: addr, + }) + + } s := &Server{ - Server: grpcServer, + servers: servers, transformRegistry: transformRegistry, blockGetter: blockGetter, streamFactory: streamFactory, - listenAddr: strings.ReplaceAll(listenAddr, "*", ""), initFunc: initFunc, postHookFunc: postHookFunc, logger: logger, } logger.Info("registering grpc services") - grpcServer.RegisterService(func(gs grpc.ServiceRegistrar) { - if blockGetter != nil { - pbfirehoseV2.RegisterFetchServer(gs, s) - } - pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer) - pbfirehoseV2.RegisterStreamServer(gs, s) - pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose - }) + for _, srv := range servers { + srv.RegisterService(func(gs grpc.ServiceRegistrar) { + if blockGetter != nil { + pbfirehoseV2.RegisterFetchServer(gs, s) + } + pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer) + pbfirehoseV2.RegisterStreamServer(gs, s) + pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose + }) + } for _, opt := range opts { opt(s) @@ -133,8 +147,28 @@ func New( return s } +func (s *Server) OnTerminated(f func(error)) { + for _, server := range s.servers { + server.OnTerminated(f) + } +} + +func (s *Server) Shutdown(timeout time.Duration) { + for _, server := range s.servers { + server.Shutdown(timeout) + } +} + func (s *Server) Launch() { - s.Server.Launch(s.listenAddr) + wg := sync.WaitGroup{} + for _, server := range s.servers { + wg.Add(1) + go func() { + server.Launch(server.listenAddr) + wg.Done() + }() + } + wg.Wait() } func createHealthCheck(isReady func(ctx context.Context) bool) dgrpcserver.HealthCheck {