Skip to content

Commit

Permalink
Merge pull request #25919 from vespa-engine/toregge/pass-optional-ser…
Browse files Browse the repository at this point in the history
…ial-num-to-prepare-reconfig

Pass optional serial num to prepare reconfig
  • Loading branch information
geirst authored Feb 7, 2023
2 parents c71bc7f + 1fb56a0 commit 260788d
Show file tree
Hide file tree
Showing 28 changed files with 116 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,20 @@ struct Fixture : public BaseFixture, public AttributeManagerFixture
struct SequentialAttributeManager
{
SequentialAttributesInitializer initializer;
uint32_t docid_limit;
SerialNum serial_num;
proton::AttributeManager mgr;
SequentialAttributeManager(const AttributeManager &currMgr, AttrMgrSpec && newSpec);
~SequentialAttributeManager();
};

SequentialAttributeManager::SequentialAttributeManager(const AttributeManager &currMgr, AttrMgrSpec && newSpec)
: initializer(newSpec.getDocIdLimit()),
docid_limit(newSpec.getDocIdLimit()),
serial_num(newSpec.getCurrentSerialNum().value_or(0)),
mgr(currMgr, std::move(newSpec), initializer)
{
mgr.addInitializedAttributes(initializer.getInitializedAttributes(), std::nullopt, std::nullopt);
mgr.addInitializedAttributes(initializer.getInitializedAttributes(), docid_limit, serial_num);
}
SequentialAttributeManager::~SequentialAttributeManager() = default;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Fixture::reconfigure(const DocumentDBConfig& new_config_snapshot,
IDocumentDBReferenceResolver& resolver,
SerialNum serial_num)
{
auto prepared_reconfig = _configurer->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
auto prepared_reconfig = _configurer->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
_configurer->reconfigure(new_config_snapshot, old_config_snapshot, reconfig_params, resolver, *prepared_reconfig, serial_num);
}

Expand All @@ -258,7 +258,7 @@ Fixture::reconfigure(const DocumentDBConfig& new_config_snapshot,
IDocumentDBReferenceResolver& resolver,
SerialNum serial_num)
{
auto prepared_reconfig = _configurer->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
auto prepared_reconfig = _configurer->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
return _configurer->reconfigure(new_config_snapshot, old_config_snapshot, std::move(attr_spec), reconfig_params, resolver, *prepared_reconfig, serial_num);
}

