diff --git a/contrib/client-c b/contrib/client-c index 6231077e78d..5a89431409b 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 6231077e78d50f70e0fbe8dab1452b16185f5643 +Subproject commit 5a89431409b0a0e2f71b50d1d10cc984617948c4 diff --git a/contrib/tipb b/contrib/tipb index 0607513e7fa..07c1d4cf432 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 0607513e7fa40f3b564f0b269da76c1c8dc90032 +Subproject commit 07c1d4cf43236a98d6299afbe864b628b52c0eae diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 7a246ba1e74..be2d8ea466d 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #pragma GCC diagnostic push @@ -134,6 +135,7 @@ struct CoprocessorReaderResult bool meet_error; String error_msg; bool eof; + bool same_zone_flag; String req_info = "cop request"; DecodeDetail decode_detail; @@ -142,11 +144,13 @@ struct CoprocessorReaderResult bool meet_error_ = false, const String & error_msg_ = "", bool eof_ = false, - DecodeDetail decode_detail_ = {}) + DecodeDetail decode_detail_ = {}, + bool same_zone_flag_ = true) : resp(resp_) , meet_error(meet_error_) , error_msg(error_msg_) , eof(eof_) + , same_zone_flag(same_zone_flag_) , decode_detail(decode_detail_) {} }; @@ -158,6 +162,11 @@ class CoprocessorReader public: static constexpr bool is_streaming_reader = false; static constexpr auto name = "CoprocessorReader"; + static const inline ConnectionProfileInfo::ConnTypeVec conn_type_vec{ + ConnectionProfileInfo::InnerZoneRemote, + ConnectionProfileInfo::InterZoneRemote}; + static const Int32 inner_zone_index = 0; + static const Int32 inter_zone_index = 1; private: const DAGSchema schema; @@ -177,7 +186,8 @@ class CoprocessorReader size_t queue_size, UInt64 cop_timeout, const pingcap::kv::LabelFilter & tiflash_label_filter_, - const String & source_identifier) + const String & source_identifier, + const String & store_zone_label = "") : schema(schema_) , has_enforce_encode_type(has_enforce_encode_type_) , concurrency(concurrency_) @@ -189,7 +199,8 @@ class CoprocessorReader concurrency_, &Poco::Logger::get(fmt::format("{} pingcap/coprocessor", source_identifier)), cop_timeout, - tiflash_label_filter_) + tiflash_label_filter_, + store_zone_label) {} const DAGSchema & getOutputSchema() const { return schema; } @@ -291,7 +302,7 @@ class CoprocessorReader false}; } auto detail = decodeChunks(resp, block_queue, header, schema); - return {resp, false, "", false, detail}; + return {resp, false, "", false, detail, result.same_zone}; } else { @@ -311,7 +322,7 @@ class CoprocessorReader return toResult(result_pair, block_queue, header); } - static size_t getSourceNum() { return 1; } + static size_t getSourceNum() { return 2; } size_t getConcurrency() const { return concurrency; } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 8fa8ac976be..80a92b76ec0 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -739,7 +739,19 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve : concurrent_num * 4; bool enable_cop_stream = context.getSettingsRef().enable_cop_stream_for_remote_read; UInt64 cop_timeout = context.getSettingsRef().cop_timeout_for_remote_read; - + String store_zone_label; + auto kv_store = tmt.getKVStore(); + if likely (kv_store) + { + for (int i = 0; i < kv_store->getStoreMeta().labels_size(); ++i) + { + if (kv_store->getStoreMeta().labels().at(i).key() == "zone") + { + store_zone_label = kv_store->getStoreMeta().labels().at(i).value(); + break; + } + } + } auto coprocessor_reader = std::make_shared( schema, cluster, @@ -750,7 +762,8 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve queue_size, cop_timeout, tiflash_label_filter, - log->identifier()); + log->identifier(), + store_zone_label); context.getDAGContext()->addCoprocessorReader(coprocessor_reader); return coprocessor_reader; diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp index 584bc2ed05c..4b926e3399d 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp @@ -37,6 +37,10 @@ void ExecutionSummary::merge(const ExecutionSummary & other) num_produced_rows += other.num_produced_rows; num_iterations += other.num_iterations; concurrency += other.concurrency; + inner_zone_send_bytes += other.inner_zone_send_bytes; + inner_zone_receive_bytes += other.inner_zone_receive_bytes; + inter_zone_send_bytes += other.inter_zone_send_bytes; + inter_zone_receive_bytes += other.inter_zone_receive_bytes; ru_consumption = mergeRUConsumption(ru_consumption, other.ru_consumption); scan_context->merge(*other.scan_context); } @@ -53,6 +57,10 @@ void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other) num_produced_rows += other.num_produced_rows(); num_iterations += other.num_iterations(); concurrency += other.concurrency(); + inner_zone_send_bytes += other.tiflash_network_summary().inner_zone_send_bytes(); + inner_zone_receive_bytes += other.tiflash_network_summary().inner_zone_receive_bytes(); + inter_zone_send_bytes += other.tiflash_network_summary().inter_zone_send_bytes(); + inter_zone_receive_bytes += other.tiflash_network_summary().inter_zone_send_bytes(); ru_consumption = mergeRUConsumption(ru_consumption, parseRUConsumption(other)); scan_context->merge(other.tiflash_scan_context()); } @@ -66,6 +74,10 @@ void ExecutionSummary::fill(const BaseRuntimeStatistics & other) num_produced_rows = other.rows; num_iterations = other.blocks; concurrency = other.concurrency; + inner_zone_send_bytes = other.inner_zone_send_bytes; + inner_zone_receive_bytes = other.inner_zone_receive_bytes; + inter_zone_send_bytes = other.inter_zone_send_bytes; + inter_zone_receive_bytes = other.inter_zone_receive_bytes; } void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other) @@ -77,6 +89,10 @@ void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other) num_produced_rows = other.num_produced_rows(); num_iterations = other.num_iterations(); concurrency = other.concurrency(); + inner_zone_send_bytes = other.tiflash_network_summary().inner_zone_send_bytes(); + inner_zone_receive_bytes = other.tiflash_network_summary().inner_zone_receive_bytes(); + inter_zone_send_bytes = other.tiflash_network_summary().inter_zone_send_bytes(); + inter_zone_receive_bytes = other.tiflash_network_summary().inter_zone_send_bytes(); ru_consumption = parseRUConsumption(other); scan_context->deserialize(other.tiflash_scan_context()); } diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.h b/dbms/src/Flash/Coprocessor/ExecutionSummary.h index 400faab4cc5..21c15af5bce 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.h @@ -35,6 +35,10 @@ struct ExecutionSummary UInt64 num_produced_rows = 0; UInt64 num_iterations = 0; UInt64 concurrency = 0; + UInt64 inner_zone_send_bytes = 0; + UInt64 inner_zone_receive_bytes = 0; + UInt64 inter_zone_send_bytes = 0; + UInt64 inter_zone_receive_bytes = 0; resource_manager::Consumption ru_consumption{}; DM::ScanContextPtr scan_context; diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 10bc0db9d29..3ca078b2144 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -382,6 +382,12 @@ ExchangeReceiverBase::~ExchangeReceiverBase() } } +template +const ConnectionProfileInfo::ConnTypeVec & ExchangeReceiverBase::getConnTypeVec() const +{ + return rpc_context->getConnTypeVec(); +} + template void ExchangeReceiverBase::handleConnectionAfterException() { diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 1adc9db1b16..038f445afad 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -142,6 +143,7 @@ class ExchangeReceiverBase std::atomic * getDataSizeInQueue() { return &data_size_in_queue; } void verifyStreamId(size_t stream_id) const; + const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const; private: std::shared_ptr mem_tracker; @@ -199,9 +201,7 @@ class ExchangeReceiverBase std::shared_ptr rpc_context; - const tipb::ExchangeReceiver pb_exchange_receiver; const size_t source_num; - const ::mpp::TaskMeta task_meta; const bool enable_fine_grained_shuffle_flag; const size_t output_stream_count; const size_t max_buffer_size; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index 7b30187dae9..d1ea9e1a4a1 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -104,7 +105,9 @@ GRPCReceiverContext::GRPCReceiverContext( , task_manager(std::move(task_manager_)) , enable_local_tunnel(enable_local_tunnel_) , enable_async_grpc(enable_async_grpc_) -{} +{ + conn_type_vec.resize(exchange_receiver_meta.encoded_task_meta_size(), ConnectionProfileInfo::Local); +} ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const { @@ -120,6 +123,19 @@ ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const req.recv_task_id = task_meta.task_id(); req.req.set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); // NOLINT req.req.set_allocated_sender_meta(sender_task.release()); // NOLINT + + bool valid_zone_flag + = exchange_receiver_meta.same_zone_flag_size() == exchange_receiver_meta.encoded_task_meta_size(); + if likely (valid_zone_flag) + { + conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType( + req.is_local, + exchange_receiver_meta.same_zone_flag().Get(index)); + } + else + { + conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, true); + } return req; } diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index 431b8418f14..a4822252c0d 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -104,12 +105,15 @@ class GRPCReceiverContext LocalRequestHandler & local_request_handler, bool has_remote_conn); + const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const { return conn_type_vec; } + static std::tuple establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager); private: tipb::ExchangeReceiver exchange_receiver_meta; + mutable ConnectionProfileInfo::ConnTypeVec conn_type_vec; mpp::TaskMeta task_meta; pingcap::kv::Cluster * cluster; std::shared_ptr task_manager; diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 8e2f65c16af..5d500e994c1 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -215,6 +215,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) std::max(5, context->getSettingsRef().max_threads * 5), tunnel_queue_memory_bound); // MPMCQueue can benefit from a slightly larger queue size + bool valid_zone_flag_fields = exchange_sender.same_zone_flag_size() == exchange_sender.encoded_task_meta_size(); for (int i = 0; i < exchange_sender.encoded_task_meta_size(); ++i) { // exchange sender will register the tunnels and wait receiver to found a connection. @@ -235,6 +236,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) queue_limit, is_local, is_async, + valid_zone_flag_fields ? exchange_sender.same_zone_flag().Get(i) : true, log->identifier()); LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index be692c7d00d..d5c06273157 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -74,6 +75,7 @@ MPPTunnel::MPPTunnel( const CapacityLimits & queue_limits_, bool is_local_, bool is_async_, + bool same_zone, const String & req_id) : MPPTunnel( fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id()), @@ -81,6 +83,7 @@ MPPTunnel::MPPTunnel( queue_limits_, is_local_, is_async_, + same_zone, req_id) {} @@ -90,6 +93,7 @@ MPPTunnel::MPPTunnel( const CapacityLimits & queue_limits_, bool is_local_, bool is_async_, + bool same_zone, const String & req_id) : status(TunnelStatus::Unconnected) , timeout(timeout_) @@ -97,6 +101,7 @@ MPPTunnel::MPPTunnel( , tunnel_id(tunnel_id_) , mem_tracker(current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr) , queue_limit(queue_limits_) + , connection_profile_info(is_local_, same_zone) , log(Logger::get(req_id, tunnel_id)) , data_size_in_queue(0) { diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 4c2421437e4..799c92fd0a5 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -474,6 +474,7 @@ class MPPTunnel : private boost::noncopyable const CapacityLimits & queue_limits, bool is_local_, bool is_async_, + bool same_zone, const String & req_id); // For gtest usage @@ -483,6 +484,7 @@ class MPPTunnel : private boost::noncopyable const CapacityLimits & queue_limits, bool is_local_, bool is_async_, + bool same_zone, const String & req_id); ~MPPTunnel(); diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index ff11aef5ec1..3b50284bccc 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -255,19 +255,19 @@ class TestMPPTunnel : public testing::Test public: MPPTunnelPtr constructRemoteSyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, true, String("0")); return tunnel; } MPPTunnelPtr constructRemoteAsyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, true, String("0")); return tunnel; } MPPTunnelPtr constructLocalTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, true, String("0")); return tunnel; } diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp index 16939b28acc..ce2bae00758 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp @@ -42,4 +42,34 @@ void BaseRuntimeStatistics::append(const OperatorProfileInfo & profile_info) } ++concurrency; } + +void BaseRuntimeStatistics::updateReceiveConnectionInfo(const ConnectionProfileInfo & conn_profile_info) +{ + switch (conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + inner_zone_receive_bytes += bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + inter_zone_receive_bytes += bytes; + break; + default: + break; + } +} + +void BaseRuntimeStatistics::updateSendConnectionInfo(const ConnectionProfileInfo & conn_profile_info) +{ + switch (conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + inner_zone_send_bytes += bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + inter_zone_send_bytes += bytes; + break; + default: + break; + } +} } // namespace DB diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index 2f51a6203e6..1874b5dbd69 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include namespace DB @@ -32,8 +33,14 @@ struct BaseRuntimeStatistics UInt64 minTSO_wait_time_ns = 0; UInt64 queue_wait_time_ns = 0; UInt64 pipeline_breaker_wait_time_ns = 0; + UInt64 inner_zone_send_bytes = 0; + UInt64 inner_zone_receive_bytes = 0; + UInt64 inter_zone_send_bytes = 0; + UInt64 inter_zone_receive_bytes = 0; void append(const BlockStreamProfileInfo &); void append(const OperatorProfileInfo &); + void updateSendConnectionInfo(const ConnectionProfileInfo & conn_profile_info); + void updateReceiveConnectionInfo(const ConnectionProfileInfo & conn_profile_info); }; } // namespace DB diff --git a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h index fb0151ac9af..89b5856c863 100644 --- a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h +++ b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h @@ -14,13 +14,50 @@ #pragma once +#include #include +#include + namespace DB { + struct ConnectionProfileInfo { + enum ConnectionType + { + Local = 0, + InnerZoneRemote = 1, + InterZoneRemote = 2, + }; + using ConnTypeVec = std::vector; + static ALWAYS_INLINE ConnectionType inferConnectionType(bool is_local, bool same_zone) + { + if (is_local) + { + return Local; + } + else if (same_zone) + { + return InnerZoneRemote; + } + else + { + return InterZoneRemote; + } + } + ConnectionProfileInfo() = default; + explicit ConnectionProfileInfo(ConnectionType type_) + : type(type_) + {} + ConnectionProfileInfo(bool is_local, bool same_zone) + : ConnectionProfileInfo(inferConnectionType(is_local, same_zone)) + {} + + String getTypeString() const { return String(magic_enum::enum_name(type)); } + Int64 packets = 0; Int64 bytes = 0; + ConnectionType type = Local; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp index 0aa6abd4ae7..984acbdb350 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp @@ -20,10 +20,11 @@ namespace DB String ExchangeReceiveDetail::toJson() const { return fmt::format( - R"({{"receiver_source_task_id":{},"packets":{},"bytes":{}}})", + R"({{"receiver_source_task_id":{},"conn_type":{},"packets":{},"bytes":{}}})", receiver_source_task_id, - packets, - bytes); + conn_profile_info.getTypeString(), + conn_profile_info.packets, + conn_profile_info.bytes); } void ExchangeReceiverStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const @@ -46,8 +47,10 @@ void ExchangeReceiverStatistics::updateExchangeReceiveDetail( RUNTIME_CHECK(connection_profile_infos.size() == partition_num); for (size_t i = 0; i < partition_num; ++i) { - exchange_receive_details[i].packets += connection_profile_infos[i].packets; - exchange_receive_details[i].bytes += connection_profile_infos[i].bytes; + exchange_receive_details[i].conn_profile_info.type = connection_profile_infos[i].type; + exchange_receive_details[i].conn_profile_info.packets += connection_profile_infos[i].packets; + exchange_receive_details[i].conn_profile_info.bytes += connection_profile_infos[i].bytes; + base.updateReceiveConnectionInfo(connection_profile_infos[i]); } } diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h index 58b1a29714c..09996d02421 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h @@ -20,9 +20,10 @@ namespace DB { -struct ExchangeReceiveDetail : public ConnectionProfileInfo +struct ExchangeReceiveDetail { Int64 receiver_source_task_id; + ConnectionProfileInfo conn_profile_info; explicit ExchangeReceiveDetail(Int64 receiver_source_task_id_) : receiver_source_task_id(receiver_source_task_id_) diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index b78ae1efded..43d43adb1f5 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include namespace DB @@ -23,13 +24,14 @@ namespace DB String MPPTunnelDetail::toJson() const { return fmt::format( - R"({{"tunnel_id":"{}","sender_target_task_id":{},"sender_target_host":"{}","is_local":{},"packets":{},"bytes":{}}})", + R"({{"tunnel_id":"{}","sender_target_task_id":{},"sender_target_host":"{}","is_local":{},"conn_type":{},"packets":{},"bytes":{}}})", tunnel_id, sender_target_task_id, sender_target_host, is_local, - packets, - bytes); + conn_profile_info.getTypeString(), + conn_profile_info.packets, + conn_profile_info.bytes); } void ExchangeSenderStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const @@ -53,8 +55,9 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() for (UInt16 i = 0; i < partition_num; ++i) { const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); - mpp_tunnel_details[i].packets = connection_profile_info.packets; - mpp_tunnel_details[i].bytes = connection_profile_info.bytes; + mpp_tunnel_details[i].conn_profile_info.packets += connection_profile_info.packets; + mpp_tunnel_details[i].conn_profile_info.bytes += connection_profile_info.bytes; + base.updateSendConnectionInfo(connection_profile_info); } } @@ -83,8 +86,12 @@ ExchangeSenderStatistics::ExchangeSenderStatistics(const tipb::Executor * execut sender_target_task_ids.push_back(task_meta.task_id()); const auto & mpp_tunnel = mpp_tunnels[i]; - mpp_tunnel_details - .emplace_back(mpp_tunnel->id(), task_meta.task_id(), task_meta.address(), mpp_tunnel->isLocal()); + mpp_tunnel_details.emplace_back( + mpp_tunnel->getConnectionProfileInfo(), + mpp_tunnel->id(), + task_meta.task_id(), + task_meta.address(), + mpp_tunnel->isLocal()); } // for root task, exchange_sender_executor.task_meta[0].address is blank or not tidb host diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.h b/dbms/src/Flash/Statistics/ExchangeSenderImpl.h index 6f00725b02f..b7efe4d7e1c 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.h +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.h @@ -20,18 +20,25 @@ namespace DB { -struct MPPTunnelDetail : public ConnectionProfileInfo +struct MPPTunnelDetail { String tunnel_id; Int64 sender_target_task_id; String sender_target_host; bool is_local; + ConnectionProfileInfo conn_profile_info; - MPPTunnelDetail(String tunnel_id_, Int64 sender_target_task_id_, String sender_target_host_, bool is_local_) + MPPTunnelDetail( + const ConnectionProfileInfo & conn_profile_info_, + String tunnel_id_, + Int64 sender_target_task_id_, + String sender_target_host_, + bool is_local_) : tunnel_id(std::move(tunnel_id_)) , sender_target_task_id(sender_target_task_id_) , sender_target_host(std::move(sender_target_host_)) , is_local(is_local_) + , conn_profile_info(conn_profile_info_) {} String toJson() const; diff --git a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp index 5f7b1222ac6..1f499d5d8b4 100644 --- a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp +++ b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp @@ -34,6 +34,12 @@ void fillTiExecutionSummary( execution_summary->mutable_tiflash_wait_summary()->set_pipeline_breaker_wait_ns( current.time_pipeline_breaker_wait_ns); execution_summary->mutable_tiflash_wait_summary()->set_pipeline_queue_wait_ns(current.time_pipeline_queue_ns); + execution_summary->mutable_tiflash_network_summary()->set_inner_zone_send_bytes(current.inner_zone_send_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inner_zone_receive_bytes( + current.inner_zone_receive_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inter_zone_send_bytes(current.inter_zone_send_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inter_zone_receive_bytes( + current.inter_zone_receive_bytes); RUNTIME_CHECK(current.ru_consumption.SerializeToString(execution_summary->mutable_ru_consumption())); // tree-based executors will have executor_id. diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 5a601e8ef0a..40d362b2932 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -13,25 +13,35 @@ // limitations under the License. #include +#include #include #include #include namespace DB { -String TableScanDetail::toJson() const +String TableScanTimeDetail::toJson() const { auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0; auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0; + return fmt::format(R"("max":{},"min":{})", max_cost_ms, min_cost_ms); +} +String LocalTableScanDetail::toJson() const +{ + return fmt::format(R"({{"is_local":false,"bytes":{},{}}})", bytes, time_detail.toJson()); +} +String RemoteTableScanDetail::toJson() const +{ return fmt::format( - R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})", - is_local, - packets, - bytes, - max_cost_ms, - min_cost_ms); + R"({{"type:":{},"packets":{},"bytes":{},"type:":{},"packets":{},"bytes":{},{}}})", + inner_zone_conn_profile_info.getTypeString(), + inner_zone_conn_profile_info.packets, + inner_zone_conn_profile_info.bytes, + inter_zone_conn_profile_info.getTypeString(), + inter_zone_conn_profile_info.packets, + inter_zone_conn_profile_info.bytes, + time_detail.toJson()); } - void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const { auto scan_ctx_it = dag_context.scan_context_map.find(executor_id); @@ -48,8 +58,19 @@ void TableScanStatistics::updateTableScanDetail(const std::vectorgetProfileInfo(); local_table_scan_detail.bytes += prof.bytes; const double this_execution_time = prof.execution_time * 1.0; - if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.max_stream_cost_ns < this_execution_time) - local_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.min_stream_cost_ns > this_execution_time) - local_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } else { @@ -93,23 +114,23 @@ void TableScanStatistics::collectExtraRuntimeDetail() { local_table_scan_detail.bytes += profile_info.operator_info->bytes; const double this_execution_time = profile_info.operator_info->execution_time * 1.0; - if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.max_stream_cost_ns < this_execution_time) - local_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.min_stream_cost_ns > this_execution_time) - local_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } else { updateTableScanDetail(profile_info.connection_profile_infos); const double this_execution_time = profile_info.operator_info->execution_time * 1.0; - if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || remote_table_scan_detail.max_stream_cost_ns < this_execution_time) - remote_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || remote_table_scan_detail.max_stream_cost_ns > this_execution_time) - remote_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (remote_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + remote_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (remote_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.time_detail.max_stream_cost_ns > this_execution_time) + remote_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } }); break; @@ -118,10 +139,10 @@ void TableScanStatistics::collectExtraRuntimeDetail() if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end()) { it->second->setStreamCost( - std::max(local_table_scan_detail.min_stream_cost_ns, 0.0), - std::max(local_table_scan_detail.max_stream_cost_ns, 0.0), - std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0), - std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0)); + std::max(local_table_scan_detail.time_detail.min_stream_cost_ns, 0.0), + std::max(local_table_scan_detail.time_detail.max_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.time_detail.min_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.time_detail.max_stream_cost_ns, 0.0)); } } diff --git a/dbms/src/Flash/Statistics/TableScanImpl.h b/dbms/src/Flash/Statistics/TableScanImpl.h index af5b8f43c93..b8203155ef7 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.h +++ b/dbms/src/Flash/Statistics/TableScanImpl.h @@ -20,16 +20,23 @@ namespace DB { -struct TableScanDetail : public ConnectionProfileInfo +struct TableScanTimeDetail { - const bool is_local; double min_stream_cost_ns = -1.0; double max_stream_cost_ns = -1.0; - - explicit TableScanDetail(bool is_local_) - : is_local(is_local_) - {} - + String toJson() const; +}; +struct LocalTableScanDetail +{ + Int64 bytes = 0; + TableScanTimeDetail time_detail; + String toJson() const; +}; +struct RemoteTableScanDetail +{ + ConnectionProfileInfo inner_zone_conn_profile_info{ConnectionProfileInfo::InnerZoneRemote}; + ConnectionProfileInfo inter_zone_conn_profile_info{ConnectionProfileInfo::InterZoneRemote}; + TableScanTimeDetail time_detail; String toJson() const; }; @@ -54,8 +61,8 @@ class TableScanStatistics : public TableScanStatisticsBase TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_); private: - TableScanDetail local_table_scan_detail{true}; - TableScanDetail remote_table_scan_detail{false}; + LocalTableScanDetail local_table_scan_detail; + RemoteTableScanDetail remote_table_scan_detail; protected: void appendExtraJson(FmtBuffer &) const override; diff --git a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp index 45d6f3d32fb..5c7fe9d2ef0 100644 --- a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp +++ b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -24,7 +25,10 @@ CoprocessorReaderSourceOp::CoprocessorReaderSourceOp( CoprocessorReaderPtr coprocessor_reader_) : SourceOp(exec_context_, req_id) , coprocessor_reader(coprocessor_reader_) - , io_profile_info(IOProfileInfo::createForRemote(profile_info_ptr, coprocessor_reader->getSourceNum())) + , io_profile_info(IOProfileInfo::createForRemote( + profile_info_ptr, + coprocessor_reader->getSourceNum(), + CoprocessorReader::conn_type_vec)) { assert(coprocessor_reader); setHeader(Block(getColumnWithTypeAndName(toNamesAndTypes(coprocessor_reader->getOutputSchema())))); @@ -90,9 +94,13 @@ OperatorStatus CoprocessorReaderSourceOp::readImpl(Block & block) } const auto & decode_detail = result.decode_detail; - auto & connection_profile_info = io_profile_info->connection_profile_infos[0]; - connection_profile_info.packets += decode_detail.packets; - connection_profile_info.bytes += decode_detail.packet_bytes; + auto conn_profile_info_index = CoprocessorReader::inner_zone_index; + if (!result.same_zone_flag) + { + conn_profile_info_index = CoprocessorReader::inter_zone_index; + } + io_profile_info->connection_profile_infos[conn_profile_info_index].packets += decode_detail.packets; + io_profile_info->connection_profile_infos[conn_profile_info_index].bytes += decode_detail.packet_bytes; total_rows += decode_detail.rows; LOG_TRACE( diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.h b/dbms/src/Operators/ExchangeReceiverSourceOp.h index 9185f51bf0d..3105c93ebd3 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.h +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.h @@ -32,7 +32,10 @@ class ExchangeReceiverSourceOp : public SourceOp : SourceOp(exec_context_, req_id) , exchange_receiver(exchange_receiver_) , stream_id(stream_id_) - , io_profile_info(IOProfileInfo::createForRemote(profile_info_ptr, exchange_receiver->getSourceNum())) + , io_profile_info(IOProfileInfo::createForRemote( + profile_info_ptr, + exchange_receiver->getSourceNum(), + exchange_receiver->getConnTypeVec())) { exchange_receiver->verifyStreamId(stream_id); setHeader(Block(getColumnWithTypeAndName(toNamesAndTypes(exchange_receiver->getOutputSchema())))); diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 036a33fb66d..2515096ad44 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -44,6 +45,21 @@ struct IOProfileInfo return info; } + static IOProfileInfoPtr createForRemote( + const OperatorProfileInfoPtr & profile_info, + size_t connections, + const ConnectionProfileInfo::ConnTypeVec & conn_type_vec) + { + RUNTIME_CHECK(connections == conn_type_vec.size()); + auto info = std::make_shared(profile_info, false); + info->connection_profile_infos.resize(connections); + for (size_t i = 0; i < connections; ++i) + { + info->connection_profile_infos[i].type = conn_type_vec[i]; + } + return info; + } + OperatorProfileInfoPtr operator_info; const bool is_local;