diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 9f69d796f086f..f5c9b7947afde 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -73,6 +73,7 @@ #include #include +#include #include #include #include @@ -83,6 +84,14 @@ #include #include +// Formatter to avoid having to rewrite the ostream operator for the enum +namespace fmt +{ +template <> +struct formatter : ostream_formatter { +}; +} // namespace fmt + O2_DECLARE_DYNAMIC_LOG(device); using namespace o2::framework; @@ -2253,28 +2262,21 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v }; // This is the main dispatching loop - LOGP(debug, "Processing actions:"); auto& state = ref.get(); auto& spec = ref.get(); auto& dpContext = ref.get(); auto& streamContext = ref.get(); + O2_SIGNPOST_ID_GENERATE(sid, device); + O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions"); for (auto action : getReadyActions()) { - LOGP(debug, " Begin action"); + O2_SIGNPOST_ID_GENERATE(aid, device); + O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str()); if (action.op == CompletionPolicy::CompletionOp::Wait) { - LOGP(debug, " - Action is to Wait"); + O2_SIGNPOST_END(device, aid, "device", "Waiting for more data."); continue; } - switch (action.op) { - case CompletionPolicy::CompletionOp::Consume: - LOG(debug) << " - Action is to " << action.op << " " << action.slot.index; - break; - default: - LOG(debug) << " - Action is to " << action.op << " " << action.slot.index; - break; - } - prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot}); bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::Discard; @@ -2285,20 +2287,19 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v *context.registry}; ProcessingContext processContext{record, ref, ref.get()}; { - ZoneScopedN("service pre processing"); // Notice this should be thread safe and reentrant // as it is called from many threads. streamContext.preProcessingCallbacks(processContext); dpContext.preProcessingCallbacks(processContext); } if (action.op == CompletionPolicy::CompletionOp::Discard) { - LOGP(debug, " - Action is to Discard"); context.postDispatchingCallbacks(processContext); if (spec.forwards.empty() == false) { auto& timesliceIndex = ref.get(); forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false); + O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false); + continue; } - continue; } // If there is no optional inputs we canForwardEarly // the messages to that parallel processing can happen. @@ -2308,7 +2309,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting; if (context.canForwardEarly && hasForwards && consumeSomething) { - LOGP(debug, " - Early forwarding"); + O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str()); auto& timesliceIndex = ref.get(); forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume); } @@ -2325,9 +2326,20 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v auto& spec = ref.get(); auto& streamContext = ref.get(); auto& dpContext = ref.get(); + auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool { + switch (action.op) { + case CompletionPolicy::CompletionOp::Consume: + case CompletionPolicy::CompletionOp::ConsumeExisting: + case CompletionPolicy::CompletionOp::ConsumeAndRescan: + case CompletionPolicy::CompletionOp::Process: + return true; + break; + default: + return false; + } + }; if (state.quitRequested == false) { { - ZoneScopedN("service post processing"); // Callbacks from services dpContext.preProcessingCallbacks(processContext); streamContext.preProcessingCallbacks(processContext); @@ -2335,18 +2347,27 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // Callbacks from users ref.get().call(o2::framework::ServiceRegistryRef{ref}, (int)action.op); } - if (context.statefulProcess) { - ZoneScopedN("statefull process"); + O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext); + if (context.statefulProcess && shouldProcess(action)) { + // This way, usercode can use the the same processing context to identify + // its signposts and we can map user code to device iterations. + O2_SIGNPOST_START(device, pcid, "device", "Stateful process"); (context.statefulProcess)(processContext); - } else if (context.statelessProcess) { - ZoneScopedN("stateless process"); + O2_SIGNPOST_END(device, pcid, "device", "Stateful process"); + } else if (context.statelessProcess && shouldProcess(action)) { + O2_SIGNPOST_START(device, pcid, "device", "Stateful process"); (context.statelessProcess)(processContext); + O2_SIGNPOST_END(device, pcid, "device", "Stateful process"); + } else if (context.statelessProcess || context.statefulProcess) { + O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding."); } else { + O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle"); state.streaming = StreamingState::Idle; } // Notify the sink we just consumed some timeframe data if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) { + O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary"); auto& allocator = ref.get(); allocator.make(OutputRef{"dpl-summary", compile_time_hash(spec.name.c_str())}, 1); } @@ -2361,7 +2382,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v } { - ZoneScopedN("service post processing"); ref.get().call(o2::framework::ServiceRegistryRef{ref}, (int)action.op); dpContext.postProcessingCallbacks(processContext); streamContext.postProcessingCallbacks(processContext); @@ -2408,7 +2428,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v ref.get().call(o2::framework::ServiceRegistryRef{ref}); } if ((context.canForwardEarly == false) && hasForwards && consumeSomething) { - LOGP(debug, "Late forwarding"); + O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding"); auto& timesliceIndex = ref.get(); forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume); } @@ -2421,6 +2441,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v cleanTimers(action.slot, record); } } + O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions"); // We now broadcast the end of stream if it was requested if (state.streaming == StreamingState::EndOfStreaming) {