Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats inner/inter zone network traffic for mpp tasks #9747

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
21 changes: 16 additions & 5 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Flash/Coprocessor/ChunkDecodeAndSquash.h>
#include <Flash/Coprocessor/DecodeDetail.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <common/logger_useful.h>

#pragma GCC diagnostic push
Expand Down Expand Up @@ -134,6 +135,7 @@ struct CoprocessorReaderResult
bool meet_error;
String error_msg;
bool eof;
bool same_zone_flag;
String req_info = "cop request";
DecodeDetail decode_detail;

Expand All @@ -142,11 +144,13 @@ struct CoprocessorReaderResult
bool meet_error_ = false,
const String & error_msg_ = "",
bool eof_ = false,
DecodeDetail decode_detail_ = {})
DecodeDetail decode_detail_ = {},
bool same_zone_flag_ = true)
: resp(resp_)
, meet_error(meet_error_)
, error_msg(error_msg_)
, eof(eof_)
, same_zone_flag(same_zone_flag_)
, decode_detail(decode_detail_)
{}
};
Expand All @@ -158,6 +162,11 @@ class CoprocessorReader
public:
static constexpr bool is_streaming_reader = false;
static constexpr auto name = "CoprocessorReader";
static const inline ConnectionProfileInfo::ConnTypeVec conn_type_vec{
ConnectionProfileInfo::InnerZoneRemote,
ConnectionProfileInfo::InterZoneRemote};
static const Int32 inner_zone_index = 0;
static const Int32 inter_zone_index = 1;

private:
const DAGSchema schema;
Expand All @@ -177,7 +186,8 @@ class CoprocessorReader
size_t queue_size,
UInt64 cop_timeout,
const pingcap::kv::LabelFilter & tiflash_label_filter_,
const String & source_identifier)
const String & source_identifier,
const String & store_zone_label = "")
: schema(schema_)
, has_enforce_encode_type(has_enforce_encode_type_)
, concurrency(concurrency_)
Expand All @@ -189,7 +199,8 @@ class CoprocessorReader
concurrency_,
&Poco::Logger::get(fmt::format("{} pingcap/coprocessor", source_identifier)),
cop_timeout,
tiflash_label_filter_)
tiflash_label_filter_,
store_zone_label)
{}

const DAGSchema & getOutputSchema() const { return schema; }
Expand Down Expand Up @@ -291,7 +302,7 @@ class CoprocessorReader
false};
}
auto detail = decodeChunks(resp, block_queue, header, schema);
return {resp, false, "", false, detail};
return {resp, false, "", false, detail, result.same_zone};
}
else
{
Expand All @@ -311,7 +322,7 @@ class CoprocessorReader
return toResult(result_pair, block_queue, header);
}

static size_t getSourceNum() { return 1; }
static size_t getSourceNum() { return 2; }

size_t getConcurrency() const { return concurrency; }

Expand Down
17 changes: 15 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,19 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve
: concurrent_num * 4;
bool enable_cop_stream = context.getSettingsRef().enable_cop_stream_for_remote_read;
UInt64 cop_timeout = context.getSettingsRef().cop_timeout_for_remote_read;

String store_zone_label;
auto kv_store = tmt.getKVStore();
if likely (kv_store)
{
for (int i = 0; i < kv_store->getStoreMeta().labels_size(); ++i)
{
if (kv_store->getStoreMeta().labels().at(i).key() == "zone")
{
store_zone_label = kv_store->getStoreMeta().labels().at(i).value();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we break out of the loop once we have got the zone label?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, updated

break;
}
}
}
auto coprocessor_reader = std::make_shared<CoprocessorReader>(
schema,
cluster,
Expand All @@ -750,7 +762,8 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve
queue_size,
cop_timeout,
tiflash_label_filter,
log->identifier());
log->identifier(),
store_zone_label);
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);

