From 2dd66e589a65d1d5804a1641f3b1e74945f4adf3 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 15 Mar 2024 07:14:18 +0100 Subject: [PATCH] [FEAT] DPL: use Signposts everywhere in DataProcessingDevice (#12868) --- Framework/CHANGELOG.md | 5 + Framework/Core/COOKBOOK.md | 3 +- Framework/Core/src/DataProcessingDevice.cxx | 118 +++++++++++++------- 3 files changed, 83 insertions(+), 43 deletions(-) diff --git a/Framework/CHANGELOG.md b/Framework/CHANGELOG.md index 044274d9515c4..331805f8538fd 100644 --- a/Framework/CHANGELOG.md +++ b/Framework/CHANGELOG.md @@ -1,3 +1,8 @@ +# 2024-03-14: Move DataProcessingDevice to use Signposts + +All the messages from DataProcessingDevice have been migrated to use Signpost. +This will hopefully simplify debugging. + # 2024-02-22: Drop Tracy support Tracy support never took off, so I am dropping it. This was mostly because people do not know about it and having a per process profile GUI was way unpractical. Moreover, needing an extra compile time flag meant one most likely did not have the support compiled in when needed. diff --git a/Framework/Core/COOKBOOK.md b/Framework/Core/COOKBOOK.md index d92006e63d650..c327651ae53ca 100644 --- a/Framework/Core/COOKBOOK.md +++ b/Framework/Core/COOKBOOK.md @@ -524,8 +524,7 @@ Sometimes (e.g. when running a child inside valgrind) it might be useful to disa some-workflow --monitoring-backend=no-op:// ``` -notice that the -will not function properly if you do so. +notice that the will not function properly if you do so. 
## Profiling diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 58ea6524f0b7d..70654bc43e84b 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -119,7 +119,8 @@ void on_transition_requested_expired(uv_timer_t* handle) { auto* state = (DeviceState*)handle->data; state->loopReason |= DeviceState::TIMER_EXPIRED; - LOGP(info, "Timer expired. Forcing transition to READY"); + O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "callback", "Grace period for calibration expired. Exiting."); state->transitionHandling = TransitionHandlingState::Expired; } @@ -218,8 +219,13 @@ void run_callback(uv_work_t* handle) { auto* task = (TaskStreamInfo*)handle->data; auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)}; + // We create a new signpost interval for this specific data processor. Same id, same data processor. + auto& dataProcessorContext = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext); + O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index); DataProcessingDevice::doPrepare(ref); DataProcessingDevice::doRun(ref); + O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index); } // Once the processing in a thread is done, this is executed on the main thread. 
@@ -1207,6 +1213,8 @@ void DataProcessingDevice::Run() auto& state = ref.get(); state.loopReason = DeviceState::LoopReason::FIRST_LOOP; bool firstLoop = true; + O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop); + O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop"); while (state.transitionHandling != TransitionHandlingState::Expired) { if (state.nextFairMQState.empty() == false) { (void)this->ChangeState(state.nextFairMQState.back()); @@ -1236,6 +1244,7 @@ void DataProcessingDevice::Run() state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE; } if (NewStatePending()) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled."); shouldNotWait = true; state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING; } @@ -1261,25 +1270,26 @@ void DataProcessingDevice::Run() uv_update_time(state.loop); uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0); if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state requested. Waiting for {} seconds before quitting.", timeout); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout); } else { - LOGP(info, "New state requested. Waiting for {} seconds before switching to READY state.", timeout); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", (int)deviceContext.exitTransitionTimeout); } } else { state.transitionHandling = TransitionHandlingState::Expired; - if (timeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state requested. No timeout set, quitting immediately as per --completion-policy"); - } else if (timeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { - LOGP(info, "New state requested. 
No timeout set, switching to READY state immediately"); + if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); + } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state pending and we are already idle, quitting immediately as per --completion-policy"); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); } else { - LOGP(info, "New state pending and we are already idle, switching to READY immediately."); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); } } } // If we are Idle, we can then consider the transition to be expired. if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed."); state.transitionHandling = TransitionHandlingState::Expired; } if (state.severityStack.empty() == false) { @@ -1299,8 +1309,8 @@ void DataProcessingDevice::Run() // - we can trigger further events from the queue // - we can guarantee this is the last thing we do in the loop ( // assuming no one else is adding to the queue before this point). - auto onDrop = [&registry = mServiceRegistry](TimesliceSlot slot, std::vector& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { - LOGP(debug, "Dropping message from slot {}.
Forwarding as needed.", slot.index); + auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { + O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index); ServiceRegistryRef ref{registry}; ref.get(); ref.get(); @@ -1319,7 +1329,9 @@ void DataProcessingDevice::Run() auto& dpContext = ref.get(); dpContext.preLoopCallbacks(ref); } + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{public}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event."); uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE); + O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason); if ((state.loopReason & state.tracingFlags) != 0) { state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity()); fair::Logger::SetConsoleSeverity(fair::Severity::trace); @@ -1327,16 +1339,15 @@ void DataProcessingDevice::Run() fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back()); state.severityStack.pop_back(); } - LOGP(debug, "Loop reason mask {:b} & {:b} = {:b}", - state.loopReason, state.tracingFlags, - state.loopReason & state.tracingFlags); + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags); if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) { - LOGP(debug, "We were awakened by a OOB event. Rescanning everything."); + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything."); relayer.rescan(); } if (!state.pendingOffers.empty()) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers.
updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size()); ref.get().updateOffers(state.pendingOffers, uv_now(state.loop)); } } @@ -1411,6 +1422,8 @@ void DataProcessingDevice::Run() mWasActive = false; } } + + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling); auto& spec = ref.get(); /// Cleanup messages which are still pending on exit. for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) { @@ -1425,6 +1438,8 @@ void DataProcessingDevice::Run() void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) { auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); + O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare."); *context.wasActive = false; { @@ -1441,17 +1456,19 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // real data input channels, they have to signal EndOfStream themselves. auto& state = ref.get(); auto& spec = ref.get(); - context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [](const auto& info) { + O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data()); + O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states."); + context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) { if (info.channel) { - LOGP(debug, "Input channel {}{} has {} parts left and is in state {}.", info.channel->GetName(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.", + info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? 
" (non DPL)" : ""), info.parts.fParts.size(), (int)info.state); } else { - LOGP(debug, "External channel {} is in state {}.", info.id.value, (int)info.state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state); } return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull); }); - - // Whether or not all the channels are completed - LOGP(debug, "Processing {} input channels.", spec.inputChannels.size()); + O2_SIGNPOST_END(device, cid, "do_prepare", "End report."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size()); /// Sort channels by oldest possible timeframe and /// process them in such order. static std::vector pollOrder; @@ -1463,13 +1480,14 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // Nothing to poll... if (pollOrder.empty()) { + O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration."); return; } auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel; auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel; auto delta = currentNewest.value - currentOldest.value; - LOGP(debug, "oldest possible timeframe range {}, {} => {} delta", currentOldest.value, currentNewest.value, - delta); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64, + (uint64_t)currentOldest.value, (uint64_t)currentNewest.value, (uint64_t)delta); auto& infos = state.inputChannelInfos; if (context.balancingInputs) { @@ -1495,12 +1513,13 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) } pollOrder.erase(newEnd, pollOrder.end()); } - LOGP(debug, "processing {} channels", pollOrder.size()); + O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel imbalance check.", pollOrder.size()); for (auto sci : pollOrder) { auto& info
= state.inputChannelInfos[sci]; auto& channelSpec = spec.inputChannels[sci]; - LOGP(debug, "Processing channel {}", channelSpec.name); + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); + O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str()); if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) { context.allDone = false; @@ -1511,14 +1530,19 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) if (info.parts.Size()) { DataProcessingDevice::handleData(ref, info); } - LOGP(debug, "Flushing channel {} which is in state {} and has {} parts still pending.", channelSpec.name, (int)info.state, info.parts.Size()); + O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } if (info.channel == nullptr) { + O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } // Only poll DPL channels for now. if (info.channelType != ChannelAccountingType::DPL) { + O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } auto& socket = info.channel->GetSocket(); @@ -1530,7 +1554,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) socket.Events(&info.hasPendingEvents); // If we do not read, we can continue. 
if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) { - LOGP(debug, "No pending events and no remaining parts to process for channel {}", channelSpec.name); + O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str()); continue; } } @@ -1547,13 +1571,13 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // to process. bool newMessages = false; while (true) { - LOGP(debug, "Receiving loop called for channel {} ({}) with oldest possible timeslice {}", - info.channel->GetName(), info.id.value, info.oldestForChannel.value); + O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu", + channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value); if (info.parts.Size() < 64) { fair::mq::Parts parts; info.channel->Receive(parts, 0); if (parts.Size()) { - LOGP(debug, "Receiving some parts {}", parts.Size()); + O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value); } for (auto&& part : parts) { info.parts.fParts.emplace_back(std::move(part)); @@ -1578,15 +1602,21 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) info.readPolled = false; *context.wasActive |= newMessages; } + O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).", + channelSpec.name.c_str(), info.id.value); } } void DataProcessingDevice::doRun(ServiceRegistryRef ref) { auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); auto switchState = [ref](StreamingState newState) { auto& state = ref.get(); - LOG(detail) << "New state " << (int)newState << " old state " << (int)state.streaming; + auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); + O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", 
(int)state.streaming); + O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState); state.streaming = newState; ref.get().notifyStreamingState(state.streaming); }; @@ -1625,7 +1655,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) } if (state.streaming == StreamingState::EndOfStreaming) { - LOGP(detail, "We are in EndOfStreaming. Flushing queues."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues."); // We keep processing data until we are Idle. // FIXME: not sure this is the correct way to drain the queues, but // I guess we will see. @@ -1646,7 +1676,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) context.postEOSCallbacks(eosContext); for (auto& channel : spec.outputChannels) { - LOGP(detail, "Sending end of stream to {}", channel.name); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str()); DataProcessingHelpers::sendEndOfStream(ref, channel); } // This is needed because the transport is deleted before the device. @@ -1658,7 +1688,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) *context.wasActive = true; } // On end of stream we shut down all output pollers. - LOGP(detail, "Shutting down output pollers"); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { uv_poll_stop(poller); } @@ -1667,7 +1697,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) if (state.streaming == StreamingState::Idle) { // On end of stream we shut down all output pollers. - LOGP(detail, "We are in Idle. 
Shutting down output pollers."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { uv_poll_stop(poller); } @@ -1708,6 +1738,10 @@ struct WaitBackpressurePolicy { void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& info) { auto& context = ref.get(); + // This is the same id as the upper level function, so we get the events + // associated with the same interval. We will simply use "handle_data" as + // the category. + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); enum struct InputType : int { Invalid = 0, @@ -1731,6 +1765,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& // than an input, because we do not want the outer loop actually be exposed // to the implementation details of the messaging layer. auto getInputTypes = [&info, &context]() -> std::optional> { + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); auto ref = ServiceRegistryRef{*context.registry}; auto& stats = ref.get(); auto& parts = info.parts; @@ -1752,7 +1787,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto* headerData = parts.At(pi)->GetData(); auto sih = o2::header::get(headerData); if (sih) { - LOGP(debug, "Got SourceInfoHeader with state {}", (int)sih->state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state); info.state = sih->state; insertInputInfo(pi, 2, InputType::SourceInfo); *context.wasActive = true; @@ -1767,12 +1802,12 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto dh = o2::header::get(headerData); if (!dh) { insertInputInfo(pi, 0, InputType::Invalid); - LOGP(error, "Header is not a DataHeader?"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?"); continue; } if (dh->payloadSize > parts.At(pi + 1)->GetSize()) { insertInputInfo(pi, 0, InputType::Invalid); - 
LOGP(error, "DataHeader payloadSize mismatch"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch"); continue; } auto dph = o2::header::get(headerData); @@ -1785,7 +1820,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& } if (!dph) { insertInputInfo(pi, 2, InputType::Invalid); - LOGP(error, "Header stack does not contain DataProcessingHeader"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader"); continue; } if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { @@ -1800,7 +1835,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& // pair. size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; if (finalSplitPayloadIndex > parts.Size()) { - LOGP(error, "DataHeader::splitPayloadParts invalid"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid"); insertInputInfo(pi, 0, InputType::Invalid); continue; } @@ -1811,7 +1846,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& } } if (results.size() + nTotalPayloads != parts.Size()) { - LOG(error) << "inconsistent number of inputs extracted"; + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size()); return std::nullopt; } return results; @@ -2429,6 +2464,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (action.op == CompletionPolicy::CompletionOp::Process) { cleanTimers(action.slot, record); } + O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str()); } O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");