Skip to content

Commit

Permalink
fmq kvals: refactor dd kv subscribe methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann committed Mar 31, 2021
1 parent 6983dc9 commit c40b67f
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 150 deletions.
8 changes: 4 additions & 4 deletions src/ReadoutEmulator/CruMemoryHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "Headers/DataHeader.h"

#include <fairmq/FairMQUnmanagedRegion.h>

#include <stack>
#include <map>
#include <vector>
Expand All @@ -30,9 +32,7 @@

class FairMQUnmanagedRegion;

namespace o2
{
namespace DataDistribution
namespace o2::DataDistribution
{

using namespace o2::header;
Expand Down Expand Up @@ -145,7 +145,7 @@ class CruMemoryHandler
/// output data queue
ConcurrentFifo<ReadoutLinkO2Data> mO2LinkDataQueue;
};
}

} /* namespace o2::DataDistribution */

#endif /* ALICEO2_CRU_MEMORY_HANDLER_H_ */
7 changes: 3 additions & 4 deletions src/ReadoutEmulator/ReadoutDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

#include <ReadoutDataModel.h>
#include <Utilities.h>
#include <FmqUtilities.h>

#include <memory>
#include <deque>
#include <condition_variable>

namespace o2
{
namespace DataDistribution
namespace o2::DataDistribution
{

class ReadoutDevice : public DataDistDevice
Expand Down Expand Up @@ -84,7 +83,7 @@ class ReadoutDevice : public DataDistDevice

RunningSamples<uint64_t, 8192> mFreeSuperpagesSamples;
};
}

} /* namespace o2::DataDistribution */

#endif /* ALICEO2_READOUT_EMULATOR_DEVICE_H_ */
1 change: 1 addition & 0 deletions src/StfBuilder/StfBuilderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <SubTimeFrameFileSource.h>
#include <ConcurrentQueue.h>
#include <Utilities.h>
#include <FmqUtilities.h>

#include <deque>
#include <memory>
Expand Down
1 change: 1 addition & 0 deletions src/StfSender/StfSenderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <SubTimeFrameFileSink.h>
#include <Utilities.h>
#include <FmqUtilities.h>

#include <thread>
#include <vector>
Expand Down
1 change: 1 addition & 0 deletions src/TfBuilder/TfBuilderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <SubTimeFrameFileSource.h>
#include <ConcurrentQueue.h>
#include <Utilities.h>
#include <FmqUtilities.h>

#include <deque>
#include <mutex>
Expand Down
8 changes: 5 additions & 3 deletions src/TfBuilder/runTfBuilderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ int main(int argc, char* argv[])

if (pKey == "partition_id" || pKey == "partition-id" || pKey == "environment-id" || pKey == "environment_id") {

if (r.fConfig.GetProperty<std::string>("discovery-partition") == "") {
r.fConfig.SetProperty<std::string>("discovery-partition", pValue);
IDDLOG("Config::Subscribe received key-value pair. {}=<{}>", pKey, pValue);
IDDLOG("Config::PartitionIdSubscribe received key-value pair. {}=<{}>", pKey, pValue);

if (r.fConfig.GetProperty<std::string>(Config::OptionKeyDiscoveryPartition) == "") {
r.fConfig.SetProperty<std::string>(Config::OptionKeyDiscoveryPartition, pValue);
IDDLOG("Config::PartitionIdSubscribe changed to: {}", pValue);
}
}
});
Expand Down
1 change: 1 addition & 0 deletions src/TfScheduler/TfSchedulerDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <ConfigConsul.h>

#include <Utilities.h>
#include <FmqUtilities.h>

#include <thread>

Expand Down
9 changes: 5 additions & 4 deletions src/TfScheduler/TfSchedulerInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
#include <ConfigConsul.h>

#include <Utilities.h>
#include <FmqUtilities.h>

#include <fairmq/ProgOptionsFwd.h>