return coprocessor_reader;
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ void ExecutionSummary::merge(const ExecutionSummary & other)
num_produced_rows += other.num_produced_rows;
num_iterations += other.num_iterations;
concurrency += other.concurrency;
inner_zone_send_bytes += other.inner_zone_send_bytes;
inner_zone_receive_bytes += other.inner_zone_receive_bytes;
inter_zone_send_bytes += other.inter_zone_send_bytes;
inter_zone_receive_bytes += other.inter_zone_receive_bytes;
ru_consumption = mergeRUConsumption(ru_consumption, other.ru_consumption);
scan_context->merge(*other.scan_context);
}
Expand All @@ -53,6 +57,10 @@ void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other)
num_produced_rows += other.num_produced_rows();
num_iterations += other.num_iterations();
concurrency += other.concurrency();
inner_zone_send_bytes += other.tiflash_network_summary().inner_zone_send_bytes();
inner_zone_receive_bytes += other.tiflash_network_summary().inner_zone_receive_bytes();
inter_zone_send_bytes += other.tiflash_network_summary().inter_zone_send_bytes();
inter_zone_receive_bytes += other.tiflash_network_summary().inter_zone_send_bytes();
ru_consumption = mergeRUConsumption(ru_consumption, parseRUConsumption(other));
scan_context->merge(other.tiflash_scan_context());
}
Expand All @@ -66,6 +74,10 @@ void ExecutionSummary::fill(const BaseRuntimeStatistics & other)
num_produced_rows = other.rows;
num_iterations = other.blocks;
concurrency = other.concurrency;
inner_zone_send_bytes = other.inner_zone_send_bytes;
inner_zone_receive_bytes = other.inner_zone_receive_bytes;
inter_zone_send_bytes = other.inter_zone_send_bytes;
inter_zone_receive_bytes = other.inter_zone_receive_bytes;
}

void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other)
Expand All @@ -77,6 +89,10 @@ void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other)
num_produced_rows = other.num_produced_rows();
num_iterations = other.num_iterations();
concurrency = other.concurrency();
inner_zone_send_bytes = other.tiflash_network_summary().inner_zone_send_bytes();
inner_zone_receive_bytes = other.tiflash_network_summary().inner_zone_receive_bytes();
inter_zone_send_bytes = other.tiflash_network_summary().inter_zone_send_bytes();
inter_zone_receive_bytes = other.tiflash_network_summary().inter_zone_send_bytes();
ru_consumption = parseRUConsumption(other);
scan_context->deserialize(other.tiflash_scan_context());
}
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ struct ExecutionSummary
UInt64 num_produced_rows = 0;
UInt64 num_iterations = 0;
UInt64 concurrency = 0;
UInt64 inner_zone_send_bytes = 0;
UInt64 inner_zone_receive_bytes = 0;
UInt64 inter_zone_send_bytes = 0;
UInt64 inter_zone_receive_bytes = 0;
resource_manager::Consumption ru_consumption{};

DM::ScanContextPtr scan_context;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
}
}

/// Returns the connection-type vector held by the underlying RPC context.
/// The receiver keeps no copy of its own; it forwards the reference as-is.
template <typename RPCContext>
const ConnectionProfileInfo::ConnTypeVec & ExchangeReceiverBase<RPCContext>::getConnTypeVec() const
{
const ConnectionProfileInfo::ConnTypeVec & conn_types = rpc_context->getConnTypeVec();
return conn_types;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::handleConnectionAfterException()
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Mpp/AsyncRequestHandler.h>
#include <Flash/Mpp/GRPCReceiverContext.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -142,6 +143,7 @@ class ExchangeReceiverBase
std::atomic<Int64> * getDataSizeInQueue() { return &data_size_in_queue; }

void verifyStreamId(size_t stream_id) const;
const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const;

private:
std::shared_ptr<MemoryTracker> mem_tracker;
Expand Down Expand Up @@ -199,9 +201,7 @@ class ExchangeReceiverBase

std::shared_ptr<RPCContext> rpc_context;

const tipb::ExchangeReceiver pb_exchange_receiver;
const size_t source_num;
const ::mpp::TaskMeta task_meta;
const bool enable_fine_grained_shuffle_flag;
const size_t output_stream_count;
const size_t max_buffer_size;
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/GRPCCompletionQueuePool.h>
#include <Flash/Mpp/GRPCReceiverContext.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <fmt/core.h>
#include <grpcpp/completion_queue.h>

Expand Down Expand Up @@ -104,7 +105,9 @@ GRPCReceiverContext::GRPCReceiverContext(
, task_manager(std::move(task_manager_))
, enable_local_tunnel(enable_local_tunnel_)
, enable_async_grpc(enable_async_grpc_)
{}
{
conn_type_vec.resize(exchange_receiver_meta.encoded_task_meta_size(), ConnectionProfileInfo::Local);
}

ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const
{
Expand All @@ -120,6 +123,19 @@ ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const
req.recv_task_id = task_meta.task_id();
req.req.set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); // NOLINT
req.req.set_allocated_sender_meta(sender_task.release()); // NOLINT

bool valid_zone_flag
= exchange_receiver_meta.same_zone_flag_size() == exchange_receiver_meta.encoded_task_meta_size();
if likely (valid_zone_flag)
{
conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(
req.is_local,
exchange_receiver_meta.same_zone_flag().Get(index));
}
else
{
conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, true);
}
return req;
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Mpp/LocalRequestHandler.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <common/types.h>
#include <grpcpp/completion_queue.h>
#include <kvproto/mpp.pb.h>
Expand Down Expand Up @@ -104,12 +105,15 @@ class GRPCReceiverContext
LocalRequestHandler & local_request_handler,
bool has_remote_conn);

