Skip to content

Commit

Permalink
Merge pull request #29073 from vespa-engine/vekterli/start-moving-out…
Browse files Browse the repository at this point in the history
…-content-reconfig-to-runloop

Propagate existing `StorageNode` config from main `Process` reconfig loop
  • Loading branch information
baldersheim authored Oct 23, 2023
2 parents cb5d179 + 24ea2ed commit 7b3e6d0
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 130 deletions.
5 changes: 3 additions & 2 deletions storage/src/vespa/storage/storageserver/distributornode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ namespace storage {
DistributorNode::DistributorNode(
const config::ConfigUri& configUri,
DistributorNodeContext& context,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
uint32_t num_distributor_stripes,
StorageLink::UP communicationManager,
std::unique_ptr<IStorageChainBuilder> storage_chain_builder)
: StorageNode(configUri, context, generationFetcher,
std::make_unique<HostInfo>(),
: StorageNode(configUri, context, std::move(bootstrap_configs),
generationFetcher, std::make_unique<HostInfo>(),
!communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE),
_threadPool(framework::TickingThreadPool::createDefault("distributor", 100ms, 1, 5s)),
_stripe_pool(std::make_unique<distributor::DistributorStripePool>()),
Expand Down
8 changes: 1 addition & 7 deletions storage/src/vespa/storage/storageserver/distributornode.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
* \class storage::DistributorNode
* \ingroup storageserver
*
* \brief Class for setting up a distributor node.
*/

#pragma once

#include "distributornodecontext.h"
Expand Down Expand Up @@ -49,6 +42,7 @@ class DistributorNode

DistributorNode(const config::ConfigUri & configUri,
DistributorNodeContext&,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
uint32_t num_distributor_stripes,
std::unique_ptr<StorageLink> communicationManager,
Expand Down
22 changes: 5 additions & 17 deletions storage/src/vespa/storage/storageserver/servicelayernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ LOG_SETUP(".node.servicelayer");

namespace storage {

ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context,
ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
ServiceLayerNodeContext& context,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
const VisitorFactory::Map& externalVisitors)
: StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()),
: StorageNode(configUri, context, std::move(bootstrap_configs), generationFetcher, std::make_unique<HostInfo>()),
_context(context),
_persistenceProvider(persistenceProvider),
_externalVisitors(externalVisitors),
Expand Down Expand Up @@ -85,21 +87,6 @@ ServiceLayerNode::~ServiceLayerNode()
shutdown();
}

void
ServiceLayerNode::subscribeToConfigs()
{
StorageNode::subscribeToConfigs();
// TODO consolidate this with existing config fetcher in StorageNode parent...
_configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
}

void
ServiceLayerNode::removeConfigSubscriptions()
{
StorageNode::removeConfigSubscriptions();
_configFetcher.reset();
}

void
ServiceLayerNode::initializeNodeSpecific()
{
Expand Down Expand Up @@ -134,6 +121,7 @@ ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
ns.setCapacity(newC.nodeCapacity);
}
if (updated) {
// FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional?
_server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
_component->getStateUpdater().setReportedNodeState(ns);
}
Expand Down
19 changes: 8 additions & 11 deletions storage/src/vespa/storage/storageserver/servicelayernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@ class ServiceLayerNode
private NodeStateReporter

{
ServiceLayerNodeContext & _context;
spi::PersistenceProvider & _persistenceProvider;
VisitorFactory::Map _externalVisitors;
ServiceLayerNodeContext& _context;
spi::PersistenceProvider& _persistenceProvider;
VisitorFactory::Map _externalVisitors;

// FIXME: Should probably use the fetcher in StorageNode
std::unique_ptr<config::ConfigFetcher> _configFetcher;
Bouncer* _bouncer;
BucketManager* _bucket_manager;
FileStorManager* _fileStorManager;
bool _init_has_been_called;
Bouncer* _bouncer;
BucketManager* _bucket_manager;
FileStorManager* _fileStorManager;
bool _init_has_been_called;

public:
using UP = std::unique_ptr<ServiceLayerNode>;

ServiceLayerNode(const config::ConfigUri & configUri,
ServiceLayerNodeContext& context,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
const VisitorFactory::Map& externalVisitors);
Expand All @@ -61,14 +60,12 @@ class ServiceLayerNode

private:
void report(vespalib::JsonStream &writer) const override;
void subscribeToConfigs() override;
void initializeNodeSpecific() override;
void perform_post_chain_creation_init_steps() override;
void handleLiveConfigUpdate(const InitialGuard & initGuard) override;
VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override;
documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override;
void createChain(IStorageChainBuilder &builder) override;
void removeConfigSubscriptions() override;
void on_bouncer_config_changed() override;
};

Expand Down
64 changes: 22 additions & 42 deletions storage/src/vespa/storage/storageserver/storagenode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ removePidFile(const vespalib::string& pidfile)

} // End of anonymous namespace

