Skip to content

Commit

Permalink
stfbuilder: add partition id parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann committed Jul 27, 2021
1 parent 0934b4f commit 5552dda
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/StfBuilder/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ set(EXE_STFB_SOURCES
)

add_library(StfBuilder_lib OBJECT ${EXE_STFB_SOURCES})
target_link_libraries(StfBuilder_lib base fmqtools common monitoring)
target_link_libraries(StfBuilder_lib base fmqtools common discovery monitoring)

add_executable(StfBuilder)

Expand All @@ -21,7 +21,7 @@ endif()
target_link_libraries(StfBuilder
PRIVATE
StfBuilder_lib
base fmqtools common monitoring
base fmqtools common discovery monitoring
)

install(TARGETS StfBuilder RUNTIME DESTINATION bin)
5 changes: 5 additions & 0 deletions src/StfBuilder/StfBuilderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <SubTimeFrameDataModel.h>
#include <SubTimeFrameDPL.h>
#include <Utilities.h>
#include <ConfigConsul.h>

#include <options/FairMQProgOptions.h>
#include <Framework/SourceInfoHeader.h>
Expand All @@ -29,6 +30,7 @@
#include <exception>
#include <boost/algorithm/string.hpp>


namespace o2::DataDistribution
{

Expand Down Expand Up @@ -96,6 +98,9 @@ void StfBuilderDevice::InitTask()
I().mMaxStfsInPipeline = GetConfig()->GetValue<std::int64_t>(OptionKeyMaxBufferedStfs);
I().mMaxBuiltStfs = GetConfig()->GetValue<std::uint64_t>(OptionKeyMaxBuiltStfs);

// partition id is used for monitoring.
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("-");

// start monitoring
DataDistMonitor::start_datadist(o2::monitoring::tags::Value::StfBuilder, GetConfig()->GetProperty<std::string>("monitoring-backend", ""));
DataDistMonitor::set_interval(GetConfig()->GetValue<float>("monitoring-interval"));
Expand Down
1 change: 1 addition & 0 deletions src/StfBuilder/StfBuilderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class StfBuilderDevice : public DataDistDevice
: IFifoPipeline(eStfPipelineSize) {}

/// config
std::string mPartitionId;
std::string mInputChannelName;
std::string mOutputChannelName;
std::string mDplChannelName;
Expand Down
3 changes: 3 additions & 0 deletions src/StfBuilder/runStfBuilderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <SubTimeFrameFileSink.h>
#include <SubTimeFrameFileSource.h>
#include <FmqUtilities.h>
#include <Config.h>

#include <fairmq/DeviceRunner.h>

Expand Down Expand Up @@ -90,6 +91,8 @@ int main(int argc, char* argv[])
r.fConfig.AddToCmdLineOptions(o2::DataDistribution::SubTimeFrameFileSink::getProgramOptions());
// Add options for STF file source
r.fConfig.AddToCmdLineOptions(o2::DataDistribution::SubTimeFrameFileSource::getProgramOptions());
// Add options for Data Distribution discovery
r.fConfig.AddToCmdLineOptions(o2::DataDistribution::Config::getProgramOptionsStfBuilder());

});

Expand Down
4 changes: 3 additions & 1 deletion src/StfSender/StfSenderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ void StfSenderDevice::InitTask()
DataDistMonitor::set_interval(GetConfig()->GetValue<float>("monitoring-interval"));
DataDistMonitor::set_log(GetConfig()->GetValue<bool>("monitoring-log"));

// partition id is used for monitoring.
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("-");

if (!standalone()) {
// Discovery
I().mDiscoveryConfig = std::make_shared<ConsulStfSender>(ProcessType::StfSender, Config::getEndpointOption(*GetConfig()));
Expand All @@ -86,7 +89,6 @@ void StfSenderDevice::InitTask()
std::this_thread::sleep_for(1s);
}

I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("");
if (I().mPartitionId.empty()) {
WDDLOG("StfSender 'discovery-partition' parameter not set.");
std::this_thread::sleep_for(1s); exit(-1);
Expand Down
18 changes: 18 additions & 0 deletions src/common/discovery/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ class Config {
return lDataDistDiscovery;
}

static
boost::program_options::options_description getProgramOptionsStfBuilder()
{
boost::program_options::options_description lDataDistDiscovery("DataDistribution discovery options", 120);

lDataDistDiscovery.add_options()(
OptionKeyDiscoveryEndpoint,
boost::program_options::value<std::string>()->default_value(""),
"Specifies URL of the DataDistribution discovery endpoint.");

lDataDistDiscovery.add_options()(
OptionKeyDiscoveryPartition,
boost::program_options::value<std::string>()->default_value(""),
"Specifies partition ID for the DataDistribution discovery.");

return lDataDistDiscovery;
}

static
std::string getNetworkIfAddressOption(const FairMQProgOptions& pFMQProgOpt)
{
Expand Down

0 comments on commit 5552dda

Please sign in to comment.