Skip to content

Commit

Permalink
Merge pull request #29012 from vespa-engine/vekterli/dedupe-storageno…
Browse files Browse the repository at this point in the history
…de-config-propagation

De-dupe `StorageNode` config propagation
  • Loading branch information
baldersheim authored Oct 18, 2023
2 parents 24f8d5a + abcadea commit 7742d05
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
if (_retrievedCommunicationManager) {
builder.add(std::move(_retrievedCommunicationManager));
} else {
auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, *_comm_mgr_config);
auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, communication_manager_config());
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
}
Expand Down
12 changes: 6 additions & 6 deletions storage/src/vespa/storage/storageserver/servicelayernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ ServiceLayerNode::initializeNodeSpecific()
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());

ns.setCapacity(_serverConfig->nodeCapacity);
ns.setCapacity(server_config().nodeCapacity);
LOG(debug, "Adjusting reported node state to include capacity: %s", ns.toString().c_str());
_component->getStateUpdater().setReportedNodeState(ns);
}
Expand All @@ -118,10 +118,10 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
if (_newServerConfig) {
if (_server_config.staging) {
bool updated = false;
vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
StorServerConfig& newC(*_newServerConfig);
vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
StorServerConfig& newC(*_server_config.staging);
{
updated = false;
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
Expand All @@ -133,7 +133,7 @@ ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
ns.setCapacity(newC.nodeCapacity);
}
if (updated) {
_serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC));
_server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
_component->getStateUpdater().setReportedNodeState(ns);
}
}
Expand Down Expand Up @@ -163,7 +163,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
{
ServiceLayerComponentRegister& compReg(_context.getComponentRegister());

auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, *_comm_mgr_config);
auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, communication_manager_config());
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
builder.add(std::make_unique<Bouncer>(compReg, _configUri));
Expand Down
168 changes: 69 additions & 99 deletions storage/src/vespa/storage/storageserver/storagenode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,10 @@ StorageNode::StorageNode(
_chain(),
_configLock(),
_initial_config_mutex(),
_serverConfig(),
_clusterConfig(),
_distributionConfig(),
_bucketSpacesConfig(),
_newServerConfig(),
_newClusterConfig(),
_newDistributionConfig(),
_newBucketSpacesConfig(),
_bucket_spaces_config(),
_comm_mgr_config(),
_distribution_config(),
_server_config(),
_component(),
_node_identity(),
_configUri(configUri),
Expand All @@ -114,17 +110,15 @@ StorageNode::subscribeToConfigs()
_configFetcher->subscribe<CommunicationManagerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this);

_configFetcher->start();

// All the below config instances were synchronously populated as part of start()ing the config fetcher
std::lock_guard configLockGuard(_configLock);
_bucketSpacesConfig = std::move(_newBucketSpacesConfig);
_clusterConfig = std::move(_newClusterConfig);
_comm_mgr_config = std::move(_new_comm_mgr_config);
_distributionConfig = std::move(_newDistributionConfig);
_serverConfig = std::move(_newServerConfig);
_bucket_spaces_config.promote_staging_to_active();
_comm_mgr_config.promote_staging_to_active();
_distribution_config.promote_staging_to_active();
_server_config.promote_staging_to_active();
}

void
Expand All @@ -142,13 +136,13 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)

// First update some basics that doesn't depend on anything else to be
// available
_rootFolder = _serverConfig->rootFolder;
_rootFolder = server_config().rootFolder;

_context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
_context.getComponentRegister().setNodeInfo(server_config().clusterName, getNodeType(), server_config().nodeIndex);
_context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory());
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig));
_context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
_node_identity = std::make_unique<NodeIdentity>(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(distribution_config()));
_context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config());
_node_identity = std::make_unique<NodeIdentity>(server_config().clusterName, getNodeType(), server_config().nodeIndex);

_metrics = std::make_shared<StorageMetricSet>();
_component = std::make_unique<StorageComponent>(_context.getComponentRegister(), "storagenode");
Expand Down Expand Up @@ -185,17 +179,17 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)

// Start deadlock detector
_deadLockDetector = std::make_unique<DeadLockDetector>(_context.getComponentRegister());
_deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));

createChain(*_chain_builder);
_chain = std::move(*_chain_builder).build();
_chain_builder.reset();

assert(_communicationManager != nullptr);
_communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
_communicationManager->updateBucketSpacesConfig(bucket_spaces_config());

perform_post_chain_creation_init_steps();

Expand Down Expand Up @@ -257,23 +251,23 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
// If we get here, initialize is done running. We have to handle changes
// we want to handle.