#include <vector>
#include <map>
#include <thread>

namespace o2
{
namespace DataDistribution
namespace o2::DataDistribution
{

class ConsulConfig;
Expand Down Expand Up @@ -64,7 +65,7 @@ class TfSchedulerInstanceHandler
TfSchedulerInstanceRpcImpl mRpcServer;

};
}

} /* namespace o2::DataDistribution */

#endif /* ALICEO2_TF_SCHEDULER_INSTANCE_H_ */
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ target_include_directories(common
target_link_libraries(common
PUBLIC
base
fmqtools
Boost::iostreams
FairMQ::FairMQ
AliceO2::Headers
Expand Down
3 changes: 3 additions & 0 deletions src/common/SubTimeFrameFileSink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

#include "SubTimeFrameFileSink.h"
#include "FilePathUtils.h"
#include "FmqUtilities.h"
#include "DataDistLogger.h"

#include <fairmq/ProgOptions.h>

#include <boost/algorithm/string/replace.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/filesystem.hpp>
Expand Down
4 changes: 4 additions & 0 deletions src/common/SubTimeFrameFileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <Headers/DataHeader.h>

#include <fairmq/ProgOptionsFwd.h>

#include <boost/program_options/options_description.hpp>
#include <boost/filesystem.hpp>
#include <fstream>
Expand All @@ -33,6 +35,8 @@ namespace DataDistribution

namespace bpo = boost::program_options;

class DataDistDevice;

////////////////////////////////////////////////////////////////////////////////
/// SubTimeFrameFileSink
////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ target_link_libraries(base
spdlog::spdlog
Boost::filesystem
AliceO2::InfoLogger
FairLogger::FairLogger
)
27 changes: 1 addition & 26 deletions src/common/base/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
#ifndef ALICEO2_DATADIST_UTILITIES_H_
#define ALICEO2_DATADIST_UTILITIES_H_

#include <fairmq/FairMQDevice.h>

#include <type_traits>
#include <memory>
#include <thread>
Expand All @@ -26,9 +24,7 @@

#include <boost/dynamic_bitset.hpp>

namespace o2
{
namespace DataDistribution
namespace o2::DataDistribution
{

template <class F, class ... Args>
Expand All @@ -42,26 +38,6 @@ std::thread create_thread_member(const char* name, F&& f, Args&&... args) {
});
}

class DataDistDevice : public FairMQDevice {

public:

void WaitForRunningState() const {
while (GetCurrentState() < fair::mq::State::Running) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(20ms);
}
}

bool IsRunningState() const {
return (GetCurrentState() == fair::mq::State::Running);
}

bool IsReadyOrRunningState() const {
return ((GetCurrentState() == fair::mq::State::Running) || (GetCurrentState() == fair::mq::State::Ready));
}
};

template <
typename T,
size_t N = 1024,
Expand Down Expand Up @@ -218,7 +194,6 @@ class EventRecorder {
}
};

}
} /* namespace o2::DataDistribution */

#endif /* ALICEO2_DATADIST_UTILITIES_H_ */
3 changes: 2 additions & 1 deletion src/common/fmqtools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ target_include_directories(fmqtools
${CMAKE_CURRENT_SOURCE_DIR}
)

