Skip to content

Commit

Permalink
Merge pull request #19209 from vespa-engine/geirst/refactor-tracking-…
Browse files Browse the repository at this point in the history
…of-cluster-state-and-distribution

Use BucketSpaceStateMap to track cluster state and distribution in th…
  • Loading branch information
geirst authored Sep 20, 2021
2 parents 33be0f9 + a89b2ce commit b0f91ec
Show file tree
Hide file tree
Showing 19 changed files with 112 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class TopLevelBucketDBUpdaterTest : public Test,
OutdatedNodesMap outdated_nodes_map;
state = PendingClusterState::createForClusterStateChange(
clock, cluster_info, sender,
owner.top_level_bucket_space_repo(),
owner.bucket_space_states(),
cmd, outdated_nodes_map, api::Timestamp(1));
}

Expand All @@ -374,7 +374,7 @@ class TopLevelBucketDBUpdaterTest : public Test,
{
auto cluster_info = owner.create_cluster_info(old_cluster_state);
state = PendingClusterState::createForDistributionChange(
clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1));
clock, cluster_info, sender, owner.bucket_space_states(), api::Timestamp(1));
}
};

Expand Down Expand Up @@ -1389,7 +1389,7 @@ TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::stri
auto cluster_info = create_cluster_info(old_cluster_state);
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1)));
clock, cluster_info, sender, bucket_space_states(), api::Timestamp(1)));

sort_sent_messages_by_index(sender);

Expand Down Expand Up @@ -1514,7 +1514,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) {
OutdatedNodesMap outdated_nodes_map;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
clock, cluster_info, sender, top_level_bucket_space_repo(),
clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, api::Timestamp(1)));

ASSERT_EQ(message_count(3), sender.commands().size());
Expand Down Expand Up @@ -1670,7 +1670,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists(
auto cluster_info = create_cluster_info("cluster:d");

auto state = PendingClusterState::createForClusterStateChange(
clock, cluster_info, sender, top_level_bucket_space_repo(),
clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, before_time);

parse_input_data(existing_data, before_time, *state, include_bucket_info);
Expand All @@ -1690,7 +1690,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists(
auto cluster_info = create_cluster_info(old_state.toString());

auto state = PendingClusterState::createForClusterStateChange(
clock, cluster_info, sender, top_level_bucket_space_repo(),
clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, after_time);

parse_input_data(new_data, after_time, *state, include_bucket_info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,16 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const
return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId);
}

DistributorBucketSpaceRepo&
TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept
BucketSpaceStateMap&
TopLevelDistributorTestUtil::bucket_space_states() noexcept
{
return _distributor->_component.bucket_space_repo();
return _distributor->_component.bucket_space_states();
}

const DistributorBucketSpaceRepo&
TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept
const BucketSpaceStateMap&
TopLevelDistributorTestUtil::bucket_space_states() const noexcept
{
return _distributor->_component.bucket_space_repo();
return _distributor->_component.bucket_space_states();
}

std::unique_ptr<StripeAccessGuard>
Expand Down
10 changes: 5 additions & 5 deletions storage/src/tests/distributor/top_level_distributor_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; }

namespace distributor {

class TopLevelDistributor;
class BucketSpaceStateMap;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
class DistributorMetricSet;
Expand All @@ -26,10 +26,11 @@ class DistributorStripe;
class DistributorStripeComponent;
class DistributorStripeOperationContext;
class DistributorStripePool;
class StripeAccessGuard;
class IdealStateMetricSet;
class Operation;
class StripeAccessGuard;
class TopLevelBucketDBUpdater;
class TopLevelDistributor;

class TopLevelDistributorTestUtil : private DoneInitializeHandler
{
Expand Down Expand Up @@ -60,9 +61,8 @@ class TopLevelDistributorTestUtil : private DoneInitializeHandler
// As the above, but always inserts into default bucket space
void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr);

// TODO STRIPE replace with BucketSpaceStateMap once legacy is gone
DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept;
const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept;
BucketSpaceStateMap& bucket_space_states() noexcept;
const BucketSpaceStateMap& bucket_space_states() const noexcept;

std::unique_ptr<StripeAccessGuard> acquire_stripe_guard();

Expand Down
16 changes: 16 additions & 0 deletions storage/src/vespa/storage/distributor/bucket_space_state_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ BucketSpaceStateMap::BucketSpaceStateMap()
_map.emplace(document::FixedBucketSpaces::global_space(), std::make_unique<BucketSpaceState>());
}

