Skip to content

Commit

Permalink
Use flow-go Components for composing
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Nov 27, 2024
1 parent 1d626c7 commit 33d6278
Show file tree
Hide file tree
Showing 14 changed files with 2,555 additions and 1,410 deletions.
80 changes: 60 additions & 20 deletions api/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,94 @@ import (
"net/http"
_ "net/http/pprof"
"strconv"
"time"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

"github.com/rs/zerolog"
)

type ProfileServer struct {
logger zerolog.Logger
log zerolog.Logger
server *http.Server
endpoint string

startupCompleted chan struct{}
}

var _ component.Component = (*ProfileServer)(nil)

func NewProfileServer(
logger zerolog.Logger,
host string,
port int,
) *ProfileServer {
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
return &ProfileServer{
logger: logger,
server: &http.Server{Addr: endpoint},
endpoint: endpoint,
log: logger,
server: &http.Server{Addr: endpoint},
endpoint: endpoint,
startupCompleted: make(chan struct{}),
}
}

func (s *ProfileServer) ListenAddr() string {
return s.endpoint
}
func (s *ProfileServer) Start(ctx irrecoverable.SignalerContext) {
defer close(s.startupCompleted)

s.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}

func (s *ProfileServer) Start() {
go func() {
err := s.server.ListenAndServe()
if err != nil {
s.log.Info().Msgf("Profiler server started: %s", s.endpoint)

if err := s.server.ListenAndServe(); err != nil {
// http.ErrServerClosed is returned when Close or Shutdown is called
// we don't consider this an error, so print this with debug level instead
if errors.Is(err, http.ErrServerClosed) {
s.logger.Warn().Msg("Profiler server shutdown")
return
s.log.Debug().Err(err).Msg("Profiler server shutdown")
} else {
s.log.Err(err).Msg("error running profiler server")
}
s.logger.Err(err).Msg("failed to start Profiler server")
panic(err)
}
}()
}

func (s *ProfileServer) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
func (s *ProfileServer) Ready() <-chan struct{} {
ready := make(chan struct{})

go func() {
<-s.startupCompleted
close(ready)
}()

return s.server.Shutdown(ctx)
return ready
}

func (s *ProfileServer) Close() error {
return s.server.Close()
func (s *ProfileServer) Done() <-chan struct{} {
done := make(chan struct{})
go func() {
<-s.startupCompleted
defer close(done)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := s.server.Shutdown(ctx)
if err == nil {
s.log.Info().Msg("Profiler server graceful shutdown completed")
}

if errors.Is(err, ctx.Err()) {
s.log.Warn().Msg("Profiler server graceful shutdown timed out")
err := s.server.Close()
if err != nil {
s.log.Err(err).Msg("error closing profiler server")
}
} else {
s.log.Err(err).Msg("error shutting down profiler server")
}
}()
return done
}
112 changes: 72 additions & 40 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"strings"
"time"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

gethVM "github.com/onflow/go-ethereum/core/vm"
gethLog "github.com/onflow/go-ethereum/log"
"github.com/onflow/go-ethereum/rpc"
Expand Down Expand Up @@ -55,8 +58,12 @@ type Server struct {

config config.Config
collector metrics.Collector

startupCompleted chan struct{}
}

var _ component.Component = (*Server)(nil)

const (
shutdownTimeout = 5 * time.Second
batchRequestLimit = 50
Expand All @@ -77,10 +84,11 @@ func NewServer(
gethLog.SetDefault(gethLog.NewLogger(zeroSlog))

return &Server{
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
startupCompleted: make(chan struct{}),
}
}

Expand Down Expand Up @@ -177,9 +185,10 @@ func (h *Server) disableWS() bool {
}

// Start starts the HTTP server if it is enabled and not already running.
func (h *Server) Start() error {
func (h *Server) Start(ctx irrecoverable.SignalerContext) {
defer close(h.startupCompleted)
if h.endpoint == "" || h.listener != nil {
return nil // already running or not configured
return // already running or not configured
}

// Initialize the server.
Expand All @@ -190,16 +199,21 @@ func (h *Server) Start() error {
h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout
h.server.WriteTimeout = h.timeouts.WriteTimeout
h.server.IdleTimeout = h.timeouts.IdleTimeout
h.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}
}

listenConfig := net.ListenConfig{}
// Start the server.
listener, err := net.Listen("tcp", h.endpoint)
listener, err := listenConfig.Listen(ctx, "tcp", h.endpoint)
if err != nil {
// If the server fails to start, we need to clear out the RPC and WS
// configurations so they can be configured another time.
h.disableRPC()
h.disableWS()
return err
ctx.Throw(err)
return
}

h.listener = listener
Expand All @@ -211,7 +225,7 @@ func (h *Server) Start() error {
return
}
h.logger.Err(err).Msg("failed to start API server")
panic(err)
ctx.Throw(err)
}
}()

Expand All @@ -223,8 +237,17 @@ func (h *Server) Start() error {
url := fmt.Sprintf("ws://%v", listener.Addr())
h.logger.Info().Msgf("JSON-RPC over WebSocket enabled: %s", url)
}
}

return nil
func (h *Server) Ready() <-chan struct{} {
ready := make(chan struct{})

go func() {
<-h.startupCompleted
close(ready)
}()

return ready
}

// disableRPC stops the JSON-RPC over HTTP handler.
Expand Down Expand Up @@ -294,41 +317,50 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}

// Stop shuts down the HTTP server.
func (h *Server) Stop() {
if h.listener == nil {
return // not running
}
// Done shuts down the HTTP server.
func (h *Server) Done() <-chan struct{} {
done := make(chan struct{})

// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}
go func() {
defer close(done)

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}
if h.listener == nil {
return // not running
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}
// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}

h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)
h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil

}()

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil
return done
}

// CheckTimeouts ensures that timeout values are meaningful
Expand Down
14 changes: 3 additions & 11 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func newSubscription[T any](

rpcSub := notifier.CreateSubscription()

subs := models.NewSubscription(logger, callback(notifier, rpcSub))
subs := models.NewSubscription(callback(notifier, rpcSub))

l := logger.With().
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)).
Expand All @@ -190,16 +190,8 @@ func newSubscription[T any](
go func() {
defer publisher.Unsubscribe(subs)

for {
select {
case err := <-subs.Error():
l.Debug().Err(err).Msg("subscription returned error")
return
case err := <-rpcSub.Err():
l.Debug().Err(err).Msg("client unsubscribed")
return
}
}
err := <-rpcSub.Err()
l.Debug().Err(err).Msg("client unsubscribed")
}()

l.Info().Msg("new heads subscription created")
Expand Down
Loading

0 comments on commit 33d6278

Please sign in to comment.