target_link_libraries(base
target_link_libraries(fmqtools
PUBLIC
FairLogger::FairLogger
FairMQ::FairMQ
base
)
121 changes: 121 additions & 0 deletions src/common/fmqtools/FmqUtilities.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,124 @@

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#include "FmqUtilities.h"
#include <DataDistLogger.h>
#include <fairmq/FairMQDevice.h>
#include <fairmq/DeviceRunner.h>


namespace o2::DataDistribution::fmqtools {

static bool checkInfoLoggerOptions() {

DDDLOG("DataDistLogger: Checking INFOLOGGER_MODE variable");

const char *cMode = getenv("INFOLOGGER_MODE");

if (cMode == nullptr) {
IDDLOG("DataDistLogger: INFOLOGGER_MODE backend is not set.");
return false;
}

const std::string cModeStr = std::string(cMode);

if (cModeStr.length() == 0) {
WDDLOG("DataDistLogger: INFOLOGGER_MODE variable is empty.");
return false;
}

if (cModeStr != "infoLoggerD") {
EDDLOG("DataDistLogger: INFOLOGGER_MODE mode is not supported "
"(only infoLoggerD mode). INFOLOGGER_MODE={}", cModeStr);
return false;
}

IDDLOG("DataDistLogger: enabling InfoLogger in infoLoggerD mode.");

return true;
}


static void handleSeverity(const std::string &pOptKey, const std::string &pOptVal)
{
if (pOptKey == "severity") {
// we never allow FairMQ console or file backends!
fair::Logger::SetFileSeverity(DataDistSeverity::nolog);
fair::Logger::SetConsoleSeverity(fair::Severity::nolog);

// set the DD log console severity
if (fair::Logger::fSeverityMap.count(pOptVal)) {
const auto newLevel = fair::Logger::fSeverityMap.at(pOptVal);
if (newLevel == fair::Severity::nolog) {
DataDistLogger::sConfigSeverity = fair::Severity::fatal;
} else {
DataDistLogger::sConfigSeverity = newLevel;
}
}

} else if (pOptKey == "severity-infologger") {
// check the InfoLogger mode. Only infoLoggerD is supported.
if(!checkInfoLoggerOptions()) {
WDDLOG("DataDistLogger: Invalid INFOLOGGER_MODE. Ignoring severity-infologger={}",
pOptVal);
DataDistLogger::sInfologgerSeverity = DataDistSeverity::nolog;
DataDistLogger::sInfologgerEnabled = false;
return;
}

// set the InfoLogger log severity
if (fair::Logger::fSeverityMap.count(pOptVal)) {
const auto newLevel = fair::Logger::fSeverityMap.at(pOptVal);
if (newLevel == fair::Severity::nolog) {
DataDistLogger::sInfologgerSeverity = fair::Severity::fatal;
DataDistLogger::sInfologgerEnabled = false;
} else {
DataDistLogger::sInfologgerSeverity = newLevel;
DataDistLogger::sInfologgerEnabled = true;
impl::DataDistLoggerCtx::InitInfoLogger();
}
}
} else if (pOptKey == "severity-file") {
EDDLOG("DataDistLogger: FMQ File logger is not supported.");
}
}

static auto handleRunNumber(const std::string &pOptKey, const std::string &pOptVal)
{
if (pOptKey == "runNumber") {
IDDLOG("NEW RUN NUMBER. run_number={}", pOptVal);
DataDistLogger::sRunNumberStr = pOptVal;
impl::DataDistLoggerCtx::InitInfoLogger();
}
}


void HandleFMQOptions(fair::mq::DeviceRunner &pFMQRunner)
{
fair::mq::ProgOptions& lFMQConfig = pFMQRunner.fConfig;

// disable fairlogger file backend
lFMQConfig.SetProperty<std::string>("file-severity", "nolog");
lFMQConfig.SetProperty<std::string>("log-to-file", "");

try {
pFMQRunner.UnsubscribeFromConfigChange();
} catch(...) { }

// subscribe to notifications
lFMQConfig.Subscribe<std::string>("dd-log-config", handleSeverity);
lFMQConfig.Subscribe<std::string>("run-number-config", handleRunNumber);

// read and apply the current value
handleSeverity("severity", lFMQConfig.GetProperty<std::string>("severity"));

// read and apply the current value
handleSeverity("severity-infologger", lFMQConfig.GetProperty<std::string>("severity-infologger"));

// set a degfault run number
handleRunNumber("runNumber", "0");
}


}
Loading

0 comments on commit c40b67f

Please sign in to comment.