const BucketSpaceState&
BucketSpaceStateMap::get(document::BucketSpace space) const
{
auto itr = _map.find(space);
assert(itr != _map.end());
return *itr->second;
}

BucketSpaceState&
BucketSpaceStateMap::get(document::BucketSpace space)
{
auto itr = _map.find(space);
assert(itr != _map.end());
return *itr->second;
}

void
BucketSpaceStateMap::set_cluster_state(std::shared_ptr<const lib::ClusterState> cluster_state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class BucketSpaceStateMap {
StateMap::const_iterator begin() const { return _map.begin(); }
StateMap::const_iterator end() const { return _map.end(); }

const BucketSpaceState& get(document::BucketSpace space) const;
BucketSpaceState& get(document::BucketSpace space);

void set_cluster_state(std::shared_ptr<const lib::ClusterState> cluster_state);
void set_distribution(std::shared_ptr<const lib::Distribution> distribution);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ DistributorComponent::DistributorComponent(DistributorInterface& distributor,
const std::string& name)
: storage::DistributorComponent(comp_reg, name),
_distributor(distributor),
_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index(), false)),
_read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index(), false))
_bucket_space_states()
{
}

Expand Down
22 changes: 7 additions & 15 deletions storage/src/vespa/storage/distributor/distributor_component.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#pragma once

#include "bucket_space_state_map.h"
#include "distributor_interface.h"
#include "distributor_node_context.h"
#include "distributor_operation_context.h"
Expand All @@ -22,9 +23,8 @@ class DistributorComponent : public storage::DistributorComponent,
public DistributorOperationContext {
private:
DistributorInterface& _distributor;
// TODO STRIPE: When legacy mode is removed, replace this with BucketSpaceStateMap.
std::unique_ptr<DistributorBucketSpaceRepo> _bucket_space_repo;
std::unique_ptr<DistributorBucketSpaceRepo> _read_only_bucket_space_repo;
BucketSpaceStateMap _bucket_space_states;


public:
DistributorComponent(DistributorInterface& distributor,
Expand All @@ -45,23 +45,15 @@ class DistributorComponent : public storage::DistributorComponent,
api::Timestamp generate_unique_timestamp() override {
return getUniqueTimestamp();
}
const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
return *_bucket_space_repo;
}
DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
return *_bucket_space_repo;
}
const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
return *_read_only_bucket_space_repo;
const BucketSpaceStateMap& bucket_space_states() const noexcept override {
return _bucket_space_states;
}
DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
return *_read_only_bucket_space_repo;
BucketSpaceStateMap& bucket_space_states() noexcept override {
return _bucket_space_states;
}
const storage::DistributorConfiguration& distributor_config() const noexcept override {
return _distributor.config();
}


};

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
#include <vespa/storageapi/defs.h>

namespace storage { class DistributorConfiguration; }
namespace storage::lib { class ClusterStateBundle; }