StorageNode::BootstrapConfigs::BootstrapConfigs() = default;
StorageNode::BootstrapConfigs::~BootstrapConfigs() = default;
StorageNode::BootstrapConfigs::BootstrapConfigs(BootstrapConfigs&&) noexcept = default;
StorageNode::BootstrapConfigs& StorageNode::BootstrapConfigs::operator=(BootstrapConfigs&&) noexcept = default;

StorageNode::StorageNode(
const config::ConfigUri & configUri,
StorageNodeContext& context,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
std::unique_ptr<HostInfo> hostInfo,
RunMode mode)
: _singleThreadedDebugMode(mode == SINGLE_THREADED_TEST_MODE),
_configFetcher(),
_hostInfo(std::move(hostInfo)),
_context(context),
_generationFetcher(generationFetcher),
Expand All @@ -90,10 +95,11 @@ StorageNode::StorageNode(
_chain(),
_configLock(),
_initial_config_mutex(),
_bucket_spaces_config(),
_comm_mgr_config(),
_distribution_config(),
_server_config(),
_bouncer_config(std::move(bootstrap_configs.bouncer_cfg)),
_bucket_spaces_config(std::move(bootstrap_configs.bucket_spaces_cfg)),
_comm_mgr_config(std::move(bootstrap_configs.comm_mgr_cfg)),
_distribution_config(std::move(bootstrap_configs.distribution_cfg)),
_server_config(std::move(bootstrap_configs.server_cfg)),
_component(),
_node_identity(),
_configUri(configUri),
Expand All @@ -102,40 +108,16 @@ StorageNode::StorageNode(
{
}

void
StorageNode::subscribeToConfigs()
{
_configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext());
_configFetcher->subscribe<StorBouncerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<CommunicationManagerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this);

_configFetcher->start();

// All the below config instances were synchronously populated as part of start()ing the config fetcher
std::lock_guard configLockGuard(_configLock);
_bouncer_config.promote_staging_to_active();
_bucket_spaces_config.promote_staging_to_active();
_comm_mgr_config.promote_staging_to_active();
_distribution_config.promote_staging_to_active();
_server_config.promote_staging_to_active();
}

