From 6d04578c9f5d8c96b30d584d8164e042e0f3544d Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 18 Dec 2024 13:25:33 +0800 Subject: [PATCH 01/17] update tipb protocol Signed-off-by: yibin --- contrib/tipb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tipb b/contrib/tipb index 0607513e7fa..246f9118835 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 0607513e7fa40f3b564f0b269da76c1c8dc90032 +Subproject commit 246f9118835736a7ee41c6a13755d56fe36aeae6 From 4f0c1cfb9c29eaff57595cee776f2fa384a393fa Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 18 Dec 2024 15:08:07 +0800 Subject: [PATCH 02/17] update tipb code Signed-off-by: yibin --- contrib/tipb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tipb b/contrib/tipb index 246f9118835..8408c760e82 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 246f9118835736a7ee41c6a13755d56fe36aeae6 +Subproject commit 8408c760e822f7cd7564ce280842af8a417f0a47 From f6d70e1e5b6715dc5535cbfb43f44f99ec4996dd Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 19 Dec 2024 11:59:21 +0800 Subject: [PATCH 03/17] update tipb commit Signed-off-by: yibin --- contrib/tipb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tipb b/contrib/tipb index 8408c760e82..07c1d4cf432 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 8408c760e822f7cd7564ce280842af8a417f0a47 +Subproject commit 07c1d4cf43236a98d6299afbe864b628b52c0eae From 20937bf66a9b21c41af1a7404ef368208224b1de Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 23 Dec 2024 13:54:01 +0800 Subject: [PATCH 04/17] Stats exchange sender network traffic Signed-off-by: yibin --- .../Flash/Coprocessor/ExecutionSummary.cpp | 16 ++++++++++ dbms/src/Flash/Coprocessor/ExecutionSummary.h | 4 +++ dbms/src/Flash/Mpp/MPPTask.cpp | 2 ++ dbms/src/Flash/Mpp/MPPTunnel.cpp | 6 ++++ dbms/src/Flash/Mpp/MPPTunnel.h | 2 ++ dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 6 ++-- .../Flash/Statistics/BaseRuntimeStatistics.h | 4 +++ .../Flash/Statistics/ConnectionProfileInfo.h | 28 +++++++++++++++- .../Flash/Statistics/ExchangeSenderImpl.cpp | 32 +++++++++++++++---- .../src/Flash/Statistics/ExchangeSenderImpl.h | 11 +++++-- .../Statistics/ExecutionSummaryHelper.cpp | 6 ++++ 11 files changed, 104 insertions(+), 13 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp index 584bc2ed05c..4b926e3399d 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp @@ -37,6 +37,10 @@ void ExecutionSummary::merge(const ExecutionSummary & other) num_produced_rows += other.num_produced_rows; num_iterations += other.num_iterations; concurrency += other.concurrency; + inner_zone_send_bytes += other.inner_zone_send_bytes; + inner_zone_receive_bytes += other.inner_zone_receive_bytes; + inter_zone_send_bytes += other.inter_zone_send_bytes; + inter_zone_receive_bytes += other.inter_zone_receive_bytes; ru_consumption = mergeRUConsumption(ru_consumption, other.ru_consumption); scan_context->merge(*other.scan_context); } @@ -53,6 +57,10 @@ void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other) num_produced_rows += other.num_produced_rows(); num_iterations += other.num_iterations(); concurrency += other.concurrency(); + inner_zone_send_bytes += other.tiflash_network_summary().inner_zone_send_bytes(); + inner_zone_receive_bytes += other.tiflash_network_summary().inner_zone_receive_bytes(); + inter_zone_send_bytes += other.tiflash_network_summary().inter_zone_send_bytes(); + inter_zone_receive_bytes += other.tiflash_network_summary().inter_zone_send_bytes(); ru_consumption = mergeRUConsumption(ru_consumption, parseRUConsumption(other)); scan_context->merge(other.tiflash_scan_context()); } @@ -66,6 +74,10 @@ void ExecutionSummary::fill(const BaseRuntimeStatistics & other) num_produced_rows = other.rows; num_iterations = other.blocks; concurrency = other.concurrency; + inner_zone_send_bytes = other.inner_zone_send_bytes; + inner_zone_receive_bytes = other.inner_zone_receive_bytes; + inter_zone_send_bytes = other.inter_zone_send_bytes; + inter_zone_receive_bytes = other.inter_zone_receive_bytes; } void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other) @@ -77,6 +89,10 @@ void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other) num_produced_rows = other.num_produced_rows(); num_iterations = other.num_iterations(); concurrency = other.concurrency(); + inner_zone_send_bytes = other.tiflash_network_summary().inner_zone_send_bytes(); + inner_zone_receive_bytes = other.tiflash_network_summary().inner_zone_receive_bytes(); + inter_zone_send_bytes = other.tiflash_network_summary().inter_zone_send_bytes(); + inter_zone_receive_bytes = other.tiflash_network_summary().inter_zone_send_bytes(); ru_consumption = parseRUConsumption(other); scan_context->deserialize(other.tiflash_scan_context()); } diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.h b/dbms/src/Flash/Coprocessor/ExecutionSummary.h index 400faab4cc5..21c15af5bce 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.h @@ -35,6 +35,10 @@ struct ExecutionSummary UInt64 num_produced_rows = 0; UInt64 num_iterations = 0; UInt64 concurrency = 0; + UInt64 inner_zone_send_bytes = 0; + UInt64 inner_zone_receive_bytes = 0; + UInt64 inter_zone_send_bytes = 0; + UInt64 inter_zone_receive_bytes = 0; resource_manager::Consumption ru_consumption{}; DM::ScanContextPtr scan_context; diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 8e2f65c16af..6e6c639d031 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -215,6 +215,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) std::max(5, context->getSettingsRef().max_threads * 5), tunnel_queue_memory_bound); // MPMCQueue can benefit from a slightly larger queue size + bool safe_zone_flag_fields = exchange_sender.same_zone_flag_size() == exchange_sender.encoded_task_meta_size(); for (int i = 0; i < exchange_sender.encoded_task_meta_size(); ++i) { // exchange sender will register the tunnels and wait receiver to found a connection. @@ -235,6 +236,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) queue_limit, is_local, is_async, + safe_zone_flag_fields ? exchange_sender.same_zone_flag().Get(i) : true, log->identifier()); LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index be692c7d00d..71962f526fc 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -23,6 +23,8 @@ #include +#include "Flash/Statistics/ConnectionProfileInfo.h" + namespace DB { namespace FailPoints @@ -74,6 +76,7 @@ MPPTunnel::MPPTunnel( const CapacityLimits & queue_limits_, bool is_local_, bool is_async_, + bool same_zone, const String & req_id) : MPPTunnel( fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id()), @@ -81,6 +84,7 @@ MPPTunnel::MPPTunnel( queue_limits_, is_local_, is_async_, + same_zone, req_id) {} @@ -90,6 +94,7 @@ MPPTunnel::MPPTunnel( const CapacityLimits & queue_limits_, bool is_local_, bool is_async_, + bool same_zone, const String & req_id) : status(TunnelStatus::Unconnected) , timeout(timeout_) @@ -107,6 +112,7 @@ MPPTunnel::MPPTunnel( mode = TunnelSenderMode::ASYNC_GRPC; else mode = TunnelSenderMode::SYNC_GRPC; + connection_profile_info.type = ConnectionProfileInfo::inferConnectionType(is_local_, same_zone); GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 4c2421437e4..799c92fd0a5 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -474,6 +474,7 @@ class MPPTunnel : private boost::noncopyable const CapacityLimits & queue_limits, bool is_local_, bool is_async_, + bool same_zone, const String & req_id); // For gtest usage @@ -483,6 +484,7 @@ class MPPTunnel : private boost::noncopyable const CapacityLimits & queue_limits, bool is_local_, bool is_async_, + bool same_zone, const String & req_id); ~MPPTunnel(); diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index ff11aef5ec1..3b50284bccc 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -255,19 +255,19 @@ class TestMPPTunnel : public testing::Test public: MPPTunnelPtr constructRemoteSyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, true, String("0")); return tunnel; } MPPTunnelPtr constructRemoteAsyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, true, String("0")); return tunnel; } MPPTunnelPtr constructLocalTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, true, String("0")); return tunnel; } diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index 2f51a6203e6..e2c024827ac 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -32,6 +32,10 @@ struct BaseRuntimeStatistics UInt64 minTSO_wait_time_ns = 0; UInt64 queue_wait_time_ns = 0; UInt64 pipeline_breaker_wait_time_ns = 0; + UInt64 inner_zone_send_bytes = 0; + UInt64 inner_zone_receive_bytes = 0; + UInt64 inter_zone_send_bytes = 0; + UInt64 inter_zone_receive_bytes = 0; void append(const BlockStreamProfileInfo &); void append(const OperatorProfileInfo &); diff --git a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h index fb0151ac9af..fe20a1918f0 100644 --- a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h +++ b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h @@ -16,11 +16,37 @@ #include +#include "common/defines.h" + namespace DB { + struct ConnectionProfileInfo { + enum ConnectionType + { + Local = 0, + InnerZoneRemote = 1, + InterZoneRemote = 2, + }; + static ALWAYS_INLINE ConnectionType inferConnectionType(bool is_local, bool same_zone) + { + if (is_local) + { + return Local; + } + else if (same_zone) + { + return InnerZoneRemote; + } + else + { + return InterZoneRemote; + } + } + Int64 packets = 0; Int64 bytes = 0; + ConnectionType type = Local; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index b78ae1efded..90ccebff9ed 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -18,18 +18,21 @@ #include #include +#include "Flash/Statistics/ConnectionProfileInfo.h" + namespace DB { String MPPTunnelDetail::toJson() const { return fmt::format( - R"({{"tunnel_id":"{}","sender_target_task_id":{},"sender_target_host":"{}","is_local":{},"packets":{},"bytes":{}}})", + R"({{"tunnel_id":"{}","sender_target_task_id":{},"sender_target_host":"{}","is_local":{},"conn_type":{},"packets":{},"bytes":{}}})", tunnel_id, sender_target_task_id, sender_target_host, is_local, - packets, - bytes); + static_cast(conn_profile_info.type), + conn_profile_info.packets, + conn_profile_info.bytes); } void ExchangeSenderStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const @@ -53,8 +56,19 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() for (UInt16 i = 0; i < partition_num; ++i) { const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); - mpp_tunnel_details[i].packets = connection_profile_info.packets; - mpp_tunnel_details[i].bytes = connection_profile_info.bytes; + mpp_tunnel_details[i].conn_profile_info.packets = connection_profile_info.packets; + mpp_tunnel_details[i].conn_profile_info.bytes = connection_profile_info.bytes; + switch (mpp_tunnel_details[i].conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + base.inner_zone_send_bytes += connection_profile_info.bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + base.inter_zone_send_bytes += connection_profile_info.bytes; + break; + default: + break; + } } } @@ -83,8 +97,12 @@ ExchangeSenderStatistics::ExchangeSenderStatistics(const tipb::Executor * execut sender_target_task_ids.push_back(task_meta.task_id()); const auto & mpp_tunnel = mpp_tunnels[i]; - mpp_tunnel_details - .emplace_back(mpp_tunnel->id(), task_meta.task_id(), task_meta.address(), mpp_tunnel->isLocal()); + mpp_tunnel_details.emplace_back( + mpp_tunnel->getConnectionProfileInfo(), + mpp_tunnel->id(), + task_meta.task_id(), + task_meta.address(), + mpp_tunnel->isLocal()); } // for root task, exchange_sender_executor.task_meta[0].address is blank or not tidb host diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.h b/dbms/src/Flash/Statistics/ExchangeSenderImpl.h index 6f00725b02f..b7efe4d7e1c 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.h +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.h @@ -20,18 +20,25 @@ namespace DB { -struct MPPTunnelDetail : public ConnectionProfileInfo +struct MPPTunnelDetail { String tunnel_id; Int64 sender_target_task_id; String sender_target_host; bool is_local; + ConnectionProfileInfo conn_profile_info; - MPPTunnelDetail(String tunnel_id_, Int64 sender_target_task_id_, String sender_target_host_, bool is_local_) + MPPTunnelDetail( + const ConnectionProfileInfo & conn_profile_info_, + String tunnel_id_, + Int64 sender_target_task_id_, + String sender_target_host_, + bool is_local_) : tunnel_id(std::move(tunnel_id_)) , sender_target_task_id(sender_target_task_id_) , sender_target_host(std::move(sender_target_host_)) , is_local(is_local_) + , conn_profile_info(conn_profile_info_) {} String toJson() const; diff --git a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp index 5f7b1222ac6..1f499d5d8b4 100644 --- a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp +++ b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp @@ -34,6 +34,12 @@ void fillTiExecutionSummary( execution_summary->mutable_tiflash_wait_summary()->set_pipeline_breaker_wait_ns( current.time_pipeline_breaker_wait_ns); execution_summary->mutable_tiflash_wait_summary()->set_pipeline_queue_wait_ns(current.time_pipeline_queue_ns); + execution_summary->mutable_tiflash_network_summary()->set_inner_zone_send_bytes(current.inner_zone_send_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inner_zone_receive_bytes( + current.inner_zone_receive_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inter_zone_send_bytes(current.inter_zone_send_bytes); + execution_summary->mutable_tiflash_network_summary()->set_inter_zone_receive_bytes( + current.inter_zone_receive_bytes); RUNTIME_CHECK(current.ru_consumption.SerializeToString(execution_summary->mutable_ru_consumption())); // tree-based executors will have executor_id. From e63021f55e20dd48b0d12d5334693b8bafbf145c Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 23 Dec 2024 15:17:47 +0800 Subject: [PATCH 05/17] Save work Signed-off-by: yibin --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 2 +- .../Flash/Statistics/ConnectionProfileInfo.h | 12 +++++++++- .../Flash/Statistics/ExchangeReceiverImpl.cpp | 24 +++++++++++++++---- .../Flash/Statistics/ExchangeReceiverImpl.h | 3 ++- .../Flash/Statistics/ExchangeSenderImpl.cpp | 9 +++---- 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 71962f526fc..ba99ee9890b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -102,6 +102,7 @@ MPPTunnel::MPPTunnel( , tunnel_id(tunnel_id_) , mem_tracker(current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr) , queue_limit(queue_limits_) + , connection_profile_info(is_local_, same_zone) , log(Logger::get(req_id, tunnel_id)) , data_size_in_queue(0) { @@ -112,7 +113,6 @@ MPPTunnel::MPPTunnel( mode = TunnelSenderMode::ASYNC_GRPC; else mode = TunnelSenderMode::SYNC_GRPC; - connection_profile_info.type = ConnectionProfileInfo::inferConnectionType(is_local_, same_zone); GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); } diff --git a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h index fe20a1918f0..2a7df49565f 100644 --- a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h +++ b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h @@ -14,9 +14,10 @@ #pragma once +#include #include -#include "common/defines.h" +#include namespace DB { @@ -44,6 +45,15 @@ struct ConnectionProfileInfo return InterZoneRemote; } } + ConnectionProfileInfo() = default; + explicit ConnectionProfileInfo(ConnectionType type_) + : type(type_) + {} + ConnectionProfileInfo(bool is_local, bool same_zone) + : ConnectionProfileInfo(inferConnectionType(is_local, same_zone)) + {} + + String getTypeString() const { return String(magic_enum::enum_name(type)); } Int64 packets = 0; Int64 bytes = 0; diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp index 0aa6abd4ae7..fb2e21e51b7 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp @@ -20,10 +20,11 @@ namespace DB String ExchangeReceiveDetail::toJson() const { return fmt::format( - R"({{"receiver_source_task_id":{},"packets":{},"bytes":{}}})", + R"({{"receiver_source_task_id":{},"conn_type":{},"packets":{},"bytes":{}}})", receiver_source_task_id, - packets, - bytes); + conn_profile_info.getTypeString(), + conn_profile_info.packets, + conn_profile_info.bytes); } void ExchangeReceiverStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const @@ -46,8 +47,21 @@ void ExchangeReceiverStatistics::updateExchangeReceiveDetail( RUNTIME_CHECK(connection_profile_infos.size() == partition_num); for (size_t i = 0; i < partition_num; ++i) { - exchange_receive_details[i].packets += connection_profile_infos[i].packets; - exchange_receive_details[i].bytes += connection_profile_infos[i].bytes; + exchange_receive_details[i].conn_profile_info.type = connection_profile_infos[i].type; + exchange_receive_details[i].conn_profile_info.packets += connection_profile_infos[i].packets; + auto bytes = connection_profile_infos[i].bytes; + exchange_receive_details[i].conn_profile_info.bytes += bytes; + switch (exchange_receive_details[i].conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + base.inner_zone_receive_bytes += bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + base.inter_zone_receive_bytes += bytes; + break; + default: + break; + } } } diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h index 58b1a29714c..09996d02421 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.h @@ -20,9 +20,10 @@ namespace DB { -struct ExchangeReceiveDetail : public ConnectionProfileInfo +struct ExchangeReceiveDetail { Int64 receiver_source_task_id; + ConnectionProfileInfo conn_profile_info; explicit ExchangeReceiveDetail(Int64 receiver_source_task_id_) : receiver_source_task_id(receiver_source_task_id_) diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index 90ccebff9ed..ba800981caf 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -30,7 +30,7 @@ String MPPTunnelDetail::toJson() const sender_target_task_id, sender_target_host, is_local, - static_cast(conn_profile_info.type), + conn_profile_info.getTypeString(), conn_profile_info.packets, conn_profile_info.bytes); } @@ -57,14 +57,15 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() { const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); mpp_tunnel_details[i].conn_profile_info.packets = connection_profile_info.packets; - mpp_tunnel_details[i].conn_profile_info.bytes = connection_profile_info.bytes; + auto bytes = connection_profile_info.bytes; + mpp_tunnel_details[i].conn_profile_info.bytes += bytes; switch (mpp_tunnel_details[i].conn_profile_info.type) { case ConnectionProfileInfo::InnerZoneRemote: - base.inner_zone_send_bytes += connection_profile_info.bytes; + base.inner_zone_send_bytes += bytes; break; case DB::ConnectionProfileInfo::InterZoneRemote: - base.inter_zone_send_bytes += connection_profile_info.bytes; + base.inter_zone_send_bytes += bytes; break; default: break; From 3b78e35685418a0689f4d09954a80806fb9ddcdd Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 23 Dec 2024 16:21:42 +0800 Subject: [PATCH 06/17] Handle exchange receiver network traffic Signed-off-by: yibin --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 6 ++++++ dbms/src/Flash/Mpp/ExchangeReceiver.h | 4 ++++ dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 12 +++++++++++- dbms/src/Flash/Mpp/GRPCReceiverContext.h | 4 ++++ dbms/src/Flash/Statistics/ConnectionProfileInfo.h | 1 + dbms/src/Operators/ExchangeReceiverSourceOp.h | 5 ++++- dbms/src/Operators/IOProfileInfo.h | 14 ++++++++++++++ 7 files changed, 44 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 10bc0db9d29..7e86b89ea30 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -382,6 +382,12 @@ ExchangeReceiverBase::~ExchangeReceiverBase() } } +template +const ConnTypeVec & ExchangeReceiverBase::getConnTypeVec() const +{ + return rpc_context->getConnTypeVec(); +} + template void ExchangeReceiverBase::handleConnectionAfterException() { diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 1adc9db1b16..2adcf080bfc 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -19,10 +19,13 @@ #include #include #include +#include #include #include + + namespace DB { constexpr Int32 batch_packet_count_v1 = 16; @@ -142,6 +145,7 @@ class ExchangeReceiverBase std::atomic * getDataSizeInQueue() { return &data_size_in_queue; } void verifyStreamId(size_t stream_id) const; + const ConnTypeVec & getConnTypeVec() const; private: std::shared_ptr mem_tracker; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index 7b30187dae9..933bf705b73 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -22,6 +22,7 @@ #include #include +#include "Flash/Statistics/ConnectionProfileInfo.h" namespace DB { @@ -104,7 +105,9 @@ GRPCReceiverContext::GRPCReceiverContext( , task_manager(std::move(task_manager_)) , enable_local_tunnel(enable_local_tunnel_) , enable_async_grpc(enable_async_grpc_) -{} +{ + conn_type_vec.resize(exchange_receiver_meta.encoded_task_meta_size(), ConnectionProfileInfo::Local); +} ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const { @@ -120,6 +123,13 @@ ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const req.recv_task_id = task_meta.task_id(); req.req.set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); // NOLINT req.req.set_allocated_sender_meta(sender_task.release()); // NOLINT + + bool valid_zone_flag = exchange_receiver_meta.same_zone_flag_size() == exchange_receiver_meta.encoded_task_meta_size(); + if likely (valid_zone_flag) { + conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, exchange_receiver_meta.same_zone_flag().Get(index)); + } else { + conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, true); + } return req; } diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index 431b8418f14..78a8d5a8a35 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -104,12 +105,15 @@ class GRPCReceiverContext LocalRequestHandler & local_request_handler, bool has_remote_conn); + const ConnTypeVec & getConnTypeVec() const { return conn_type_vec; } + static std::tuple establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, const std::shared_ptr & task_manager); private: tipb::ExchangeReceiver exchange_receiver_meta; + mutable ConnTypeVec conn_type_vec; mpp::TaskMeta task_meta; pingcap::kv::Cluster * cluster; std::shared_ptr task_manager; diff --git a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h index 2a7df49565f..89b5856c863 100644 --- a/dbms/src/Flash/Statistics/ConnectionProfileInfo.h +++ b/dbms/src/Flash/Statistics/ConnectionProfileInfo.h @@ -30,6 +30,7 @@ struct ConnectionProfileInfo InnerZoneRemote = 1, InterZoneRemote = 2, }; + using ConnTypeVec = std::vector; static ALWAYS_INLINE ConnectionType inferConnectionType(bool is_local, bool same_zone) { if (is_local) diff --git a/dbms/src/Operators/ExchangeReceiverSourceOp.h b/dbms/src/Operators/ExchangeReceiverSourceOp.h index 9185f51bf0d..3105c93ebd3 100644 --- a/dbms/src/Operators/ExchangeReceiverSourceOp.h +++ b/dbms/src/Operators/ExchangeReceiverSourceOp.h @@ -32,7 +32,10 @@ class ExchangeReceiverSourceOp : public SourceOp : SourceOp(exec_context_, req_id) , exchange_receiver(exchange_receiver_) , stream_id(stream_id_) - , io_profile_info(IOProfileInfo::createForRemote(profile_info_ptr, exchange_receiver->getSourceNum())) + , io_profile_info(IOProfileInfo::createForRemote( + profile_info_ptr, + exchange_receiver->getSourceNum(), + exchange_receiver->getConnTypeVec())) { exchange_receiver->verifyStreamId(stream_id); setHeader(Block(getColumnWithTypeAndName(toNamesAndTypes(exchange_receiver->getOutputSchema())))); diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 036a33fb66d..5b1f42ecf08 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -44,6 +44,20 @@ struct IOProfileInfo return info; } + static IOProfileInfoPtr createForRemote( + const OperatorProfileInfoPtr & profile_info, + size_t connections, + const ConnTypeVec & conn_type_vec) + { + auto info = std::make_shared(profile_info, false); + info->connection_profile_infos.resize(connections); + for (size_t i = 0; i < connections; ++i) + { + info->connection_profile_infos[i].type = conn_type_vec[i]; + } + return info; + } + OperatorProfileInfoPtr operator_info; const bool is_local; From 8f8b27f387f0b6ca2cf4d48267e2eed41f6f7ad1 Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 23 Dec 2024 16:50:18 +0800 Subject: [PATCH 07/17] Little refact Signed-off-by: yibin --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Mpp/ExchangeReceiver.h | 3 +-- dbms/src/Flash/Mpp/GRPCReceiverContext.h | 4 ++-- dbms/src/Operators/IOProfileInfo.h | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 7e86b89ea30..3ca078b2144 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -383,7 +383,7 @@ ExchangeReceiverBase::~ExchangeReceiverBase() } template -const ConnTypeVec & ExchangeReceiverBase::getConnTypeVec() const +const ConnectionProfileInfo::ConnTypeVec & ExchangeReceiverBase::getConnTypeVec() const { return rpc_context->getConnTypeVec(); } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 2adcf080bfc..08bf7031a39 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -25,7 +25,6 @@ #include - namespace DB { constexpr Int32 batch_packet_count_v1 = 16; @@ -145,7 +144,7 @@ class ExchangeReceiverBase std::atomic * getDataSizeInQueue() { return &data_size_in_queue; } void verifyStreamId(size_t stream_id) const; - const ConnTypeVec & getConnTypeVec() const; + const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const; private: std::shared_ptr mem_tracker; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.h b/dbms/src/Flash/Mpp/GRPCReceiverContext.h index 78a8d5a8a35..a4822252c0d 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.h +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.h @@ -105,7 +105,7 @@ class GRPCReceiverContext LocalRequestHandler & local_request_handler, bool has_remote_conn); - const ConnTypeVec & getConnTypeVec() const { return conn_type_vec; } + const ConnectionProfileInfo::ConnTypeVec & getConnTypeVec() const { return conn_type_vec; } static std::tuple establishMPPConnectionLocalV1( const ::mpp::EstablishMPPConnectionRequest * request, @@ -113,7 +113,7 @@ class GRPCReceiverContext private: tipb::ExchangeReceiver exchange_receiver_meta; - mutable ConnTypeVec conn_type_vec; + mutable ConnectionProfileInfo::ConnTypeVec conn_type_vec; mpp::TaskMeta task_meta; pingcap::kv::Cluster * cluster; std::shared_ptr task_manager; diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 5b1f42ecf08..4562e787967 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -47,7 +47,7 @@ struct IOProfileInfo static IOProfileInfoPtr createForRemote( const OperatorProfileInfoPtr & profile_info, size_t connections, - const ConnTypeVec & conn_type_vec) + const ConnectionProfileInfo::ConnTypeVec & conn_type_vec) { auto info = std::make_shared(profile_info, false); info->connection_profile_infos.resize(connections); From 7791104e313e0379c57487a544fc68062ebe087d Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 23 Dec 2024 17:25:22 +0800 Subject: [PATCH 08/17] Little refact Signed-off-by: yibin --- dbms/src/Flash/Mpp/ExchangeReceiver.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 08bf7031a39..f17ff0165f2 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -202,9 +202,7 @@ class ExchangeReceiverBase std::shared_ptr rpc_context; - const tipb::ExchangeReceiver pb_exchange_receiver; const size_t source_num; - const ::mpp::TaskMeta task_meta; const bool enable_fine_grained_shuffle_flag; const size_t output_stream_count; const size_t max_buffer_size; From 055d64d40f272c7b900d3063bc52ff7a537339dd Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 27 Dec 2024 11:05:01 +0800 Subject: [PATCH 09/17] Add coprocessor remote read connection network stats Signed-off-by: yibin --- .../src/Flash/Coprocessor/CoprocessorReader.h | 20 ++++++++++++++----- .../Statistics/BaseRuntimeStatistics.cpp | 15 ++++++++++++++ .../Flash/Statistics/BaseRuntimeStatistics.h | 3 +++ .../Flash/Statistics/ExchangeReceiverImpl.cpp | 15 ++------------ .../Flash/Statistics/ExchangeSenderImpl.cpp | 15 ++------------ dbms/src/Flash/Statistics/TableScanImpl.cpp | 16 ++++++++------- dbms/src/Flash/Statistics/TableScanImpl.h | 3 ++- .../Operators/CoprocessorReaderSourceOp.cpp | 20 +++++++++++++++---- dbms/src/Operators/IOProfileInfo.h | 3 +++ 9 files changed, 67 insertions(+), 43 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 7a246ba1e74..44a6a112111 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -23,6 +23,8 @@ #include #include +#include "Flash/Statistics/ConnectionProfileInfo.h" + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wdeprecated-declarations" @@ -134,6 +136,7 @@ struct CoprocessorReaderResult bool meet_error; String error_msg; bool eof; + bool same_zone_flag; String req_info = "cop request"; DecodeDetail decode_detail; @@ -142,11 +145,13 @@ struct CoprocessorReaderResult bool meet_error_ = false, const String & error_msg_ = "", bool eof_ = false, - DecodeDetail decode_detail_ = {}) + DecodeDetail decode_detail_ = {}, + bool same_zone_flag_ = true) : resp(resp_) , meet_error(meet_error_) , error_msg(error_msg_) , eof(eof_) + , same_zone_flag(same_zone_flag_) , decode_detail(decode_detail_) {} }; @@ -158,6 +163,9 @@ class CoprocessorReader public: static constexpr bool is_streaming_reader = false; static constexpr auto name = "CoprocessorReader"; + static const inline ConnectionProfileInfo::ConnTypeVec conn_type_vec{ + ConnectionProfileInfo::InnerZoneRemote, + ConnectionProfileInfo::InterZoneRemote}; private: const DAGSchema schema; @@ -177,7 +185,8 @@ class CoprocessorReader size_t queue_size, UInt64 cop_timeout, const pingcap::kv::LabelFilter & tiflash_label_filter_, - const String & source_identifier) + const String & source_identifier, + const String & store_zone_label = "") : schema(schema_) , has_enforce_encode_type(has_enforce_encode_type_) , concurrency(concurrency_) @@ -189,7 +198,8 @@ class CoprocessorReader concurrency_, &Poco::Logger::get(fmt::format("{} pingcap/coprocessor", source_identifier)), cop_timeout, - tiflash_label_filter_) + tiflash_label_filter_, + store_zone_label) {} const DAGSchema & getOutputSchema() const { return schema; } @@ -291,7 +301,7 @@ class CoprocessorReader false}; } auto detail = decodeChunks(resp, block_queue, header, schema); - return {resp, false, "", false, detail}; + return {resp, false, "", false, detail, result.same_zone}; } else { @@ -311,7 +321,7 @@ class CoprocessorReader return toResult(result_pair, block_queue, header); } - static size_t getSourceNum() { return 1; } + static size_t getSourceNum() { return 2; } size_t getConcurrency() const { return concurrency; } diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp index 16939b28acc..dc46c685219 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp @@ -42,4 +42,19 @@ void BaseRuntimeStatistics::append(const OperatorProfileInfo & profile_info) } ++concurrency; } + +void BaseRuntimeStatistics::updateConnectionInfo(const ConnectionProfileInfo & conn_profile_info) +{ + switch (conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + inner_zone_receive_bytes += bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + inter_zone_receive_bytes += bytes; + break; + default: + break; + } +} } // namespace DB diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index e2c024827ac..ccba983a8fb 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -17,6 +17,8 @@ #include #include +#include "Flash/Statistics/ConnectionProfileInfo.h" + namespace DB { struct BlockStreamProfileInfo; @@ -39,5 +41,6 @@ struct BaseRuntimeStatistics void append(const BlockStreamProfileInfo &); void append(const OperatorProfileInfo &); + void updateConnectionInfo(const ConnectionProfileInfo & conn_profile_info); }; } // namespace DB diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp index fb2e21e51b7..ba99efafd5e 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp @@ -49,19 +49,8 @@ void ExchangeReceiverStatistics::updateExchangeReceiveDetail( { exchange_receive_details[i].conn_profile_info.type = connection_profile_infos[i].type; exchange_receive_details[i].conn_profile_info.packets += connection_profile_infos[i].packets; - auto bytes = connection_profile_infos[i].bytes; - exchange_receive_details[i].conn_profile_info.bytes += bytes; - switch (exchange_receive_details[i].conn_profile_info.type) - { - case ConnectionProfileInfo::InnerZoneRemote: - base.inner_zone_receive_bytes += bytes; - break; - case DB::ConnectionProfileInfo::InterZoneRemote: - base.inter_zone_receive_bytes += bytes; - break; - default: - break; - } + exchange_receive_details[i].conn_profile_info.bytes += connection_profile_infos[i].bytes; + base.updateConnectionInfo(connection_profile_infos[i]); } } diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index ba800981caf..29335bc58b5 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -57,19 +57,8 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() { const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); mpp_tunnel_details[i].conn_profile_info.packets = connection_profile_info.packets; - auto bytes = connection_profile_info.bytes; - mpp_tunnel_details[i].conn_profile_info.bytes += bytes; - switch (mpp_tunnel_details[i].conn_profile_info.type) - { - case ConnectionProfileInfo::InnerZoneRemote: - base.inner_zone_send_bytes += bytes; - break; - case DB::ConnectionProfileInfo::InterZoneRemote: - base.inter_zone_send_bytes += bytes; - break; - default: - break; - } + mpp_tunnel_details[i].conn_profile_info.bytes += connection_profile_info.bytes; + base.updateConnectionInfo(connection_profile_info); } } diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 5a601e8ef0a..706f27e0bcf 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -24,10 +24,11 @@ String TableScanDetail::toJson() const auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0; auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0; return fmt::format( - R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})", + R"({{"is_local":{},"type:":{},"packets":{},"bytes":{},"max":{},"min":{}}})", is_local, - packets, - bytes, + conn_profile_info.getTypeString(), + conn_profile_info.packets, + conn_profile_info.bytes, max_cost_ms, min_cost_ms); } @@ -48,8 +49,9 @@ void TableScanStatistics::updateTableScanDetail(const std::vectorgetProfileInfo(); - local_table_scan_detail.bytes += prof.bytes; + local_table_scan_detail.conn_profile_info.bytes += prof.bytes; const double this_execution_time = prof.execution_time * 1.0; if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited || local_table_scan_detail.max_stream_cost_ns < this_execution_time) @@ -91,7 +93,7 @@ void TableScanStatistics::collectExtraRuntimeDetail() transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) { if (profile_info.is_local) { - local_table_scan_detail.bytes += profile_info.operator_info->bytes; + local_table_scan_detail.conn_profile_info.bytes += profile_info.operator_info->bytes; const double this_execution_time = profile_info.operator_info->execution_time * 1.0; if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited || local_table_scan_detail.max_stream_cost_ns < this_execution_time) diff --git a/dbms/src/Flash/Statistics/TableScanImpl.h b/dbms/src/Flash/Statistics/TableScanImpl.h index af5b8f43c93..927bcf3901c 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.h +++ b/dbms/src/Flash/Statistics/TableScanImpl.h @@ -20,9 +20,10 @@ namespace DB { -struct TableScanDetail : public ConnectionProfileInfo +struct TableScanDetail { const bool is_local; + ConnectionProfileInfo conn_profile_info; double min_stream_cost_ns = -1.0; double max_stream_cost_ns = -1.0; diff --git a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp index 45d6f3d32fb..230321df323 100644 --- a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp +++ b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp @@ -16,6 +16,8 @@ #include #include +#include "Flash/Coprocessor/CoprocessorReader.h" + namespace DB { CoprocessorReaderSourceOp::CoprocessorReaderSourceOp( @@ -24,7 +26,10 @@ CoprocessorReaderSourceOp::CoprocessorReaderSourceOp( CoprocessorReaderPtr coprocessor_reader_) : SourceOp(exec_context_, req_id) , coprocessor_reader(coprocessor_reader_) - , io_profile_info(IOProfileInfo::createForRemote(profile_info_ptr, coprocessor_reader->getSourceNum())) + , io_profile_info(IOProfileInfo::createForRemote( + profile_info_ptr, + coprocessor_reader->getSourceNum(), + CoprocessorReader::conn_type_vec)) { assert(coprocessor_reader); setHeader(Block(getColumnWithTypeAndName(toNamesAndTypes(coprocessor_reader->getOutputSchema())))); @@ -90,9 +95,16 @@ OperatorStatus CoprocessorReaderSourceOp::readImpl(Block & block) } const auto & decode_detail = result.decode_detail; - auto & connection_profile_info = io_profile_info->connection_profile_infos[0]; - connection_profile_info.packets += decode_detail.packets; - connection_profile_info.bytes += decode_detail.packet_bytes; + if (result.same_zone_flag) + { + io_profile_info->connection_profile_infos[0].packets += decode_detail.packets; + io_profile_info->connection_profile_infos[0].bytes += decode_detail.packet_bytes; + } + else + { + io_profile_info->connection_profile_infos[1].packets += decode_detail.packets; + io_profile_info->connection_profile_infos[1].bytes += decode_detail.packet_bytes; + } total_rows += decode_detail.rows; LOG_TRACE( diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 4562e787967..188301ae701 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -18,6 +18,8 @@ #include #include +#include "Common/Exception.h" + namespace DB { struct IOProfileInfo; @@ -49,6 +51,7 @@ struct IOProfileInfo size_t connections, const ConnectionProfileInfo::ConnTypeVec & conn_type_vec) { + RUNTIME_CHECK(connections == conn_type_vec.size()); auto info = std::make_shared(profile_info, false); info->connection_profile_infos.resize(connections); for (size_t i = 0; i < connections; ++i) From 7b08b91646fdc5eac0360691bc31e02031c5a9c3 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 27 Dec 2024 12:02:35 +0800 Subject: [PATCH 10/17] Get store zone label using basic way Signed-off-by: yibin --- .../Flash/Coprocessor/DAGStorageInterpreter.cpp | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 8fa8ac976be..ad58d27519b 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -739,7 +739,18 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve : concurrent_num * 4; bool enable_cop_stream = context.getSettingsRef().enable_cop_stream_for_remote_read; UInt64 cop_timeout = context.getSettingsRef().cop_timeout_for_remote_read; - + String store_zone_label; + auto kv_store = tmt.getKVStore(); + if likely (kv_store) + { + for (int i = 0; i < kv_store->getStoreMeta().labels_size(); ++i) + { + if (kv_store->getStoreMeta().labels().at(i).key() == "zone") + { + store_zone_label = kv_store->getStoreMeta().labels().at(i).value(); + } + } + } auto coprocessor_reader = std::make_shared( schema, cluster, @@ -750,7 +761,8 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve queue_size, cop_timeout, tiflash_label_filter, - log->identifier()); + log->identifier(), + store_zone_label); context.getDAGContext()->addCoprocessorReader(coprocessor_reader); return coprocessor_reader; From 8a66f36684eccb944d0ac90bb811a7c69ee48d68 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 31 Dec 2024 11:19:50 +0800 Subject: [PATCH 11/17] Save work Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index ad58d27519b..50453d232af 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -751,6 +751,7 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve } } } + LOG_DEBUG(log, "KVStore Zone Label {}", store_zone_label); auto coprocessor_reader = std::make_shared( schema, cluster, From 6c561e0e55ac693251f1b92a8eda89145edf1cb3 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 31 Dec 2024 16:46:35 +0800 Subject: [PATCH 12/17] refine tableScan statistics Signed-off-by: yibin --- .../Coprocessor/DAGStorageInterpreter.cpp | 1 - dbms/src/Flash/Statistics/TableScanImpl.cpp | 88 +++++++++++-------- dbms/src/Flash/Statistics/TableScanImpl.h | 28 +++--- 3 files changed, 71 insertions(+), 46 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 50453d232af..ad58d27519b 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -751,7 +751,6 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve } } } - LOG_DEBUG(log, "KVStore Zone Label {}", store_zone_label); auto coprocessor_reader = std::make_shared( schema, cluster, diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 706f27e0bcf..5d4a1e162a2 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -17,22 +17,32 @@ #include #include +#include "Flash/Statistics/ConnectionProfileInfo.h" + namespace DB { -String TableScanDetail::toJson() const +String TableScanTimeDetail::toJson() const { auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0; auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0; + return fmt::format(R"("max":{},"min":{})", max_cost_ms, min_cost_ms); +} +String LocalTableScanDetail::toJson() const +{ + return fmt::format(R"({{"is_local":false,"bytes":{},{}}})", bytes, time_detail.toJson()); +} +String RemoteTableScanDetail::toJson() const +{ return fmt::format( - R"({{"is_local":{},"type:":{},"packets":{},"bytes":{},"max":{},"min":{}}})", - is_local, - conn_profile_info.getTypeString(), - conn_profile_info.packets, - conn_profile_info.bytes, - max_cost_ms, - min_cost_ms); + R"({{"type:":{},"packets":{},"bytes":{},"type:":{},"packets":{},"bytes":{},{}}})", + inner_zone_conn_profile_info.getTypeString(), + inner_zone_conn_profile_info.packets, + inner_zone_conn_profile_info.bytes, + inter_zone_conn_profile_info.getTypeString(), + inter_zone_conn_profile_info.packets, + inter_zone_conn_profile_info.bytes, + time_detail.toJson()); } - void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const { auto scan_ctx_it = dag_context.scan_context_map.find(executor_id); @@ -49,8 +59,16 @@ void TableScanStatistics::updateTableScanDetail(const std::vectorgetProfileInfo(); - local_table_scan_detail.conn_profile_info.bytes += prof.bytes; + local_table_scan_detail.bytes += prof.bytes; const double this_execution_time = prof.execution_time * 1.0; - if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.max_stream_cost_ns < this_execution_time) - local_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.min_stream_cost_ns > this_execution_time) - local_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } else { @@ -93,25 +111,25 @@ void TableScanStatistics::collectExtraRuntimeDetail() transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) { if (profile_info.is_local) { - local_table_scan_detail.conn_profile_info.bytes += profile_info.operator_info->bytes; + local_table_scan_detail.bytes += profile_info.operator_info->bytes; const double this_execution_time = profile_info.operator_info->execution_time * 1.0; - if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.max_stream_cost_ns < this_execution_time) - local_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || local_table_scan_detail.min_stream_cost_ns > this_execution_time) - local_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.time_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } else { updateTableScanDetail(profile_info.connection_profile_infos); const double this_execution_time = profile_info.operator_info->execution_time * 1.0; - if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited - || remote_table_scan_detail.max_stream_cost_ns < this_execution_time) - remote_table_scan_detail.max_stream_cost_ns = this_execution_time; - if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited - || remote_table_scan_detail.max_stream_cost_ns > this_execution_time) - remote_table_scan_detail.min_stream_cost_ns = this_execution_time; + if (remote_table_scan_detail.time_detail.max_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.time_detail.max_stream_cost_ns < this_execution_time) + remote_table_scan_detail.time_detail.max_stream_cost_ns = this_execution_time; + if (remote_table_scan_detail.time_detail.min_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.time_detail.max_stream_cost_ns > this_execution_time) + remote_table_scan_detail.time_detail.min_stream_cost_ns = this_execution_time; } }); break; @@ -120,10 +138,10 @@ void TableScanStatistics::collectExtraRuntimeDetail() if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end()) { it->second->setStreamCost( - std::max(local_table_scan_detail.min_stream_cost_ns, 0.0), - std::max(local_table_scan_detail.max_stream_cost_ns, 0.0), - std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0), - std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0)); + std::max(local_table_scan_detail.time_detail.min_stream_cost_ns, 0.0), + std::max(local_table_scan_detail.time_detail.max_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.time_detail.min_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.time_detail.max_stream_cost_ns, 0.0)); } } diff --git a/dbms/src/Flash/Statistics/TableScanImpl.h b/dbms/src/Flash/Statistics/TableScanImpl.h index 927bcf3901c..64f239bf8f3 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.h +++ b/dbms/src/Flash/Statistics/TableScanImpl.h @@ -18,19 +18,27 @@ #include #include +#include "common/types.h" + namespace DB { -struct TableScanDetail +struct TableScanTimeDetail { - const bool is_local; - ConnectionProfileInfo conn_profile_info; double min_stream_cost_ns = -1.0; double max_stream_cost_ns = -1.0; - - explicit TableScanDetail(bool is_local_) - : is_local(is_local_) - {} - + String toJson() const; +}; +struct LocalTableScanDetail +{ + Int64 bytes = 0; + TableScanTimeDetail time_detail; + String toJson() const; +}; +struct RemoteTableScanDetail +{ + ConnectionProfileInfo inner_zone_conn_profile_info{ConnectionProfileInfo::InnerZoneRemote}; + ConnectionProfileInfo inter_zone_conn_profile_info{ConnectionProfileInfo::InterZoneRemote}; + TableScanTimeDetail time_detail; String toJson() const; }; @@ -55,8 +63,8 @@ class TableScanStatistics : public TableScanStatisticsBase TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_); private: - TableScanDetail local_table_scan_detail{true}; - TableScanDetail remote_table_scan_detail{false}; + LocalTableScanDetail local_table_scan_detail; + RemoteTableScanDetail remote_table_scan_detail; protected: void appendExtraJson(FmtBuffer &) const override; From 7d4a2099c1845f01efeb673ecb4c3a335766e031 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 2 Jan 2025 10:19:19 +0800 Subject: [PATCH 13/17] Little refact Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/CoprocessorReader.h | 3 +-- dbms/src/Flash/Mpp/ExchangeReceiver.h | 1 - dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 16 +++++++++++----- dbms/src/Flash/Mpp/MPPTunnel.cpp | 3 +-- .../src/Flash/Statistics/BaseRuntimeStatistics.h | 3 +-- dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp | 3 +-- dbms/src/Flash/Statistics/TableScanImpl.cpp | 3 +-- dbms/src/Operators/CoprocessorReaderSourceOp.cpp | 3 +-- dbms/src/Operators/IOProfileInfo.h | 3 +-- 9 files changed, 18 insertions(+), 20 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 44a6a112111..3ebe782b88d 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -21,10 +21,9 @@ #include #include #include +#include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" - #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wdeprecated-declarations" diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index f17ff0165f2..038f445afad 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -24,7 +24,6 @@ #include #include - namespace DB { constexpr Int32 batch_packet_count_v1 = 16; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index 933bf705b73..d1ea9e1a4a1 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -17,12 +17,12 @@ #include #include #include +#include #include #include #include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" namespace DB { @@ -124,10 +124,16 @@ ExchangeRecvRequest GRPCReceiverContext::makeRequest(int index) const req.req.set_allocated_receiver_meta(new mpp::TaskMeta(task_meta)); // NOLINT req.req.set_allocated_sender_meta(sender_task.release()); // NOLINT - bool valid_zone_flag = exchange_receiver_meta.same_zone_flag_size() == exchange_receiver_meta.encoded_task_meta_size(); - if likely (valid_zone_flag) { - conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, exchange_receiver_meta.same_zone_flag().Get(index)); - } else { + bool valid_zone_flag + = exchange_receiver_meta.same_zone_flag_size() == exchange_receiver_meta.encoded_task_meta_size(); + if likely (valid_zone_flag) + { + conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType( + req.is_local, + exchange_receiver_meta.same_zone_flag().Get(index)); + } + else + { conn_type_vec[index] = ConnectionProfileInfo::inferConnectionType(req.is_local, true); } return req; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index ba99ee9890b..d5c06273157 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -19,12 +19,11 @@ #include #include #include +#include #include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" - namespace DB { namespace FailPoints diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index ccba983a8fb..c05a6029c2a 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -15,10 +15,9 @@ #pragma once #include +#include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" - namespace DB { struct BlockStreamProfileInfo; diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index 29335bc58b5..b915fe2c469 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -16,10 +16,9 @@ #include #include #include +#include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" - namespace DB { String MPPTunnelDetail::toJson() const diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 5d4a1e162a2..26ec32aec9c 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -13,12 +13,11 @@ // limitations under the License. #include +#include #include #include #include -#include "Flash/Statistics/ConnectionProfileInfo.h" - namespace DB { String TableScanTimeDetail::toJson() const diff --git a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp index 230321df323..04b82f73149 100644 --- a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp +++ b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp @@ -13,11 +13,10 @@ // limitations under the License. #include +#include #include #include -#include "Flash/Coprocessor/CoprocessorReader.h" - namespace DB { CoprocessorReaderSourceOp::CoprocessorReaderSourceOp( diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 188301ae701..2515096ad44 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -14,12 +14,11 @@ #pragma once +#include #include #include #include -#include "Common/Exception.h" - namespace DB { struct IOProfileInfo; From f8852efeb6be8f9133fe649f1e3499340e812548 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 2 Jan 2025 17:20:27 +0800 Subject: [PATCH 14/17] Fix incorrect update issue Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/CoprocessorReader.h | 2 ++ .../Flash/Statistics/BaseRuntimeStatistics.cpp | 17 ++++++++++++++++- .../Flash/Statistics/BaseRuntimeStatistics.h | 3 ++- .../Flash/Statistics/ExchangeReceiverImpl.cpp | 2 +- .../src/Flash/Statistics/ExchangeSenderImpl.cpp | 2 +- dbms/src/Flash/Statistics/TableScanImpl.cpp | 4 +++- dbms/src/Flash/Statistics/TableScanImpl.h | 2 -- .../src/Operators/CoprocessorReaderSourceOp.cpp | 13 +++++-------- 8 files changed, 30 insertions(+), 15 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 3ebe782b88d..be2d8ea466d 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -165,6 +165,8 @@ class CoprocessorReader static const inline ConnectionProfileInfo::ConnTypeVec conn_type_vec{ ConnectionProfileInfo::InnerZoneRemote, ConnectionProfileInfo::InterZoneRemote}; + static const Int32 inner_zone_index = 0; + static const Int32 inter_zone_index = 1; private: const DAGSchema schema; diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp index dc46c685219..ce2bae00758 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.cpp @@ -43,7 +43,7 @@ void BaseRuntimeStatistics::append(const OperatorProfileInfo & profile_info) ++concurrency; } -void BaseRuntimeStatistics::updateConnectionInfo(const ConnectionProfileInfo & conn_profile_info) +void BaseRuntimeStatistics::updateReceiveConnectionInfo(const ConnectionProfileInfo & conn_profile_info) { switch (conn_profile_info.type) { @@ -57,4 +57,19 @@ void BaseRuntimeStatistics::updateConnectionInfo(const ConnectionProfileInfo & c break; } } + +void BaseRuntimeStatistics::updateSendConnectionInfo(const ConnectionProfileInfo & conn_profile_info) +{ + switch (conn_profile_info.type) + { + case ConnectionProfileInfo::InnerZoneRemote: + inner_zone_send_bytes += bytes; + break; + case DB::ConnectionProfileInfo::InterZoneRemote: + inter_zone_send_bytes += bytes; + break; + default: + break; + } +} } // namespace DB diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index c05a6029c2a..1874b5dbd69 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -40,6 +40,7 @@ struct BaseRuntimeStatistics void append(const BlockStreamProfileInfo &); void append(const OperatorProfileInfo &); - void updateConnectionInfo(const ConnectionProfileInfo & conn_profile_info); + void updateSendConnectionInfo(const ConnectionProfileInfo & conn_profile_info); + void updateReceiveConnectionInfo(const ConnectionProfileInfo & conn_profile_info); }; } // namespace DB diff --git a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp index ba99efafd5e..984acbdb350 100644 --- a/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeReceiverImpl.cpp @@ -50,7 +50,7 @@ void ExchangeReceiverStatistics::updateExchangeReceiveDetail( exchange_receive_details[i].conn_profile_info.type = connection_profile_infos[i].type; exchange_receive_details[i].conn_profile_info.packets += connection_profile_infos[i].packets; exchange_receive_details[i].conn_profile_info.bytes += connection_profile_infos[i].bytes; - base.updateConnectionInfo(connection_profile_infos[i]); + base.updateReceiveConnectionInfo(connection_profile_infos[i]); } } diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index b915fe2c469..1000dca8ebd 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -57,7 +57,7 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); mpp_tunnel_details[i].conn_profile_info.packets = connection_profile_info.packets; mpp_tunnel_details[i].conn_profile_info.bytes += connection_profile_info.bytes; - base.updateConnectionInfo(connection_profile_info); + base.updateSendConnectionInfo(connection_profile_info); } } diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 26ec32aec9c..40d362b2932 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -68,7 +68,9 @@ void TableScanStatistics::updateTableScanDetail(const std::vector #include -#include "common/types.h" - namespace DB { struct TableScanTimeDetail diff --git a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp index 04b82f73149..5c7fe9d2ef0 100644 --- a/dbms/src/Operators/CoprocessorReaderSourceOp.cpp +++ b/dbms/src/Operators/CoprocessorReaderSourceOp.cpp @@ -94,16 +94,13 @@ OperatorStatus CoprocessorReaderSourceOp::readImpl(Block & block) } const auto & decode_detail = result.decode_detail; - if (result.same_zone_flag) + auto conn_profile_info_index = CoprocessorReader::inner_zone_index; + if (!result.same_zone_flag) { - io_profile_info->connection_profile_infos[0].packets += decode_detail.packets; - io_profile_info->connection_profile_infos[0].bytes += decode_detail.packet_bytes; - } - else - { - io_profile_info->connection_profile_infos[1].packets += decode_detail.packets; - io_profile_info->connection_profile_infos[1].bytes += decode_detail.packet_bytes; + conn_profile_info_index = CoprocessorReader::inter_zone_index; } + io_profile_info->connection_profile_infos[conn_profile_info_index].packets += decode_detail.packets; + io_profile_info->connection_profile_infos[conn_profile_info_index].bytes += decode_detail.packet_bytes; total_rows += decode_detail.rows; LOG_TRACE( From 476a33f0fe8dfd9fce25741356fa5b1b7f6d0113 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 2 Jan 2025 17:40:16 +0800 Subject: [PATCH 15/17] Little fix and refact Signed-off-by: yibin --- dbms/src/Flash/Mpp/MPPTask.cpp | 4 ++-- dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 6e6c639d031..5d500e994c1 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -215,7 +215,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) std::max(5, context->getSettingsRef().max_threads * 5), tunnel_queue_memory_bound); // MPMCQueue can benefit from a slightly larger queue size - bool safe_zone_flag_fields = exchange_sender.same_zone_flag_size() == exchange_sender.encoded_task_meta_size(); + bool valid_zone_flag_fields = exchange_sender.same_zone_flag_size() == exchange_sender.encoded_task_meta_size(); for (int i = 0; i < exchange_sender.encoded_task_meta_size(); ++i) { // exchange sender will register the tunnels and wait receiver to found a connection. @@ -236,7 +236,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) queue_limit, is_local, is_async, - safe_zone_flag_fields ? exchange_sender.same_zone_flag().Get(i) : true, + valid_zone_flag_fields ? exchange_sender.same_zone_flag().Get(i) : true, log->identifier()); LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); diff --git a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp index 1000dca8ebd..43d43adb1f5 100644 --- a/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp +++ b/dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp @@ -55,7 +55,7 @@ void ExchangeSenderStatistics::collectExtraRuntimeDetail() for (UInt16 i = 0; i < partition_num; ++i) { const auto & connection_profile_info = mpp_tunnels[i]->getConnectionProfileInfo(); - mpp_tunnel_details[i].conn_profile_info.packets = connection_profile_info.packets; + mpp_tunnel_details[i].conn_profile_info.packets += connection_profile_info.packets; mpp_tunnel_details[i].conn_profile_info.bytes += connection_profile_info.bytes; base.updateSendConnectionInfo(connection_profile_info); } From 1246495db107061a0108fc94e05a841c7315755b Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 2 Jan 2025 17:52:15 +0800 Subject: [PATCH 16/17] Address comments Signed-off-by: yibin --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index ad58d27519b..80a92b76ec0 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -748,6 +748,7 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve if (kv_store->getStoreMeta().labels().at(i).key() == "zone") { store_zone_label = kv_store->getStoreMeta().labels().at(i).value(); + break; } } } From e1de41638469c12024a2cf66a9ae80258817136a Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 3 Jan 2025 11:42:56 +0800 Subject: [PATCH 17/17] Update client-c version Signed-off-by: yibin --- contrib/client-c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/client-c b/contrib/client-c index 6231077e78d..5a89431409b 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 6231077e78d50f70e0fbe8dab1452b16185f5643 +Subproject commit 5a89431409b0a0e2f71b50d1d10cc984617948c4