From 586b0eb180afc22d06d673756dd916c17a173361 Mon Sep 17 00:00:00 2001 From: Ainar Garipov Date: Tue, 12 Nov 2024 19:58:56 +0300 Subject: [PATCH] next: upd more --- internal/next/cmd/cmd.go | 30 ++++++++++++++---------------- internal/next/cmd/signal.go | 9 ++++++--- internal/next/websvc/server.go | 17 +++++++++-------- internal/next/websvc/websvc.go | 28 ++++++++++++++++++++++------ 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/internal/next/cmd/cmd.go b/internal/next/cmd/cmd.go index f38dcc38..3bab1396 100644 --- a/internal/next/cmd/cmd.go +++ b/internal/next/cmd/cmd.go @@ -49,6 +49,9 @@ func Main(embeddedFrontend fs.FS) { frontend, err := frontendFromOpts(ctx, baseLogger, opts, embeddedFrontend) errors.Check(err) + startCtx, startCancel := context.WithTimeout(ctx, defaultTimeoutStart) + defer startCancel() + confMgrConf := &configmgr.Config{ BaseLogger: baseLogger, Logger: baseLogger.With(slogutil.KeyPrefix, "configmgr"), @@ -58,15 +61,15 @@ func Main(embeddedFrontend fs.FS) { FileName: opts.confFile, } - confMgr, err := newConfigMgr(confMgrConf) + confMgr, err := configmgr.New(startCtx, confMgrConf) errors.Check(err) web := confMgr.Web() - err = web.Start(ctx) + err = web.Start(startCtx) errors.Check(err) dns := confMgr.DNS() - err = dns.Start(ctx) + err = dns.Start(startCtx) errors.Check(err) sigHdlr := newSignalHandler( @@ -80,21 +83,16 @@ func Main(embeddedFrontend fs.FS) { os.Exit(sigHdlr.handle(ctx)) } -// defaultTimeout is the timeout used for some operations where another timeout -// hasn't been defined yet. -const defaultTimeout = 5 * time.Second - -// ctxWithDefaultTimeout is a helper function that returns a context with -// timeout set to defaultTimeout. -func ctxWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc) { - return context.WithTimeout(context.Background(), defaultTimeout) -} +// Default timeouts. +// +// TODO(a.garipov): Make configurable. +const ( + defaultTimeoutStart = 1 * time.Minute + defaultTimeoutShutdown = 5 * time.Second +) // newConfigMgr returns a new configuration manager using defaultTimeout as the // context timeout. -func newConfigMgr(c *configmgr.Config) (m *configmgr.Manager, err error) { - ctx, cancel := ctxWithDefaultTimeout() - defer cancel() - +func newConfigMgr(ctx context.Context, c *configmgr.Config) (m *configmgr.Manager, err error) { return configmgr.New(ctx, c) } diff --git a/internal/next/cmd/signal.go b/internal/next/cmd/signal.go index 74928e61..d6aa7dc5 100644 --- a/internal/next/cmd/signal.go +++ b/internal/next/cmd/signal.go @@ -109,7 +109,10 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) { var errs []error - confMgr, err := newConfigMgr(h.confMgrConf) + ctx, cancel := context.WithTimeout(ctx, defaultTimeoutStart) + defer cancel() + + confMgr, err := newConfigMgr(ctx, h.confMgrConf) if err != nil { errs = append(errs, fmt.Errorf("configuration manager: %w", err)) } @@ -142,7 +145,7 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) { // shutdown gracefully shuts down all services. func (h *signalHandler) shutdown(ctx context.Context) (status int) { - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, h.shutdownTimeout) defer cancel() status = osutil.ExitCodeSuccess @@ -173,7 +176,7 @@ func newSignalHandler( signal: make(chan os.Signal, 1), pidFile: pidFile, services: svcs, - shutdownTimeout: defaultTimeout, + shutdownTimeout: defaultTimeoutShutdown, } notifier := osutil.DefaultSignalNotifier{} diff --git a/internal/next/websvc/server.go b/internal/next/websvc/server.go index df0a5d52..f1299d04 100644 --- a/internal/next/websvc/server.go +++ b/internal/next/websvc/server.go @@ -37,6 +37,9 @@ type server struct { initialAddr netip.AddrPort } +// loggerKeyServer is the key used by [server] to identify itself. +const loggerKeyServer = "server" + // newServer returns a *server that is ready to serve HTTP queries. The TCP // listener is not started. handler must not be nil. func newServer( @@ -55,7 +58,7 @@ func newServer( u.Scheme = urlutil.SchemeHTTPS } - logger := baseLogger.With("server", u) + logger := baseLogger.With(loggerKeyServer, u) return &server{ mu: &sync.Mutex{}, @@ -96,11 +99,9 @@ func (s *server) localAddr() (addr net.Addr) { func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) { l, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(s.initialAddr)) if err != nil { - err = fmt.Errorf("listening tcp: %w", err) - s.logger.ErrorContext(ctx, "listening tcp", slogutil.KeyError, err) - panic(fmt.Errorf("websvc: %s", err)) + panic(fmt.Errorf("websvc: listening tcp: %w", err)) } func() { @@ -111,7 +112,7 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) { // Reassign the address in case the port was zero. s.url.Host = l.Addr().String() - s.logger = baseLogger.With("server", s.url) + s.logger = baseLogger.With(loggerKeyServer, s.url) s.http.ErrorLog = slog.NewLogLogger(s.logger.Handler(), slog.LevelError) }() @@ -123,9 +124,9 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) { return } - err = fmt.Errorf("serving: %w", err) - s.logger.ErrorContext(ctx, "serve failed", slogutil.KeyError, err) - panic(fmt.Errorf("websvc: %s", err)) + s.logger.ErrorContext(ctx, "serving", slogutil.KeyError, err) + + panic(fmt.Errorf("websvc: serving: %w", err)) } // shutdown shuts s down. diff --git a/internal/next/websvc/websvc.go b/internal/next/websvc/websvc.go index 88325e98..189d231e 100644 --- a/internal/next/websvc/websvc.go +++ b/internal/next/websvc/websvc.go @@ -163,22 +163,38 @@ func (svc *Service) Start(ctx context.Context) (err error) { go svc.pprof.serve(ctx, svc.logger) } - started := false - for !started { + return svc.wait(ctx) +} + +// wait waits until either the context is canceled or all servers have started. +func (svc *Service) wait(ctx context.Context) (err error) { + for !svc.serversHaveStarted() { select { case <-ctx.Done(): return ctx.Err() default: - started = true - for _, srv := range svc.servers { - started = started && srv.localAddr() != nil - } + // Wait and let the other goroutines do their job. + runtime.Gosched() } } return nil } +// serversHaveStarted returns true if all servers have started serving. +func (svc *Service) serversHaveStarted() (started bool) { + started = len(svc.servers) != 0 + for _, srv := range svc.servers { + started = started && srv.localAddr() != nil + } + + if svc.pprof != nil { + started = started && svc.pprof.localAddr() != nil + } + + return started +} + // Shutdown implements the [agh.Service] interface for *Service. svc may be // nil. func (svc *Service) Shutdown(ctx context.Context) (err error) {