Skip to content

Commit

Permalink
DPL: improve logging for processing loop (#12585)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Jan 24, 2024
1 parent bb8b1c3 commit b397b5c
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include <TMessage.h>
#include <TClonesArray.h>

#include <fmt/ostream.h>
#include <algorithm>
#include <vector>
#include <numeric>
Expand All @@ -83,6 +84,14 @@
#include <sstream>
#include <boost/property_tree/json_parser.hpp>

// Formatter to avoid having to rewrite the ostream operator for the enum
namespace fmt
{
template <>
struct formatter<o2::framework::CompletionPolicy::CompletionOp> : ostream_formatter {
};
} // namespace fmt

O2_DECLARE_DYNAMIC_LOG(device);

using namespace o2::framework;
Expand Down Expand Up @@ -2253,28 +2262,21 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
};

// This is the main dispatching loop
LOGP(debug, "Processing actions:");
auto& state = ref.get<DeviceState>();
auto& spec = ref.get<DeviceSpec const>();

auto& dpContext = ref.get<DataProcessorContext>();
auto& streamContext = ref.get<StreamContext>();
O2_SIGNPOST_ID_GENERATE(sid, device);
O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
for (auto action : getReadyActions()) {
LOGP(debug, " Begin action");
O2_SIGNPOST_ID_GENERATE(aid, device);
O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
if (action.op == CompletionPolicy::CompletionOp::Wait) {
LOGP(debug, " - Action is to Wait");
O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
continue;
}

switch (action.op) {
case CompletionPolicy::CompletionOp::Consume:
LOG(debug) << " - Action is to " << action.op << " " << action.slot.index;
break;
default:
LOG(debug) << " - Action is to " << action.op << " " << action.slot.index;
break;
}

prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
action.op == CompletionPolicy::CompletionOp::Discard;
Expand All @@ -2285,20 +2287,19 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
*context.registry};
ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
{
ZoneScopedN("service pre processing");
// Notice this should be thread safe and reentrant
// as it is called from many threads.
streamContext.preProcessingCallbacks(processContext);
dpContext.preProcessingCallbacks(processContext);
}
if (action.op == CompletionPolicy::CompletionOp::Discard) {
LOGP(debug, " - Action is to Discard");
context.postDispatchingCallbacks(processContext);
if (spec.forwards.empty() == false) {
auto& timesliceIndex = ref.get<TimesliceIndex>();
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
continue;
}
continue;
}
// If there are no optional inputs we canForwardEarly
// the messages, so that parallel processing can happen.
Expand All @@ -2308,7 +2309,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;

if (context.canForwardEarly && hasForwards && consumeSomething) {
LOGP(debug, " - Early forwarding");
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
auto& timesliceIndex = ref.get<TimesliceIndex>();
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
}
Expand All @@ -2325,28 +2326,48 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
auto& spec = ref.get<DeviceSpec const>();
auto& streamContext = ref.get<StreamContext>();
auto& dpContext = ref.get<DataProcessorContext>();
auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
switch (action.op) {
case CompletionPolicy::CompletionOp::Consume:
case CompletionPolicy::CompletionOp::ConsumeExisting:
case CompletionPolicy::CompletionOp::ConsumeAndRescan:
case CompletionPolicy::CompletionOp::Process:
return true;
break;
default:
return false;
}
};
if (state.quitRequested == false) {
{
ZoneScopedN("service post processing");
// Callbacks from services
dpContext.preProcessingCallbacks(processContext);
streamContext.preProcessingCallbacks(processContext);
dpContext.preProcessingCallbacks(processContext);
// Callbacks from users
ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
}
if (context.statefulProcess) {
ZoneScopedN("statefull process");
O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
if (context.statefulProcess && shouldProcess(action)) {
// This way, user code can use the same processing context to identify
// its signposts and we can map user code to device iterations.
O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
(context.statefulProcess)(processContext);
} else if (context.statelessProcess) {
ZoneScopedN("stateless process");
O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
} else if (context.statelessProcess && shouldProcess(action)) {
O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
(context.statelessProcess)(processContext);
O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
} else if (context.statelessProcess || context.statefulProcess) {
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
} else {
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
state.streaming = StreamingState::Idle;
}

// Notify the sink we just consumed some timeframe data
if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
auto& allocator = ref.get<DataAllocator>();
allocator.make<int>(OutputRef{"dpl-summary", compile_time_hash(spec.name.c_str())}, 1);
}
Expand All @@ -2361,7 +2382,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
}

{
ZoneScopedN("service post processing");
ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
dpContext.postProcessingCallbacks(processContext);
streamContext.postProcessingCallbacks(processContext);
Expand Down Expand Up @@ -2408,7 +2428,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
}
if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
LOGP(debug, "Late forwarding");
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
auto& timesliceIndex = ref.get<TimesliceIndex>();
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
}
Expand All @@ -2421,6 +2441,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
cleanTimers(action.slot, record);
}
}
O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");

// We now broadcast the end of stream if it was requested
if (state.streaming == StreamingState::EndOfStreaming) {
Expand Down

0 comments on commit b397b5c

Please sign in to comment.