if (_newServerConfig) {
StorServerConfigBuilder oldC(*_serverConfig);
StorServerConfig& newC(*_newServerConfig);
if (_server_config.staging) {
StorServerConfigBuilder oldC(*_server_config.active);
StorServerConfig& newC(*_server_config.staging);
DIFFERWARN(rootFolder, "Cannot alter root folder of node live");
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
_serverConfig = std::make_unique<StorServerConfig>(oldC);
_newServerConfig.reset();
_deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
}
if (_newDistributionConfig) {
StorDistributionConfigBuilder oldC(*_distributionConfig);
StorDistributionConfig& newC(*_newDistributionConfig);
_server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO isn't this a no-op...?
_server_config.staging.reset();
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
}
if (_distribution_config.staging) {
StorDistributionConfigBuilder oldC(*_distribution_config.active);
StorDistributionConfig& newC(*_distribution_config.staging);
bool updated = false;
if (DIFFER(redundancy)) {
LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy);
Expand Down Expand Up @@ -304,30 +298,25 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
LOG(info, "Live config update: Group structure altered.");
ASSIGN(group);
}
_distributionConfig = std::make_unique<StorDistributionConfig>(oldC);
_newDistributionConfig.reset();
// This looks weird, but the magical ASSIGN() macro mutates `oldC` in-place upon changes
_distribution_config.active = std::make_unique<StorDistributionConfig>(oldC);
_distribution_config.staging.reset();
if (updated) {
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC));
for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) {
link->storageDistributionChanged();
}
}
}
if (_newClusterConfig) {
if (*_clusterConfig != *_newClusterConfig) {
LOG(warning, "Live config failure: Cannot alter cluster config of node live.");
}
_newClusterConfig.reset();
}

if (_newBucketSpacesConfig) {
_bucketSpacesConfig = std::move(_newBucketSpacesConfig);
_context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
_communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
if (_bucket_spaces_config.staging) {
_bucket_spaces_config.promote_staging_to_active();
_context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config());
_communicationManager->updateBucketSpacesConfig(bucket_spaces_config());
}
if (_new_comm_mgr_config) {
_comm_mgr_config = std::move(_new_comm_mgr_config);
_communicationManager->on_configure(*_comm_mgr_config);
if (_comm_mgr_config.staging) {
_comm_mgr_config.promote_staging_to_active();
_communicationManager->on_configure(communication_manager_config());
}
}

Expand Down Expand Up @@ -438,68 +427,37 @@ StorageNode::shutdown()

void
StorageNode::configure(std::unique_ptr<StorServerConfig> config) {
log_config_received(*config);
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
// to a variable where we can find it later when processing config
// updates
{
std::lock_guard configLockGuard(_configLock);
_newServerConfig = std::move(config);
}
if (_serverConfig) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
}

void
StorageNode::configure(std::unique_ptr<UpgradingConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
_newClusterConfig = std::move(config);
}
if (_clusterConfig) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
stage_config_change(_server_config, std::move(config));
}

void
StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
_newDistributionConfig = std::move(config);
}
if (_distributionConfig) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
stage_config_change(_distribution_config, std::move(config));
}

void
StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) {
log_config_received(*config);
{
std::lock_guard configLockGuard(_configLock);
_newBucketSpacesConfig = std::move(config);
}
if (_bucketSpacesConfig) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
stage_config_change(_bucket_spaces_config, std::move(config));
}

void
StorageNode::configure(std::unique_ptr<CommunicationManagerConfig > config) {
log_config_received(*config);
stage_config_change(_comm_mgr_config, std::move(config));
}

template <typename ConfigT>
void
StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<ConfigT> new_cfg) {
log_config_received(*new_cfg);
// When we get config, we try to grab the config lock to ensure no one
// else is doing configuration work, and then we write the new config
// to a variable where we can find it later when processing config
// updates
{
std::lock_guard config_lock_guard(_configLock);
_new_comm_mgr_config = std::move(config);
cfg.staging = std::move(new_cfg);
}
if (_comm_mgr_config) {
if (cfg.active) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
Expand Down Expand Up @@ -565,4 +523,16 @@ StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> bui
_chain_builder = std::move(builder);
}

template <typename ConfigT>
StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() = default;

template <typename ConfigT>
StorageNode::ConfigWrapper<ConfigT>::~ConfigWrapper() = default;

template <typename ConfigT>
void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() {
assert(staging);
active = std::move(staging);
}

} // storage
Loading

0 comments on commit 7742d05

Please sign in to comment.