Expand Down Expand Up @@ -341,7 +341,7 @@ FastAccessFixture::reconfigure(const DocumentDBConfig& new_config_snapshot,
SerialNum serial_num)
{
ReconfigParams reconfig_params{CCR()};
auto prepared_reconfig = _configurer.prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
auto prepared_reconfig = _configurer.prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
return _configurer.reconfigure(new_config_snapshot, old_config_snapshot, std::move(attr_spec), *prepared_reconfig, serial_num);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ struct FixtureBase
cmpResult.documentTypeRepoChanged = true;
MyDocumentDBReferenceResolver resolver;
ReconfigParams reconfig_params(cmpResult);
auto prepared_reconfig = _subDb.prepare_reconfig(*newCfg->_cfg, *_snapshot->_cfg, reconfig_params);
auto prepared_reconfig = _subDb.prepare_reconfig(*newCfg->_cfg, *_snapshot->_cfg, reconfig_params, serialNum);
auto tasks = _subDb.applyConfig(*newCfg->_cfg, *_snapshot->_cfg,
serialNum, reconfig_params, resolver, *prepared_reconfig);
prepared_reconfig.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void
AttributeManagerInitializerTask::run()
{
_attrMgr->addExtraAttribute(_documentMetaStore);
_attrMgr->addInitializedAttributes(_attributesResult.get(), std::nullopt, std::nullopt);
_attrMgr->addInitializedAttributes(_attributesResult.get(), 1, _configSerialNum);
_attrMgr->pruneRemovedFields(_configSerialNum);
_promise.set_value();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ AttributeManagerReconfig::AttributeManagerReconfig(std::shared_ptr<AttributeMana
AttributeManagerReconfig::~AttributeManagerReconfig() = default;

std::shared_ptr<AttributeManager>
AttributeManagerReconfig::create(std::optional<uint32_t> docid_limit, std::optional<search::SerialNum> serial_num)
AttributeManagerReconfig::create(uint32_t docid_limit, search::SerialNum serial_num)
{
assert(_mgr);
_mgr->addInitializedAttributes(_initializer->getInitializedAttributes(), docid_limit, serial_num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AttributeManagerReconfig {
AttributeManagerReconfig(std::shared_ptr<AttributeManager> mgr,
std::unique_ptr<SequentialAttributesInitializer> initializer);
~AttributeManagerReconfig();
std::shared_ptr<AttributeManager> create(std::optional<uint32_t> docid_limit, std::optional<search::SerialNum> serial_num);
std::shared_ptr<AttributeManager> create(uint32_t docid_limit, search::SerialNum serial_num);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,24 @@ AttributeManager::addAttribute(AttributeSpec && spec, uint64_t serialNum)
}

void
AttributeManager::addInitializedAttributes(const std::vector<AttributeInitializerResult> &attributes, std::optional<uint32_t> docid_limit, std::optional<SerialNum> serial_num)
AttributeManager::addInitializedAttributes(const std::vector<AttributeInitializerResult> &attributes, uint32_t docid_limit, SerialNum serial_num)
{
/*
* Called (indirectly) by
* DocumentSubDBCollection::complete_prepare_reconfig to complete
* setup of new attribute manager.
*/
for (const auto &result : attributes) {
assert(result);
auto attr = result.getAttribute();
if (docid_limit.has_value()) {
AttributeManager::padAttribute(*attr, docid_limit.value());
}
if (serial_num.has_value()) {
attr->setCreateSerialNum(serial_num.value());
if (attr->getCreateSerialNum() == 0) {
/*
* The attribute vector is empty (not loaded from disk)
* and has been added as part of live reconfig. Make it
* ready for use by setting size and create serial num.
*/
AttributeManager::padAttribute(*attr, docid_limit);
attr->setCreateSerialNum(serial_num);
}
attr->setInterlock(_interlock);
auto shrinker = allocShrinker(attr, _attributeFieldWriter, *_diskLayout);
Expand Down Expand Up @@ -503,8 +511,10 @@ proton::IAttributeManager::SP
AttributeManager::create(Spec&& spec) const
{
assert(spec.getCurrentSerialNum().has_value());
auto docid_limit = spec.getDocIdLimit();
auto serial_num = spec.getCurrentSerialNum().value();
auto prepared_result = prepare_create(std::move(spec));
return prepared_result->create(std::nullopt, std::nullopt);
return prepared_result->create(docid_limit, serial_num);
}

std::vector<IFlushTarget::SP>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class AttributeManager : public proton::IAttributeManager

AttributeVectorSP addAttribute(AttributeSpec && spec, uint64_t serialNum);

void addInitializedAttributes(const std::vector<AttributeInitializerResult> &attributes, std::optional<uint32_t> docid_limit, std::optional<SerialNum> serial_num);
void addInitializedAttributes(const std::vector<AttributeInitializerResult> &attributes, uint32_t docid_limit, SerialNum serial_num);

void addExtraAttribute(const AttributeVectorSP &attribute);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

namespace proton {

DocumentDBReconfig::DocumentDBReconfig(std::unique_ptr<const DocumentSubDBReconfig> ready_reconfig_in,
std::unique_ptr<const DocumentSubDBReconfig> not_ready_reconfig_in)
DocumentDBReconfig::DocumentDBReconfig(std::unique_ptr<DocumentSubDBReconfig> ready_reconfig_in,
std::unique_ptr<DocumentSubDBReconfig> not_ready_reconfig_in)
: _ready_reconfig(std::move(ready_reconfig_in)),
_not_ready_reconfig(std::move(not_ready_reconfig_in))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,31 @@ class DocumentSubDBReconfig;
/**
* Class representing the result of the prepare step of a DocumentDB reconfig.
*
* The reconfig is performed in two steps:
* The reconfig is performed in three steps:
* Prepare:
* Based on the config that is changed, new components are instantiated in each subdb.
* This can be costly and is handled by helper threads from the shared executor pool.
*
* Complete prepare:
* Docid limit and serial number are used to complete prepared reconfig.
*
* Apply:
* The new components are swapped with the old ones in the DocumentDB master write thread.
*/
class DocumentDBReconfig {
private:
std::unique_ptr<const DocumentSubDBReconfig> _ready_reconfig;
std::unique_ptr<const DocumentSubDBReconfig> _not_ready_reconfig;
std::unique_ptr<DocumentSubDBReconfig> _ready_reconfig;
std::unique_ptr<DocumentSubDBReconfig> _not_ready_reconfig;

public:
DocumentDBReconfig(std::unique_ptr<const DocumentSubDBReconfig> ready_reconfig_in,
std::unique_ptr<const DocumentSubDBReconfig> not_ready_reconfig_in);
DocumentDBReconfig(std::unique_ptr<DocumentSubDBReconfig> ready_reconfig_in,
std::unique_ptr<DocumentSubDBReconfig> not_ready_reconfig_in);
~DocumentDBReconfig();

const DocumentSubDBReconfig& ready_reconfig() const { return *_ready_reconfig; }
const DocumentSubDBReconfig& not_ready_reconfig() const { return *_not_ready_reconfig; }
const DocumentSubDBReconfig& ready_reconfig() const noexcept { return *_ready_reconfig; }
DocumentSubDBReconfig& ready_reconfig() noexcept { return *_ready_reconfig; }
const DocumentSubDBReconfig& not_ready_reconfig() const noexcept { return *_not_ready_reconfig; }
DocumentSubDBReconfig& not_ready_reconfig() noexcept { return *_not_ready_reconfig; }
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,12 @@ DocumentSubDBReconfig::DocumentSubDBReconfig(std::shared_ptr<Matchers> matchers_

DocumentSubDBReconfig::~DocumentSubDBReconfig() = default;

void
DocumentSubDBReconfig::complete(uint32_t docid_limit, search::SerialNum serial_num)
{
(void) docid_limit;
(void) serial_num;
}

}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <vespa/searchlib/common/serialnum.h>
#include <memory>

namespace proton {
Expand All @@ -26,7 +27,8 @@ class DocumentSubDBReconfig {
return _old_matchers != _new_matchers;
}
std::shared_ptr<Matchers> matchers() const { return _new_matchers; }

void complete(uint32_t docid_limit, search::SerialNum serial_num);
};

}

13 changes: 7 additions & 6 deletions searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,15 @@ DocumentDB::initFinish(DocumentDBConfig::SP configSnapshot)
}

std::unique_ptr<DocumentDBReconfig>
DocumentDB::prepare_reconfig(const DocumentDBConfig& new_config_snapshot)
DocumentDB::prepare_reconfig(const DocumentDBConfig& new_config_snapshot, std::optional<SerialNum> serial_num)
{
auto active_config_snapshot = getActiveConfig();
auto cmpres = active_config_snapshot->compare(new_config_snapshot);
if (_state.getState() == DDBState::State::APPLY_LIVE_CONFIG) {
cmpres.importedFieldsChanged = true;
}
const ReconfigParams reconfig_params(cmpres);
return _subDBs.prepare_reconfig(new_config_snapshot, *active_config_snapshot, reconfig_params);
return _subDBs.prepare_reconfig(new_config_snapshot, *active_config_snapshot, reconfig_params, serial_num);
}

void
Expand Down Expand Up @@ -423,7 +423,7 @@ DocumentDB::applySubDBConfig(const DocumentDBConfig &newConfigSnapshot,
}

void
DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum, std::unique_ptr<const DocumentDBReconfig> prepared_reconfig)
DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum, std::unique_ptr<DocumentDBReconfig> prepared_reconfig)
{
// Always called by executor thread:
// Called by performReconfig() by executor thread during normal
Expand Down Expand Up @@ -472,6 +472,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
bool elidedConfigSave = equalReplayConfig && tlsReplayDone;
forceCommitAndWait(*_feedView.get(), elidedConfigSave ? serialNum : serialNum - 1, std::move(commit_result));
}
_subDBs.complete_prepare_reconfig(*prepared_reconfig, serialNum);
if (params.shouldMaintenanceControllerChange()) {
_maintenanceController.killJobs();
}
Expand Down Expand Up @@ -796,7 +797,7 @@ DocumentDB::reconfigure(DocumentDBConfig::SP snapshot)
assert(active_snapshot);
assert(_state.getAllowReconfig());
snapshot = DocumentDBConfig::makeDelayedAttributeAspectConfig(snapshot, *active_snapshot);
auto prepared_reconfig = prepare_reconfig(*snapshot);
auto prepared_reconfig = prepare_reconfig(*snapshot, std::nullopt);
masterExecute([this, snapshot, prepared_reconfig = std::move(prepared_reconfig)]() mutable { performReconfig(snapshot, std::move(prepared_reconfig)); });
// Wait for config to be applied, or for document db close
std::unique_lock<std::mutex> guard(_configMutex);
Expand Down Expand Up @@ -835,7 +836,7 @@ DocumentDB::enterApplyLiveConfigState()
(void) _state.enterApplyLiveConfigState();
}
auto new_config_snapshot = _pendingConfigSnapshot.get();
auto prepared_reconfig = prepare_reconfig(*new_config_snapshot);
auto prepared_reconfig = prepare_reconfig(*new_config_snapshot, std::nullopt);
masterExecute([this, new_config_snapshot, prepared_reconfig = std::move(prepared_reconfig)]() mutable
{
performReconfig(std::move(new_config_snapshot), std::move(prepared_reconfig));
Expand Down Expand Up @@ -896,7 +897,7 @@ DocumentDB::replayConfig(search::SerialNum serialNum)
configSnapshot = DocumentDBConfigScout::scout(configSnapshot, *_pendingConfigSnapshot.get());
// Ignore configs that are not relevant during replay of transaction log
configSnapshot = DocumentDBConfig::makeReplayConfig(configSnapshot);
auto prepared_reconfig = prepare_reconfig(*configSnapshot);
auto prepared_reconfig = prepare_reconfig(*configSnapshot, serialNum);
applyConfig(configSnapshot, serialNum, std::move(prepared_reconfig));
LOG(info, "DocumentDB(%s): Replayed config with serialNum=%" PRIu64,
_docTypeName.toString().c_str(), serialNum);
Expand Down
4 changes: 2 additions & 2 deletions searchcore/src/vespa/searchcore/proton/server/documentdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class DocumentDB : public DocumentDBConfigOwner,
void applySubDBConfig(const DocumentDBConfig &newConfigSnapshot,
SerialNum serialNum, const ReconfigParams &params,
const DocumentDBReconfig& prepared_reconfig);
void applyConfig(DocumentDBConfigSP configSnapshot, SerialNum serialNum, std::unique_ptr<const DocumentDBReconfig> prepared_reconfig);
void applyConfig(DocumentDBConfigSP configSnapshot, SerialNum serialNum, std::unique_ptr<DocumentDBReconfig> prepared_reconfig);

/**
* Save initial config if we don't have any saved config snapshots.
Expand Down Expand Up @@ -382,7 +382,7 @@ class DocumentDB : public DocumentDBConfigOwner,
bool getDelayedConfig() const { return _state.getDelayedConfig(); }
void replayConfig(SerialNum serialNum) override;
const DocTypeName & getDocTypeName() const { return _docTypeName; }
std::unique_ptr<DocumentDBReconfig> prepare_reconfig(const DocumentDBConfig& new_config_snapshot);
std::unique_ptr<DocumentDBReconfig> prepare_reconfig(const DocumentDBConfig& new_config_snapshot, std::optional<SerialNum> serial_num);
void reconfigure(DocumentDBConfigSP snapshot) override;
int64_t getActiveGeneration() const;
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,20 @@ DocumentSubDBCollection::pruneRemovedFields(SerialNum serialNum)
}

std::unique_ptr<DocumentDBReconfig>
DocumentSubDBCollection::prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params)
DocumentSubDBCollection::prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params, std::optional<SerialNum> serial_num)
{
auto ready_reconfig = getReadySubDB()->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
auto not_ready_reconfig = getNotReadySubDB()->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
auto ready_reconfig = getReadySubDB()->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
auto not_ready_reconfig = getNotReadySubDB()->prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
return std::make_unique<DocumentDBReconfig>(std::move(ready_reconfig), std::move(not_ready_reconfig));
}

void
DocumentSubDBCollection::complete_prepare_reconfig(DocumentDBReconfig& prepared_reconfig, SerialNum serial_num)
{
getReadySubDB()->complete_prepare_reconfig(prepared_reconfig.ready_reconfig(), serial_num);
getNotReadySubDB()->complete_prepare_reconfig(prepared_reconfig.not_ready_reconfig(), serial_num);
}

void
DocumentSubDBCollection::applyConfig(const DocumentDBConfig &newConfigSnapshot,
const DocumentDBConfig &oldConfigSnapshot,
Expand All @@ -271,7 +278,7 @@ DocumentSubDBCollection::applyConfig(const DocumentDBConfig &newConfigSnapshot,
_reprocessingRunner.addTasks(tasks);
tasks = getNotReadySubDB()->applyConfig(newConfigSnapshot, oldConfigSnapshot, serialNum, params, resolver, prepared_reconfig.not_ready_reconfig());
_reprocessingRunner.addTasks(tasks);
auto removed_reconfig = getRemSubDB()->prepare_reconfig(newConfigSnapshot, oldConfigSnapshot, params);
auto removed_reconfig = getRemSubDB()->prepare_reconfig(newConfigSnapshot, oldConfigSnapshot, params, serialNum);
tasks = getRemSubDB()->applyConfig(newConfigSnapshot, oldConfigSnapshot, serialNum, params, resolver, *removed_reconfig);
removed_reconfig.reset();
_reprocessingRunner.addTasks(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vespa/vespalib/util/varholder.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <mutex>
#include <optional>

namespace vespalib {
class Clock;
Expand Down Expand Up @@ -137,7 +138,8 @@ class DocumentSubDBCollection {

void pruneRemovedFields(SerialNum serialNum);

std::unique_ptr<DocumentDBReconfig> prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params);
std::unique_ptr<DocumentDBReconfig> prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params, std::optional<SerialNum> serial_num);
void complete_prepare_reconfig(DocumentDBReconfig& prepared_reconfig, SerialNum serial_num);
void applyConfig(const DocumentDBConfig &newConfigSnapshot, const DocumentDBConfig &oldConfigSnapshot,
SerialNum serialNum, const ReconfigParams &params, IDocumentDBReferenceResolver &resolver, const DocumentDBReconfig& prepared_reconfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ FastAccessDocSubDB::initViews(const DocumentDBConfig &configSnapshot)
}
}

std::unique_ptr<const DocumentSubDBReconfig>
FastAccessDocSubDB::prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params)
std::unique_ptr<DocumentSubDBReconfig>
FastAccessDocSubDB::prepare_reconfig(const DocumentDBConfig& new_config_snapshot, const DocumentDBConfig& old_config_snapshot, const ReconfigParams& reconfig_params, std::optional<SerialNum> serial_num)
{
return _configurer.prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params);
return _configurer.prepare_reconfig(new_config_snapshot, old_config_snapshot, reconfig_params, serial_num);
}

IReprocessingTask::List
Expand Down
Loading

0 comments on commit 260788d

Please sign in to comment.