[DRAFT] DPL: Introduce calibration mode #12723

Closed
wants to merge 8 commits into from
5 changes: 3 additions & 2 deletions DataFormats/Headers/include/Headers/DataHeader.h
@@ -372,8 +372,9 @@ struct BaseHeader {
union {
uint32_t flags;
struct {
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
flagsUnused : 31; // currently unused
uint32_t flagsNextHeader : 1, // do we have a next header after this one?
flagsReserved : 15, // reserved for future use
flagsDerivedHeader : 16; // reserved for usage by the derived header
};
};
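
As a rough illustration of the new split, the sketch below mirrors only the three bit-fields in a hypothetical `SketchFlags` struct (not the real `BaseHeader`) and checks that 1 + 15 + 16 bits still fill a single 32-bit word. The names `SketchFlags`, `kDerivedFlagsMask` and `fitsInDerivedFlags` are assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for header::BaseHeader: only the flag bits are mirrored.
struct SketchFlags {
  uint32_t flagsNextHeader : 1;     // do we have a next header after this one?
  uint32_t flagsReserved : 15;      // reserved for future use
  uint32_t flagsDerivedHeader : 16; // reserved for usage by the derived header
};

// Bit-field packing is implementation-defined; mainstream ABIs pack these into one word.
static_assert(sizeof(SketchFlags) == sizeof(uint32_t), "flag bits should pack into one 32-bit word");

// All the bits a derived header may use: 16 independent one-bit flags.
constexpr uint32_t kDerivedFlagsMask = (1u << 16) - 1;

// True if the given flag value can be stored in the derived-header field.
constexpr bool fitsInDerivedFlags(uint32_t flag)
{
  return (flag & ~kDerivedFlagsMask) == 0;
}
```

With this layout a derived header such as `DataProcessingHeader` can define up to 16 flags of its own without touching the bits reserved for the base header.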

10 changes: 10 additions & 0 deletions Framework/CHANGELOG.md
@@ -1,3 +1,13 @@
# 2024-03-14: Introduce calibration mode

This introduces a new --data-processing-timeout `<timeout>` option which can be used to specify
that data processing should finish after `<timeout>` seconds. After that, only the messages which are produced on EndOfStream will be sent.

# 2024-02-23: Move DataProcessingDevice to use Signposts

All the messages from DataProcessingDevice have been migrated to use Signpost.
This will hopefully simplify debugging.

# 2024-02-22: Drop Tracy support

Tracy support never took off, so I am dropping it. This was mostly because people do not know about it and having a per-process profile GUI was highly impractical. Moreover, needing an extra compile-time flag meant one most likely did not have the support compiled in when needed.
3 changes: 1 addition & 2 deletions Framework/Core/COOKBOOK.md
@@ -524,8 +524,7 @@ Sometimes (e.g. when running a child inside valgrind) it might be useful to disa
some-workflow --monitoring-backend=no-op://
```

notice that the
will not function properly if you do so.
notice that the will not function properly if you do so.

## Profiling

3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/DataProcessingHeader.h
@@ -42,6 +42,9 @@ namespace o2::framework
/// @ingroup aliceo2_dataformats_dataheader
struct DataProcessingHeader : public header::BaseHeader {
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET = 0x8000000000000000;
// The following flags are used to indicate the behavior of the data processing
static constexpr int32_t KEEP_AT_EOS_FLAG = 1;

/// We return some number of milliseconds, offsetting int by 0x8000000000000000
/// to make sure we can understand when the dummy constructor of DataProcessingHeader was
/// used without overriding it with an actual real time from epoch.
10 changes: 10 additions & 0 deletions Framework/Core/include/Framework/DeviceContext.h
@@ -28,8 +28,18 @@
struct DeviceContext {
ComputingQuotaStats* quotaStats = nullptr;
uv_timer_t* gracePeriodTimer = nullptr;
uv_timer_t* dataProcessingGracePeriodTimer = nullptr;
uv_signal_t* sigusr1Handle = nullptr;
int expectedRegionCallbacks = 0;
// The timeout for the data processing to stop on this device.
// After this is reached, incoming data not marked to be kept will
// be dropped and the data processing will be stopped. However the

[Check failure on line 36 in Framework/Core/include/Framework/DeviceContext.h, GitHub Actions / PR formatting / whitespace: trailing spaces. Remove the trailing spaces at the end of the line.]
// calibrations will still be done and objects resulting from calibrations
// will be marked to be kept.
int dataProcessingTimeout = 0;
// The timeout for the whole processing to stop on this device.
// This includes the grace period for processing and the time

[Check failure on line 41 in Framework/Core/include/Framework/DeviceContext.h, GitHub Actions / PR formatting / whitespace: trailing spaces. Remove the trailing spaces at the end of the line.]
// for the calibrations to be done.
int exitTransitionTimeout = 0;
};
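
One possible reading of the two timeouts is sketched below. It assumes that both are counted in seconds from the same starting point and that `exitTransitionTimeout` is the larger of the two, as the comments above suggest; the actual logic lives in `DataProcessingDevice.cxx`, which is not rendered on this page. `SketchTimeouts`, `SketchPhase` and `phaseAt` are hypothetical names used only here.

```cpp
#include <cassert>

// Hypothetical mirror of the two timeouts stored in DeviceContext (seconds).
struct SketchTimeouts {
  int dataProcessingTimeout = 0;
  int exitTransitionTimeout = 0;
};

// Hypothetical phases, named for this sketch only.
enum struct SketchPhase { Processing, CalibrationOnly, MustExit };

// Assumption: both timeouts are measured from the same instant and the exit
// timeout includes the data processing grace period.
inline SketchPhase phaseAt(SketchTimeouts const& t, int elapsedSeconds)
{
  assert(elapsedSeconds >= 0);
  if (elapsedSeconds >= t.exitTransitionTimeout) {
    return SketchPhase::MustExit;
  }
  if (elapsedSeconds >= t.dataProcessingTimeout) {
    return SketchPhase::CalibrationOnly;
  }
  return SketchPhase::Processing;
}
```

For example, with a data processing timeout of 10 and an exit transition timeout of 30, this sketch leaves a 20 second window in which only calibration work would be done.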

2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceStateEnums.h
@@ -30,6 +30,8 @@ enum struct TransitionHandlingState {
NoTransition,
/// A transition was notified to be requested
Requested,
/// Only calibrations can be done
DataProcessingExpired,
/// A transition needs to be fulfilled ASAP
Expired
};
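
To make the role of the new `DataProcessingExpired` state more concrete, here is a minimal sketch of a per-message decision. It assumes that in this state only data flagged to be kept at end of stream is still handled, and that nothing is handled once the state is `Expired`; both are assumptions drawn from the comments in this PR, not from the device code. `SketchTransitionState` and `shouldProcess` are hypothetical names.

```cpp
#include <cassert>

// Copy of the enumerators shown above, under a hypothetical name.
enum struct SketchTransitionState {
  NoTransition,
  Requested,
  DataProcessingExpired,
  Expired
};

// Assumption: after the data processing timeout only messages marked
// keepAtEndOfStream survive; after the exit timeout nothing does.
inline bool shouldProcess(SketchTransitionState state, bool keepAtEndOfStream)
{
  switch (state) {
    case SketchTransitionState::NoTransition:
    case SketchTransitionState::Requested:
      return true;
    case SketchTransitionState::DataProcessingExpired:
      return keepAtEndOfStream;
    case SketchTransitionState::Expired:
      return false;
  }
  return false;
}
```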
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/TimingInfo.h
@@ -36,9 +36,11 @@ struct TimingInfo {
/// from a new run, as being processed by the current stream.
/// FIXME: for now this is the same as the above.
bool streamRunNumberChanged = false;
/// Whether this kind of data should be flushed during end of stream.
bool keepAtEndOfStream = false;

static bool timesliceIsTimer(size_t timeslice) { return timeslice > 1652945069870351; }
bool isTimer() const { return timesliceIsTimer(timeslice); };
[[nodiscard]] bool isTimer() const { return timesliceIsTimer(timeslice); };
};

} // namespace o2::framework
1 change: 1 addition & 0 deletions Framework/Core/src/DataAllocator.cxx
@@ -119,6 +119,7 @@ fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(Output const& spec,
dh.runNumber = timingInfo.runNumber;

DataProcessingHeader dph{timingInfo.timeslice, 1, timingInfo.creation};
static_cast<o2::header::BaseHeader&>(dph).flagsDerivedHeader |= timingInfo.keepAtEndOfStream ? DataProcessingHeader::KEEP_AT_EOS_FLAG : 0;
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
auto* transport = proxy.getOutputTransport(routeIndex);

149 changes: 103 additions & 46 deletions Framework/Core/src/DataProcessingDevice.cxx

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Framework/Core/src/DeviceSpecHelpers.cxx
@@ -1495,6 +1495,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("child-driver", bpo::value<std::string>());
realOdesc.add_options()("rate", bpo::value<std::string>());
realOdesc.add_options()("exit-transition-timeout", bpo::value<std::string>());
realOdesc.add_options()("data-processing-timeout", bpo::value<std::string>());
realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
realOdesc.add_options()("timeframes-rate-limit", bpo::value<std::string>());
realOdesc.add_options()("environment", bpo::value<std::string>());
@@ -1679,6 +1680,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
("exit-transition-timeout", bpo::value<std::string>(), "timeout before switching to READY state") //
("data-processing-timeout", bpo::value<std::string>(), "timeout before switching to calibration processing mode") //
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
@@ -1709,7 +1711,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("infologger-mode", bpo::value<std::string>(), "O2_INFOLOGGER_MODE override") //
("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //
("dpl-tracing-flags", bpo::value<std::string>(), "pipe separated list of events to trace") //
("signposts", bpo::value<std::string>(), //
("signposts", bpo::value<std::string>()->default_value(defaultSignposts), //
"comma separated list of signposts to enable (any of `completion`, `data_processor_context`, `stream_context`, `device`, `monitoring_service`)") //
("child-driver", bpo::value<std::string>(), "external driver to start childs with (e.g. valgrind)"); //

2 changes: 2 additions & 0 deletions Framework/Core/src/O2ControlHelpers.cxx
@@ -261,6 +261,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s
dumpOut << indLevel << indScheme << "- \"-b\"\n";
dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--session\"\n";
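
The two added lines follow the same pattern as the neighbouring options: one list entry for the flag, one for a templated value. A minimal sketch of that pattern, using a hypothetical helper `dumpTemplatedOption` that does not exist in `O2ControlHelpers.cxx`, could look like this:

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical helper: writes one option and its template variable as two
// quoted list entries, in the same shape as the lines in dumpCommand.
inline void dumpTemplatedOption(std::ostream& out, std::string const& indent,
                                std::string const& option, std::string const& variable)
{
  out << indent << "- \"--" << option << "\"\n";
  out << indent << "- \"'{{ " << variable << " }}'\"\n";
}
```

Calling it with `data-processing-timeout` and `data_processing_timeout` yields the same two entries the diff adds, so the template variable `data_processing_timeout` is what the control system would need to provide.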
2 changes: 2 additions & 0 deletions Framework/Core/src/runDataProcessing.cxx
@@ -1034,6 +1034,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
// declared in the workflow definition are allowed.
runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
std::string defaultExitTransitionTimeout = "0";
std::string defaultDataProcessingTimeout = "0";
std::string defaultInfologgerMode = "";
o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode();
if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
@@ -1052,6 +1053,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before switching from RUN to CALIBRATION") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
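
Both timeouts are declared as `std::string` options with a default of `"0"`, so something has to turn that string into a number of seconds. How DPL does this is not visible in this diff; the sketch below is one way it could be done, with a hypothetical helper `parseTimeoutSeconds` that rejects trailing garbage and negative values.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical helper, not part of DPL: converts an option value such as "15"
// into whole seconds. std::stoi itself throws on empty or non-numeric input.
inline int parseTimeoutSeconds(std::string const& value)
{
  std::size_t consumed = 0;
  int seconds = std::stoi(value, &consumed);
  if (consumed != value.size() || seconds < 0) {
    throw std::invalid_argument("timeout must be a non-negative integer: " + value);
  }
  return seconds;
}
```

With the default of `"0"` this returns 0, which in this sketch would simply mean that no extra data processing grace period is configured.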