void
StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
{
// Avoid racing with concurrent reconfigurations before we've set up the entire
// node component stack.
// TODO no longer needed... probably
std::lock_guard<std::mutex> concurrent_config_guard(_initial_config_mutex);

_context.getComponentRegister().registerShutdownListener(*this);

// Fetch configs needed first. These functions will just grab the config
// and store them away, while having the config lock.
subscribeToConfigs();

// First update some basics that doesn't depend on anything else to be
// available
_rootFolder = server_config().rootFolder;
Expand Down Expand Up @@ -260,7 +242,7 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
_server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO isn't this a no-op...?
_server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode
_server_config.staging.reset();
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
Expand Down Expand Up @@ -346,13 +328,6 @@ StorageNode::notifyDoneInitializing()

StorageNode::~StorageNode() = default;

void
StorageNode::removeConfigSubscriptions()
{
LOG(debug, "Removing config subscribers");
_configFetcher.reset();
}

void
StorageNode::shutdown()
{
Expand All @@ -364,8 +339,6 @@ StorageNode::shutdown()
LOG(debug, "Storage killed before requestShutdown() was called. No "
"reason has been given for why we're stopping.");
}
// Remove the subscription to avoid more callbacks from config
removeConfigSubscriptions();

if (_chain) {
LOG(debug, "Closing storage chain");
Expand Down Expand Up @@ -535,13 +508,20 @@ StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> bui
}

template <typename ConfigT>
StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() = default;
StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() noexcept = default;

template <typename ConfigT>
StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper(std::unique_ptr<ConfigT> initial_active) noexcept
: staging(),
active(std::move(initial_active))
{
}

template <typename ConfigT>
StorageNode::ConfigWrapper<ConfigT>::~ConfigWrapper() = default;

template <typename ConfigT>
void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() {
void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() noexcept {
assert(staging);
active = std::move(staging);
}
Expand Down
59 changes: 32 additions & 27 deletions storage/src/vespa/storage/storageserver/storagenode.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,45 @@ struct StorageNodeContext;
namespace lib { class NodeType; }


class StorageNode : private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>,
private config::IFetcherCallback<vespa::config::content::StorDistributionConfig>,
private config::IFetcherCallback<vespa::config::content::core::BucketspacesConfig>,
private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
private config::IFetcherCallback<vespa::config::content::core::StorBouncerConfig>,
private framework::MetricUpdateHook,
class StorageNode : private framework::MetricUpdateHook,
private DoneInitializeHandler,
private framework::defaultimplementation::ShutdownListener
{
public:
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig;
using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
using StorServerConfig = vespa::config::content::core::StorServerConfig;

enum RunMode { NORMAL, SINGLE_THREADED_TEST_MODE };

StorageNode(const StorageNode &) = delete;
StorageNode & operator = (const StorageNode &) = delete;

struct BootstrapConfigs {
std::unique_ptr<StorBouncerConfig> bouncer_cfg;
std::unique_ptr<BucketspacesConfig> bucket_spaces_cfg;
std::unique_ptr<CommunicationManagerConfig> comm_mgr_cfg;
std::unique_ptr<StorDistributionConfig> distribution_cfg;
std::unique_ptr<StorServerConfig> server_cfg;

BootstrapConfigs();
~BootstrapConfigs();
BootstrapConfigs(BootstrapConfigs&&) noexcept;
BootstrapConfigs& operator=(BootstrapConfigs&&) noexcept;
};

StorageNode(const config::ConfigUri& configUri,
StorageNodeContext& context,
BootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
std::unique_ptr<HostInfo> hostInfo,
RunMode = NORMAL);
~StorageNode() override;

virtual const lib::NodeType& getNodeType() const = 0;
bool attemptedStopped() const;
[[nodiscard]] bool attemptedStopped() const;
void notifyDoneInitializing() override;
void waitUntilInitialized(vespalib::duration timeout = 15s);
void updateMetrics(const MetricLockGuard & guard) override;
Expand All @@ -88,19 +103,17 @@ class StorageNode : private config::IFetcherCallback<vespa::config::content::cor
void requestShutdown(vespalib::stringref reason) override;
DoneInitializeHandler& getDoneInitializeHandler() { return *this; }

void configure(std::unique_ptr<StorServerConfig> config);
void configure(std::unique_ptr<StorDistributionConfig> config);
void configure(std::unique_ptr<BucketspacesConfig>);
void configure(std::unique_ptr<CommunicationManagerConfig> config);
void configure(std::unique_ptr<StorBouncerConfig> config);

// For testing
StorageLink* getChain() { return _chain.get(); }
virtual void initializeStatusWebServer();
protected:
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig;
using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
using StorServerConfig = vespa::config::content::core::StorServerConfig;
private:
bool _singleThreadedDebugMode;
// Subscriptions to config
std::unique_ptr<config::ConfigFetcher> _configFetcher;

std::unique_ptr<HostInfo> _hostInfo;

Expand Down Expand Up @@ -131,26 +144,20 @@ class StorageNode : private config::IFetcherCallback<vespa::config::content::cor
std::unique_ptr<ConfigT> staging;
std::unique_ptr<ConfigT> active;

ConfigWrapper();
ConfigWrapper() noexcept;
explicit ConfigWrapper(std::unique_ptr<ConfigT> initial_active) noexcept;
~ConfigWrapper();

void promote_staging_to_active();
void promote_staging_to_active() noexcept;
};

template <typename ConfigT>
void stage_config_change(ConfigWrapper<ConfigT>& my_cfg, std::unique_ptr<ConfigT> new_cfg);

/** Implementation of config callbacks. */
void configure(std::unique_ptr<StorServerConfig> config) override;
void configure(std::unique_ptr<StorDistributionConfig> config) override;
void configure(std::unique_ptr<BucketspacesConfig>) override;
void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
void configure(std::unique_ptr<StorBouncerConfig> config) override;

protected:
// Lock taken while doing configuration of the server.
std::mutex _configLock;
std::mutex _initial_config_mutex;
std::mutex _initial_config_mutex; // TODO can probably be removed
using InitialGuard = std::lock_guard<std::mutex>;

ConfigWrapper<StorBouncerConfig> _bouncer_config;
Expand Down Expand Up @@ -193,13 +200,11 @@ class StorageNode : private config::IFetcherCallback<vespa::config::content::cor
std::unique_ptr<StateManager> releaseStateManager();

void initialize(const NodeStateReporter & reporter);
virtual void subscribeToConfigs();
virtual void initializeNodeSpecific() = 0;
virtual void perform_post_chain_creation_init_steps() = 0;
virtual void createChain(IStorageChainBuilder &builder) = 0;
virtual void handleLiveConfigUpdate(const InitialGuard & initGuard);
void shutdown();
virtual void removeConfigSubscriptions();

virtual void on_bouncer_config_changed() { /* no-op by default */ }
public:
Expand Down
Loading

0 comments on commit 7b3e6d0

Please sign in to comment.