From 96313442e9100037af3c494bd2cafa5de9ff0f0e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 16 Oct 2024 11:46:04 +0800 Subject: [PATCH 1/9] log Signed-off-by: guo-shaoge --- .../TaskQueues/ResourceControlQueue.cpp | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp index 515155aee6d..a7c935a9db3 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp @@ -120,16 +120,6 @@ bool ResourceControlQueue::take(TaskPtr & task) const ResourceGroupInfo & group_info = resource_group_infos.top(); const bool ru_exhausted = LocalAdmissionController::isRUExhausted(group_info.priority); - LOG_TRACE( - logger, - "trying to schedule task of resource group {}, priority: {}, ru exhausted: {}, is_finished: {}, " - "task_queue.empty(): {}", - group_info.name, - group_info.priority, - ru_exhausted, - is_finished, - group_info.task_queue->empty()); - // When highest priority of resource group is less than zero, means RU of all resource groups are exhausted. // Should not take any task from nested task queue for this situation. if (!ru_exhausted) @@ -138,6 +128,18 @@ bool ResourceControlQueue::take(TaskPtr & task) return true; } wait_dura = LocalAdmissionController::global_instance->estWaitDuraMS(group_info.name); + + LOG_DEBUG( + logger, + "trying to schedule task of resource group {}, priority: {}, ru exhausted: {}, is_finished: {}, " + "task_queue.empty(): {}, wait_dura: {}", + group_info.name, + group_info.priority, + ru_exhausted, + is_finished, + group_info.task_queue->empty(), + wait_dura); + } assert(!task); From 19e4e9fed280b982bc8a2e48ac51ed16edc2d95e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 17 Oct 2024 16:24:20 +0800 Subject: [PATCH 2/9] queue log Signed-off-by: guo-shaoge --- .../Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h | 7 ++++++- .../DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp | 1 + dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h | 2 ++ dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h | 5 +++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h index 196d325462d..f9e384ed025 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -80,8 +80,13 @@ class PipeConditionVariable TaskScheduler::instance->submit(std::move(task)); } + size_t getTaskCnt() const + { + std::lock_guard lock(mu); + return tasks.size(); + } private: - std::mutex mu; + mutable std::mutex mu; std::deque tasks; }; } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index e482a541024..9851b69859f 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -156,6 +156,7 @@ bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & { if (pool->getFreeBlockSlots() <= 0) { + LOG_DEBUG(Logger::get(), "gjt debug needScheduleToRead failed: {}", pool->getFreeBlockSlotsInfo()); GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment(); return false; } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h index 048bbd38a6a..8bbe203c9ba 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h @@ -74,7 +74,9 @@ class WorkQueue void registerPipeTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipTask queue size: {}, done: {}", queue.size(), done, pipe_task_cnt); if (queue.empty() && !done) { pipe_cv.registerTask(std::move(task)); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index c53990034d5..5cfb4161b66 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -202,6 +202,11 @@ class SegmentReadTaskPool const LoggerPtr & getLogger() const { return log; } + String getFreeBlockSlotsInfo() const + { + return fmt::format("block_slot_limit: {}, pending: {}", + block_slot_limit, blk_stat.pendingCount()); + } private: Int64 getFreeActiveSegmentsUnlock() const; bool exceptionHappened() const; From 8a4f5ffa778368d125ca4318f420896cf5906ded Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 18 Oct 2024 10:45:19 +0800 Subject: [PATCH 3/9] disable storage consume ru Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 8fc6b49c76b..ae7b5cb2b04 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -774,10 +774,10 @@ class LACBytesCollector void consume() { assert(delta_bytes != 0); - if (!resource_group_name.empty()) - LocalAdmissionController::global_instance->consumeBytesResource( - resource_group_name, - bytesToRU(delta_bytes)); + // if (!resource_group_name.empty()) + // LocalAdmissionController::global_instance->consumeBytesResource( + // resource_group_name, + // bytesToRU(delta_bytes)); } const std::string resource_group_name; From d805a02ce629d3f2476e065a1f31471acf9e29e4 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 18 Oct 2024 15:56:29 +0800 Subject: [PATCH 4/9] enable storage ru Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index ae7b5cb2b04..8fc6b49c76b 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -774,10 +774,10 @@ class LACBytesCollector void consume() { assert(delta_bytes != 0); - // if (!resource_group_name.empty()) - // LocalAdmissionController::global_instance->consumeBytesResource( - // resource_group_name, - // bytesToRU(delta_bytes)); + if (!resource_group_name.empty()) + LocalAdmissionController::global_instance->consumeBytesResource( + resource_group_name, + bytesToRU(delta_bytes)); } const std::string resource_group_name; From 230922ef1280f3f0e02da3019456372dee808b4f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 18 Oct 2024 21:58:45 +0800 Subject: [PATCH 5/9] pipe_cv_cnt log Signed-off-by: guo-shaoge --- dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h index 8bbe203c9ba..afb921d423e 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h @@ -76,7 +76,7 @@ class WorkQueue { const auto pipe_task_cnt = pipe_cv.getTaskCnt(); std::lock_guard lock(mu); - LOG_DEBUG(Logger::get(), "gjt debug registerPipTask queue size: {}, done: {}", queue.size(), done, pipe_task_cnt); + LOG_DEBUG(Logger::get(), "gjt debug registerPipTask queue size: {}, done: {}, pipe_task_cnt: {}", queue.size(), done, pipe_task_cnt); if (queue.empty() && !done) { pipe_cv.registerTask(std::move(task)); From 9a5b7da86ece3a880fde5c1b507e2b64952fe536 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 18 Oct 2024 22:17:38 +0800 Subject: [PATCH 6/9] print addr Signed-off-by: guo-shaoge --- dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h | 11 ++++++++++- dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h index afb921d423e..599e94a1456 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace DB::DM { @@ -71,12 +72,20 @@ class WorkQueue , pop_empty_times(0) {} + String getAddr() const + { + std::stringstream ss; + ss << this; + return ss.str(); + } + void registerPipeTask(TaskPtr && task) { { const auto pipe_task_cnt = pipe_cv.getTaskCnt(); std::lock_guard lock(mu); - LOG_DEBUG(Logger::get(), "gjt debug registerPipTask queue size: {}, done: {}, pipe_task_cnt: {}", queue.size(), done, pipe_task_cnt); + LOG_DEBUG(Logger::get(), "gjt debug registerPipTask queue size: {}, done: {}, pipe_task_cnt: {}, workqueue: {}", + queue.size(), done, pipe_task_cnt, getAddr()); if (queue.empty() && !done) { pipe_cv.registerTask(std::move(task)); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 5cfb4161b66..0e51a016db5 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -204,8 +204,8 @@ class SegmentReadTaskPool String getFreeBlockSlotsInfo() const { - return fmt::format("block_slot_limit: {}, pending: {}", - block_slot_limit, blk_stat.pendingCount()); + return fmt::format("block_slot_limit: {}, pending: {}, workqueue: {}", + block_slot_limit, blk_stat.pendingCount(), q.getAddr()); } private: Int64 getFreeActiveSegmentsUnlock() const; From dbd6afed1c6b8a4ed4bd4cdb7a3a7b30105ccf5f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 18 Oct 2024 22:23:23 +0800 Subject: [PATCH 7/9] RCQ more log Signed-off-by: guo-shaoge --- .../Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp index a7c935a9db3..82159d66cef 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp @@ -115,6 +115,8 @@ bool ResourceControlQueue::take(TaskPtr & task) continue; UInt64 wait_dura = LocalAdmissionController::DEFAULT_FETCH_GAC_INTERVAL_MS; + LOG_DEBUG(logger, "gjt debug RCQ::take() wait_dura: {}, resource_group_infos: {}", + wait_dura, resource_group_infos.size()); if (!resource_group_infos.empty()) { const ResourceGroupInfo & group_info = resource_group_infos.top(); @@ -124,6 +126,7 @@ bool ResourceControlQueue::take(TaskPtr & task) // Should not take any task from nested task queue for this situation. if (!ru_exhausted) { + LOG_DEBUG(logger, "gjt debug RCQ::take() ru enough, returns ok"); mustTakeTask(group_info.task_queue, task); return true; } From 8faaf5afc7971001227b544beae8aa950b82b8a6 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 19 Oct 2024 12:51:27 +0800 Subject: [PATCH 8/9] sender log Signed-off-by: guo-shaoge --- dbms/src/Common/LooseBoundedMPMCQueue.h | 19 +++++++++++++++++++ .../ReadThread/SegmentReadTaskScheduler.cpp | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/LooseBoundedMPMCQueue.h b/dbms/src/Common/LooseBoundedMPMCQueue.h index 6611d9bf37e..bead1e5e1c1 100644 --- a/dbms/src/Common/LooseBoundedMPMCQueue.h +++ b/dbms/src/Common/LooseBoundedMPMCQueue.h @@ -50,10 +50,20 @@ class LooseBoundedMPMCQueue , push_callback(std::move(push_callback_)) {} + String getAddr() const + { + std::stringstream ss; + ss << this; + return ss.str(); + } + void registerPipeReadTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_reader_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeReadTask queue size: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + queue.size(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (queue.empty() && status == MPMCQueueStatus::NORMAL) { pipe_reader_cv.registerTask(std::move(task)); @@ -66,7 +76,10 @@ class LooseBoundedMPMCQueue void registerPipeWriteTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_writer_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeWriteTask isFullWithoutLock: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + isFullWithoutLock(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL) { pipe_writer_cv.registerTask(std::move(task)); @@ -259,18 +272,24 @@ class LooseBoundedMPMCQueue private: void notifyOneReader() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneReader: {}", + getAddr()); reader_cv.notify_one(); pipe_reader_cv.notifyOne(); } void notifyOneWriter() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneWriter: {}", + getAddr()); writer_cv.notify_one(); pipe_writer_cv.notifyOne(); } void notifyAll() { + LOG_DEBUG(Logger::get(), "gjt debug notifyAll: {}", + getAddr()); reader_cv.notify_all(); pipe_reader_cv.notifyAll(); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index 9851b69859f..9219dc2f9d6 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -156,7 +156,7 @@ bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & { if (pool->getFreeBlockSlots() <= 0) { - LOG_DEBUG(Logger::get(), "gjt debug needScheduleToRead failed: {}", pool->getFreeBlockSlotsInfo()); + // LOG_DEBUG(Logger::get(), "gjt debug needScheduleToRead failed: {}", pool->getFreeBlockSlotsInfo()); GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment(); return false; } From 1f0cdac6896440ddcb428d8335304688571fc405 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 19 Oct 2024 14:39:40 +0800 Subject: [PATCH 9/9] fix Signed-off-by: guo-shaoge --- dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp index 072fcc2bee3..6450f491559 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp @@ -432,8 +432,8 @@ void MPPTunnelSetWriterBase::fineGrainedShuffleWrite( compression_method, original_size); - if unlikely (tracked_packet->getPacket().chunks_size() <= 0) - return; + // if unlikely (tracked_packet->getPacket().chunks_size() <= 0) + // return; auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); checkPacketSize(packet_bytes); @@ -457,8 +457,8 @@ void MPPTunnelSetWriterBase::fineGrainedShuffleWrite( num_columns, result_field_types); - if unlikely (tracked_packet->getPacket().chunks_size() <= 0) - return; + // if unlikely (tracked_packet->getPacket().chunks_size() <= 0) + // return; auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); checkPacketSize(packet_bytes);