diff --git a/DataFormats/Headers/include/Headers/DataHeader.h b/DataFormats/Headers/include/Headers/DataHeader.h index c37eff9b34f20..e4ddaded20aba 100644 --- a/DataFormats/Headers/include/Headers/DataHeader.h +++ b/DataFormats/Headers/include/Headers/DataHeader.h @@ -372,8 +372,9 @@ struct BaseHeader { union { uint32_t flags; struct { - uint32_t flagsNextHeader : 1, // do we have a next header after this one? - flagsUnused : 31; // currently unused + uint32_t flagsNextHeader : 1, // do we have a next header after this one? + flagsReserved : 15, // reserved for future use + flagsDerivedHeader : 16; // reserved for usage by the derived header }; }; diff --git a/Framework/CHANGELOG.md b/Framework/CHANGELOG.md index 044274d9515c4..a1c0accde0886 100644 --- a/Framework/CHANGELOG.md +++ b/Framework/CHANGELOG.md @@ -1,3 +1,13 @@ +# 2024-03-14: Introduce calibration mode + +This introduces a new --data-processing-timeout `` option which can be used to specify +that data processing should finish after `` seconds. After it, only the messages which are produced on EndOfStream will be send. + +# 2024-02-23: Move DataProcessingDevice to use Signposts + +All the messages from DataProcessingDevice have been migrated to use Signpost. +This will hopefully simplify debugging. + # 2024-02-22: Drop Tracy support Tracy support never took off, so I am dropping it. This was mostly because people do not know about it and having a per process profile GUI was way unpractical. Moreover, needing an extra compile time flag meant one most likely did not have the support compiled in when needed. diff --git a/Framework/Core/COOKBOOK.md b/Framework/Core/COOKBOOK.md index d92006e63d650..c327651ae53ca 100644 --- a/Framework/Core/COOKBOOK.md +++ b/Framework/Core/COOKBOOK.md @@ -524,8 +524,7 @@ Sometimes (e.g. when running a child inside valgrind) it might be useful to disa some-workflow --monitoring-backend=no-op:// ``` -notice that the -will not function properly if you do so. +notice that the will not function properly if you do so. ## Profiling diff --git a/Framework/Core/include/Framework/DataProcessingHeader.h b/Framework/Core/include/Framework/DataProcessingHeader.h index 5c068b4e4179a..484dbb9d51a8e 100644 --- a/Framework/Core/include/Framework/DataProcessingHeader.h +++ b/Framework/Core/include/Framework/DataProcessingHeader.h @@ -42,6 +42,9 @@ namespace o2::framework /// @ingroup aliceo2_dataformats_dataheader struct DataProcessingHeader : public header::BaseHeader { static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET = 0x8000000000000000; + // The following flags are used to indicate the behavior of the data processing + static constexpr int32_t KEEP_AT_EOS_FLAG = 1; + /// We return some number of milliseconds, offsetting int by 0x8000000000000000 /// to make sure we can understand when the dummy constructor of DataProcessingHeader was /// used without overriding it with an actual real time from epoch. diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index 3777e7f608b75..04cf663d5e276 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -28,8 +28,18 @@ struct ComputingQuotaStats; struct DeviceContext { ComputingQuotaStats* quotaStats = nullptr; uv_timer_t* gracePeriodTimer = nullptr; + uv_timer_t* dataProcessingGracePeriodTimer = nullptr; uv_signal_t* sigusr1Handle = nullptr; int expectedRegionCallbacks = 0; + // The timeout for the data processing to stop on this device. + // After this is reached, incoming data not marked to be kept will + // be dropped and the data processing will be stopped. However the + // calibrations will still be done and objects resulting from calibrations + // will be marked to be kept. + int dataProcessingTimeout = 0; + // The timeout for the whole processing to stop on this device. + // This includes the grace period for processing and the time + // for the calibrations to be done. int exitTransitionTimeout = 0; }; diff --git a/Framework/Core/include/Framework/DeviceStateEnums.h b/Framework/Core/include/Framework/DeviceStateEnums.h index 291faac0ac982..a4c02c70c2bf6 100644 --- a/Framework/Core/include/Framework/DeviceStateEnums.h +++ b/Framework/Core/include/Framework/DeviceStateEnums.h @@ -30,6 +30,8 @@ enum struct TransitionHandlingState { NoTransition, /// A transition was notified to be requested Requested, + /// Only calibrations can be done + DataProcessingExpired, /// A transition needs to be fullfilled ASAP Expired }; diff --git a/Framework/Core/include/Framework/TimingInfo.h b/Framework/Core/include/Framework/TimingInfo.h index 84f3971ad3a4a..1734da8dd3941 100644 --- a/Framework/Core/include/Framework/TimingInfo.h +++ b/Framework/Core/include/Framework/TimingInfo.h @@ -36,9 +36,11 @@ struct TimingInfo { /// from a new run, as being processed by the current stream. /// FIXME: for now this is the same as the above. bool streamRunNumberChanged = false; + /// Wether this kind of data should be flushed during end of stream. + bool keepAtEndOfStream = false; static bool timesliceIsTimer(size_t timeslice) { return timeslice > 1652945069870351; } - bool isTimer() const { return timesliceIsTimer(timeslice); }; + [[nodiscard]] bool isTimer() const { return timesliceIsTimer(timeslice); }; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index fe38283d5e2de..b197f31c4f485 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -119,6 +119,7 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, dh.runNumber = timingInfo.runNumber; DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation}; + static_cast(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0; auto& proxy = mRegistry.get(); auto* transport = proxy.getOutputTransport(routeIndex); diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 58ea6524f0b7d..436b48f80a8be 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -119,10 +119,20 @@ void on_transition_requested_expired(uv_timer_t* handle) { auto* state = (DeviceState*)handle->data; state->loopReason |= DeviceState::TIMER_EXPIRED; - LOGP(info, "Timer expired. Forcing transition to READY"); + O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "callback", "Grace period for calibration expired. Exiting."); state->transitionHandling = TransitionHandlingState::Expired; } +void on_data_processing_grace_expired(uv_timer_t* handle) +{ + auto* state = (DeviceState*)handle->data; + state->loopReason |= DeviceState::TIMER_EXPIRED; + O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); + O2_SIGNPOST_EVENT_EMIT_INFO(device, cid, "callback", "Grace period for Data Processing expired. Waiting for calibration"); + state->transitionHandling = TransitionHandlingState::DataProcessingExpired; +} + void on_communication_requested(uv_async_t* s) { auto* state = (DeviceState*)s->data; @@ -218,8 +228,13 @@ void run_callback(uv_work_t* handle) { auto* task = (TaskStreamInfo*)handle->data; auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)}; + // We create a new signpost interval for this specific data processor. Same id, same data processor. + auto& dataProcessorContext = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext); + O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index); DataProcessingDevice::doPrepare(ref); DataProcessingDevice::doRun(ref); + O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index); } // Once the processing in a thread is done, this is executed on the main thread. @@ -951,6 +966,7 @@ void DataProcessingDevice::InitTask() } deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue("expected-region-callbacks")); + deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue("data-processing-timeout")); deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue("exit-transition-timeout")); for (auto& channel : GetChannels()) { @@ -1207,6 +1223,8 @@ void DataProcessingDevice::Run() auto& state = ref.get(); state.loopReason = DeviceState::LoopReason::FIRST_LOOP; bool firstLoop = true; + O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop); + O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop"); while (state.transitionHandling != TransitionHandlingState::Expired) { if (state.nextFairMQState.empty() == false) { (void)this->ChangeState(state.nextFairMQState.back()); @@ -1236,13 +1254,13 @@ void DataProcessingDevice::Run() state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE; } if (NewStatePending()) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled."); shouldNotWait = true; state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING; } if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { state.transitionHandling = TransitionHandlingState::Requested; auto& deviceContext = ref.get(); - auto timeout = deviceContext.exitTransitionTimeout; // Check if we only have timers bool onlyTimers = true; auto& spec = ref.get(); @@ -1255,31 +1273,39 @@ void DataProcessingDevice::Run() if (onlyTimers) { state.streaming = StreamingState::EndOfStreaming; } - if (timeout != 0 && state.streaming != StreamingState::Idle) { + + if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { state.transitionHandling = TransitionHandlingState::Requested; ref.get().call(ServiceRegistryRef{ref}); uv_update_time(state.loop); - uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0); + uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); + // In case we have a calibration grace period it will always be longer than the data processing timeout + if (deviceContext.dataProcessingTimeout != deviceContext.exitTransitionTimeout) { + uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_grace_expired, deviceContext.dataProcessingTimeout * 1000, 0); + } else { + deviceContext.dataProcessingGracePeriodTimer = nullptr; + } if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state requested. Waiting for {} seconds before quitting.", timeout); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout); } else { - LOGP(info, "New state requested. Waiting for {} seconds before switching to READY state.", timeout); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", (int)deviceContext.exitTransitionTimeout); } } else { state.transitionHandling = TransitionHandlingState::Expired; - if (timeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state requested. No timeout set, quitting immediately as per --completion-policy"); - } else if (timeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { - LOGP(info, "New state requested. No timeout set, switching to READY state immediately"); + if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); + } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - LOGP(info, "New state pending and we are already idle, quitting immediately as per --completion-policy"); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); } else { - LOGP(info, "New state pending and we are already idle, switching to READY immediately."); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately."); } } } - // If we are Idle, we can then consider the transition to be expired. - if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { + // If we are Idle, we can then consider the transition to be expired when it was requested or when the data processing timeout expired. + if ((state.transitionHandling == TransitionHandlingState::Requested || state.transitionHandling == TransitionHandlingState::DataProcessingExpired) && state.streaming == StreamingState::Idle) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed."); state.transitionHandling = TransitionHandlingState::Expired; } if (state.severityStack.empty() == false) { @@ -1299,8 +1325,8 @@ void DataProcessingDevice::Run() // - we can trigger further events from the queue // - we can guarantee this is the last thing we do in the loop ( // assuming no one else is adding to the queue before this point). - auto onDrop = [®istry = mServiceRegistry](TimesliceSlot slot, std::vector& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { - LOGP(debug, "Dropping message from slot {}. Forwarding as needed.", slot.index); + auto onDrop = [®istry = mServiceRegistry, lid](TimesliceSlot slot, std::vector& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { + O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index); ServiceRegistryRef ref{registry}; ref.get(); ref.get(); @@ -1319,7 +1345,9 @@ void DataProcessingDevice::Run() auto& dpContext = ref.get(); dpContext.preLoopCallbacks(ref); } + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event."); uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE); + O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason); if ((state.loopReason & state.tracingFlags) != 0) { state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity()); fair::Logger::SetConsoleSeverity(fair::Severity::trace); @@ -1327,16 +1355,15 @@ void DataProcessingDevice::Run() fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back()); state.severityStack.pop_back(); } - LOGP(debug, "Loop reason mask {:b} & {:b} = {:b}", - state.loopReason, state.tracingFlags, - state.loopReason & state.tracingFlags); + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags); if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) { - LOGP(debug, "We were awakened by a OOB event. Rescanning everything."); + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything."); relayer.rescan(); } if (!state.pendingOffers.empty()) { + O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size()); ref.get().updateOffers(state.pendingOffers, uv_now(state.loop)); } } @@ -1411,6 +1438,8 @@ void DataProcessingDevice::Run() mWasActive = false; } } + + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling); auto& spec = ref.get(); /// Cleanup messages which are still pending on exit. for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) { @@ -1425,6 +1454,8 @@ void DataProcessingDevice::Run() void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) { auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); + O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare."); *context.wasActive = false; { @@ -1441,17 +1472,19 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // real data input channels, they have to signal EndOfStream themselves. auto& state = ref.get(); auto& spec = ref.get(); - context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [](const auto& info) { + O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data()); + O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states."); + context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) { if (info.channel) { - LOGP(debug, "Input channel {}{} has {} parts left and is in state {}.", info.channel->GetName(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.", + info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state); } else { - LOGP(debug, "External channel {} is in state {}.", info.id.value, (int)info.state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state); } return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull); }); - - // Whether or not all the channels are completed - LOGP(debug, "Processing {} input channels.", spec.inputChannels.size()); + O2_SIGNPOST_END(device, cid, "do_prepare", "End report."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size()); /// Sort channels by oldest possible timeframe and /// process them in such order. static std::vector pollOrder; @@ -1463,13 +1496,14 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // Nothing to poll... if (pollOrder.empty()) { + O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration."); return; } auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel; auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel; auto delta = currentNewest.value - currentOldest.value; - LOGP(debug, "oldest possible timeframe range {}, {} => {} delta", currentOldest.value, currentNewest.value, - delta); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64, + (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta); auto& infos = state.inputChannelInfos; if (context.balancingInputs) { @@ -1495,12 +1529,13 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) } pollOrder.erase(newEnd, pollOrder.end()); } - LOGP(debug, "processing {} channels", pollOrder.size()); + O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size()); for (auto sci : pollOrder) { auto& info = state.inputChannelInfos[sci]; auto& channelSpec = spec.inputChannels[sci]; - LOGP(debug, "Processing channel {}", channelSpec.name); + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); + O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str()); if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) { context.allDone = false; @@ -1511,14 +1546,19 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) if (info.parts.Size()) { DataProcessingDevice::handleData(ref, info); } - LOGP(debug, "Flushing channel {} which is in state {} and has {} parts still pending.", channelSpec.name, (int)info.state, info.parts.Size()); + O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } if (info.channel == nullptr) { + O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } // Only poll DPL channels for now. if (info.channelType != ChannelAccountingType::DPL) { + O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.", + channelSpec.name.c_str(), (int)info.state, info.parts.Size()); continue; } auto& socket = info.channel->GetSocket(); @@ -1530,7 +1570,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) socket.Events(&info.hasPendingEvents); // If we do not read, we can continue. if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) { - LOGP(debug, "No pending events and no remaining parts to process for channel {}", channelSpec.name); + O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str()); continue; } } @@ -1547,13 +1587,13 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) // to process. bool newMessages = false; while (true) { - LOGP(debug, "Receiving loop called for channel {} ({}) with oldest possible timeslice {}", - info.channel->GetName(), info.id.value, info.oldestForChannel.value); + O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu", + channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value); if (info.parts.Size() < 64) { fair::mq::Parts parts; info.channel->Receive(parts, 0); if (parts.Size()) { - LOGP(debug, "Receiving some parts {}", parts.Size()); + O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value); } for (auto&& part : parts) { info.parts.fParts.emplace_back(std::move(part)); @@ -1578,15 +1618,21 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) info.readPolled = false; *context.wasActive |= newMessages; } + O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).", + channelSpec.name.c_str(), info.id.value); } } void DataProcessingDevice::doRun(ServiceRegistryRef ref) { auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); auto switchState = [ref](StreamingState newState) { auto& state = ref.get(); - LOG(detail) << "New state " << (int)newState << " old state " << (int)state.streaming; + auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); + O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming); + O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState); state.streaming = newState; ref.get().notifyStreamingState(state.streaming); }; @@ -1625,7 +1671,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) } if (state.streaming == StreamingState::EndOfStreaming) { - LOGP(detail, "We are in EndOfStreaming. Flushing queues."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Flushing queues."); // We keep processing data until we are Idle. // FIXME: not sure this is the correct way to drain the queues, but // I guess we will see. @@ -1636,6 +1682,11 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && hasOnlyGenerated == false) { relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false); } + + auto& timingInfo = ref.get(); + // We should keep the data generated at end of stream only for those + // which are not sources. + timingInfo.keepAtEndOfStream = (hasOnlyGenerated == false); EndOfStreamContext eosContext{*context.registry, ref.get()}; context.preEOSCallbacks(eosContext); @@ -1646,7 +1697,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) context.postEOSCallbacks(eosContext); for (auto& channel : spec.outputChannels) { - LOGP(detail, "Sending end of stream to {}", channel.name); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str()); DataProcessingHelpers::sendEndOfStream(ref, channel); } // This is needed because the transport is deleted before the device. @@ -1658,7 +1709,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) *context.wasActive = true; } // On end of stream we shut down all output pollers. - LOGP(detail, "Shutting down output pollers"); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { uv_poll_stop(poller); } @@ -1667,7 +1718,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) if (state.streaming == StreamingState::Idle) { // On end of stream we shut down all output pollers. - LOGP(detail, "We are in Idle. Shutting down output pollers."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { uv_poll_stop(poller); } @@ -1708,6 +1759,10 @@ struct WaitBackpressurePolicy { void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& info) { auto& context = ref.get(); + // This is the same id as the upper level function, so we get the events + // associated with the same interval. We will simply use "handle_data" as + // the category. + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); enum struct InputType : int { Invalid = 0, @@ -1731,6 +1786,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& // than an input, because we do not want the outer loop actually be exposed // to the implementation details of the messaging layer. auto getInputTypes = [&info, &context]() -> std::optional> { + O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); auto ref = ServiceRegistryRef{*context.registry}; auto& stats = ref.get(); auto& parts = info.parts; @@ -1752,7 +1808,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto* headerData = parts.At(pi)->GetData(); auto sih = o2::header::get(headerData); if (sih) { - LOGP(debug, "Got SourceInfoHeader with state {}", (int)sih->state); + O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state); info.state = sih->state; insertInputInfo(pi, 2, InputType::SourceInfo); *context.wasActive = true; @@ -1767,12 +1823,12 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto dh = o2::header::get(headerData); if (!dh) { insertInputInfo(pi, 0, InputType::Invalid); - LOGP(error, "Header is not a DataHeader?"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?"); continue; } if (dh->payloadSize > parts.At(pi + 1)->GetSize()) { insertInputInfo(pi, 0, InputType::Invalid); - LOGP(error, "DataHeader payloadSize mismatch"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch"); continue; } auto dph = o2::header::get(headerData); @@ -1785,7 +1841,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& } if (!dph) { insertInputInfo(pi, 2, InputType::Invalid); - LOGP(error, "Header stack does not contain DataProcessingHeader"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader"); continue; } if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { @@ -1800,7 +1856,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& // pair. size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; if (finalSplitPayloadIndex > parts.Size()) { - LOGP(error, "DataHeader::splitPayloadParts invalid"); + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid"); insertInputInfo(pi, 0, InputType::Invalid); continue; } @@ -1811,7 +1867,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& } } if (results.size() + nTotalPayloads != parts.Size()) { - LOG(error) << "inconsistent number of inputs extracted"; + O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size()); return std::nullopt; } return results; @@ -2429,6 +2485,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (action.op == CompletionPolicy::CompletionOp::Process) { cleanTimers(action.slot, record); } + O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str()); } O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions"); diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 565f85e895f31..354c30b4781b2 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1495,6 +1495,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("child-driver", bpo::value()); realOdesc.add_options()("rate", bpo::value()); realOdesc.add_options()("exit-transition-timeout", bpo::value()); + realOdesc.add_options()("data-processing-timeout", bpo::value()); realOdesc.add_options()("expected-region-callbacks", bpo::value()); realOdesc.add_options()("timeframes-rate-limit", bpo::value()); realOdesc.add_options()("environment", bpo::value()); @@ -1679,6 +1680,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("control-port", bpo::value(), "Utility port to be used by O2 Control") // ("rate", bpo::value(), "rate for a data source device (Hz)") // ("exit-transition-timeout", bpo::value(), "timeout before switching to READY state") // + ("data-processing-timeout", bpo::value(), "timeout before switching to calibration processing mode") // ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in fly") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // @@ -1709,7 +1711,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("infologger-mode", bpo::value(), "O2_INFOLOGGER_MODE override") // ("infologger-severity", bpo::value(), "minimun FairLogger severity which goes to info logger") // ("dpl-tracing-flags", bpo::value(), "pipe separated list of events to trace") // - ("signposts", bpo::value(), // + ("signposts", bpo::value()->default_value(defaultSignposts), // "comma separated list of signposts to enable (any of `completion`, `data_processor_context`, `stream_context`, `device`, `monitoring_service`)") // ("child-driver", bpo::value(), "external driver to start childs with (e.g. valgrind)"); // diff --git a/Framework/Core/src/O2ControlHelpers.cxx b/Framework/Core/src/O2ControlHelpers.cxx index 88132572a6210..e19e4a995d505 100644 --- a/Framework/Core/src/O2ControlHelpers.cxx +++ b/Framework/Core/src/O2ControlHelpers.cxx @@ -261,6 +261,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s dumpOut << indLevel << indScheme << "- \"-b\"\n"; dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n"; + dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n"; + dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--session\"\n"; diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index c72b4da73a45a..6be625743c6d6 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1034,6 +1034,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, // declared in the workflow definition are allowed. runner.AddHook([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) { std::string defaultExitTransitionTimeout = "0"; + std::string defaultDataProcessingTimeout = "0"; std::string defaultInfologgerMode = ""; o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode(); if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) { @@ -1052,6 +1053,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("signposts", bpo::value()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") // ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // + ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before switching from RUN to CALIBRATION") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");