namespace storage::distributor {

class BucketSpaceStateMap;
class DistributorBucketSpaceRepo;

/**
Expand All @@ -19,11 +19,8 @@ class DistributorOperationContext {
public:
virtual ~DistributorOperationContext() {}
virtual api::Timestamp generate_unique_timestamp() = 0;
// TODO STRIPE: Access to bucket space repos is only temporary at this level.
virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0;
virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0;
virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
};

Expand Down
7 changes: 4 additions & 3 deletions storage/src/vespa/storage/distributor/distributor_stripe.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "blockingoperationstarter.h"
#include "distributor_stripe.h"
#include "distributor_status.h"
#include "distributor_bucket_space.h"
#include "distributor_status.h"
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
#include "stripe_host_info_notifier.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "storage_node_up_states.h"
#include "stripe_host_info_notifier.h"
#include "throttlingoperationstarter.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
#include "pendingmessagetracker.h"
#include "storage_node_up_states.h"
#include <vespa/document/select/parser.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

#include "bucketgctimecalculator.h"
#include "bucketownership.h"
#include "distributor_operation_context.h"
#include "operation_routing_snapshot.h"
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storageapi/defs.h>

namespace document { class Bucket; }
namespace storage::lib { class ClusterStateBundle; }

namespace storage::distributor {

Expand All @@ -20,9 +20,17 @@ class PendingMessageTracker;
/**
* Interface with functionality that is used when handling distributor stripe operations.
*/
class DistributorStripeOperationContext : public DistributorOperationContext {
class DistributorStripeOperationContext {
public:
virtual ~DistributorStripeOperationContext() = default;

virtual api::Timestamp generate_unique_timestamp() = 0;
virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
virtual const DistributorConfiguration& distributor_config() const noexcept = 0;

virtual void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
uint32_t update_flags = 0) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

#include <vespa/document/fieldvalue/document.h>
#include <vespa/storage/distributor/activecopy.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/operationtargetresolverimpl.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/storage_node_up_states.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/distribution/idealnodecalculatorimpl.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <algorithm>

#include <vespa/log/log.h>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "pending_bucket_space_db_transition.h"
#include "bucket_space_state_map.h"
#include "clusterinformation.h"
#include "pending_bucket_space_db_transition.h"
#include "pendingclusterstate.h"
#include "distributor_bucket_space.h"
#include "stripe_access_guard.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
Expand All @@ -19,7 +19,7 @@ using lib::NodeType;
using lib::NodeState;

PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketSpace bucket_space,
DistributorBucketSpace &distributorBucketSpace,
const BucketSpaceState &bucket_space_state,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
Expand All @@ -31,10 +31,10 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketS
_missingEntries(),
_clusterInfo(std::move(clusterInfo)),
_outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)),
_prevClusterState(distributorBucketSpace.getClusterState()),
_prevClusterState(bucket_space_state.get_cluster_state()),
_newClusterState(newClusterState),
_creationTimestamp(creationTimestamp),
_distributorBucketSpace(distributorBucketSpace),
_bucket_space_state(bucket_space_state),
_distributorIndex(_clusterInfo->getDistributorIndex()),
_bucketOwnershipTransfer(distributionChanged),
_rejectedRequests()
Expand Down Expand Up @@ -217,24 +217,11 @@ PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::Trailing
inserter.insert_at_end(bucket_id, e);
}

// TODO STRIPE remove legacy single stripe stuff
void
PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
{
BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
std::sort(_entries.begin(), _entries.end());

const auto& dist = _distributorBucketSpace.getDistribution();
DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);

db.merge(merger);
}

void
PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard)
{
std::sort(_entries.begin(), _entries.end());
const auto& dist = _distributorBucketSpace.getDistribution();
const auto& dist = _bucket_space_state.get_distribution();
guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState,
_clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
}
Expand Down Expand Up @@ -296,7 +283,7 @@ PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old,
bool
PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const
{
const auto &dist(_distributorBucketSpace.getDistribution());
const auto &dist(_bucket_space_state.get_distribution());
if (dist.getNodeGraph().getGroupForNode(index) ==
dist.getNodeGraph().getGroupForNode(_distributorIndex)) {
LOG(debug,
Expand All @@ -317,7 +304,7 @@ PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown(
uint16_t nodeIndex,
const lib::ClusterState& state) const
{
const auto &dist(_distributorBucketSpace.getDistribution());
const auto &dist(_bucket_space_state.get_distribution());
if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) {
return false; // Not doing anything for downed groups.
}
Expand Down
Loading

0 comments on commit b0f91ec

Please sign in to comment.