const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const { return conn_type_vec; }

static std::tuple<MPPTunnelPtr, grpc::Status> establishMPPConnectionLocalV1(
const ::mpp::EstablishMPPConnectionRequest * request,
const std::shared_ptr<MPPTaskManager> & task_manager);

private:
tipb::ExchangeReceiver exchange_receiver_meta;
mutable ConnectionProfileInfo::ConnTypeVec conn_type_vec;
mpp::TaskMeta task_meta;
pingcap::kv::Cluster * cluster;
std::shared_ptr<MPPTaskManager> task_manager;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
std::max(5, context->getSettingsRef().max_threads * 5),
tunnel_queue_memory_bound); // MPMCQueue can benefit from a slightly larger queue size

bool valid_zone_flag_fields = exchange_sender.same_zone_flag_size() == exchange_sender.encoded_task_meta_size();
for (int i = 0; i < exchange_sender.encoded_task_meta_size(); ++i)
{
// exchange sender will register the tunnels and wait receiver to found a connection.
Expand All @@ -235,6 +236,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
queue_limit,
is_local,
is_async,
valid_zone_flag_fields ? exchange_sender.same_zone_flag().Get(i) : true,
log->identifier());

LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/PacketWriter.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <fmt/core.h>

#include <magic_enum.hpp>
Expand Down Expand Up @@ -74,13 +75,15 @@ MPPTunnel::MPPTunnel(
const CapacityLimits & queue_limits_,
bool is_local_,
bool is_async_,
bool same_zone,
const String & req_id)
: MPPTunnel(
fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id()),
timeout_,
queue_limits_,
is_local_,
is_async_,
same_zone,
req_id)
{}

Expand All @@ -90,13 +93,15 @@ MPPTunnel::MPPTunnel(
const CapacityLimits & queue_limits_,
bool is_local_,
bool is_async_,
bool same_zone,
const String & req_id)
: status(TunnelStatus::Unconnected)
, timeout(timeout_)
, timeout_nanoseconds(timeout_.count() * 1000000000ULL)
, tunnel_id(tunnel_id_)
, mem_tracker(current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr)
, queue_limit(queue_limits_)
, connection_profile_info(is_local_, same_zone)
, log(Logger::get(req_id, tunnel_id))
, data_size_in_queue(0)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ class MPPTunnel : private boost::noncopyable
const CapacityLimits & queue_limits,
bool is_local_,
bool is_async_,
bool same_zone,
const String & req_id);

// For gtest usage
Expand All @@ -500,6 +501,7 @@ class MPPTunnel : private boost::noncopyable
const CapacityLimits & queue_limits,
bool is_local_,
bool is_async_,
bool same_zone,
const String & req_id);

~MPPTunnel();
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,19 @@ class TestMPPTunnel : public testing::Test
public:
MPPTunnelPtr constructRemoteSyncTunnel()
{
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, false, false, String("0"));
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, false, false, true, String("0"));
return tunnel;
}

MPPTunnelPtr constructRemoteAsyncTunnel()
{
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, false, true, String("0"));
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, false, true, true, String("0"));
return tunnel;
}

MPPTunnelPtr constructLocalTunnel()
{
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, true, false, String("0"));
auto tunnel = std::make_shared<MPPTunnel>(String("0000_0001"), timeout, 2, true, false, true, String("0"));
return tunnel;
}

Expand Down
Loading