Skip to content

Commit

Permalink
stfsender,tfbuilder: implement datadist controller rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann committed Feb 14, 2021
1 parent 3b3936b commit cc9282c
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 114 deletions.
45 changes: 30 additions & 15 deletions src/StfSender/StfSenderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ void StfSenderDevice::InitTask()

auto& lStatus = mDiscoveryConfig->status();
lStatus.mutable_info()->set_type(StfSender);
lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig()));
lStatus.mutable_info()->set_process_id(Config::getIdOption(StfSender, *GetConfig()));
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig()));
lStatus.mutable_partition()->set_partition_id(Config::getPartitionOption(*GetConfig()));

// wait for "partition-id"
while (!Config::getPartitionOption(*GetConfig())) {
WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter.");
std::this_thread::sleep_for(1s);
}

lStatus.mutable_partition()->set_partition_id(*Config::getPartitionOption(*GetConfig()));
mDiscoveryConfig->write();
}

Expand Down Expand Up @@ -94,13 +101,12 @@ void StfSenderDevice::InitTask()
if (mStandalone && !mFileSink.enabled()) {
WDDLOG("Running in standalone mode and with STF file sink disabled. Data will be lost.");
}

// Info thread
mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this);
}

void StfSenderDevice::PreRun()
{
mRunning = true;

if (!mStandalone) {
// Start output handler
mOutputHandler.start(mDiscoveryConfig);
Expand All @@ -126,15 +132,21 @@ void StfSenderDevice::PreRun()
if (mFileSink.enabled()) {
mFileSink.start();
}

// Info thread
mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this);

// start the receiver thread
mReceiverThread = create_thread_member("stfs_recv", &StfSenderDevice::StfReceiverThread, this);
}

void StfSenderDevice::ResetTask()
void StfSenderDevice::PostRun()
{
// Stop the pipeline
stopPipeline();

mRunning = false;

// stop the receiver thread
if (mReceiverThread.joinable()) {
mReceiverThread.join();
Expand All @@ -161,6 +173,11 @@ void StfSenderDevice::ResetTask()
mInfoThread.join();
}

DDDLOG("PostRun() done.");
}

// ResetTask state hook. Only emits a debug log line here; stopping the
// pipeline and joining the worker threads is done in PostRun().
void StfSenderDevice::ResetTask()
{
DDDLOG("ResetTask() done.");
}

Expand All @@ -175,12 +192,9 @@ void StfSenderDevice::StfReceiverThread()
DplToStfAdapter lStfReceiver;
std::unique_ptr<SubTimeFrame> lStf;

// wait for the device to go into RUNNING state
WaitForRunningState();

const auto lStfStartTime = hres_clock::now();

while (IsRunningState()) {
while (mRunning) {
lStf = lStfReceiver.deserialize(lInputChan);

if (!lStf) {
Expand Down Expand Up @@ -211,10 +225,7 @@ void StfSenderDevice::StfReceiverThread()

void StfSenderDevice::InfoThread()
{
// wait for the device to go into RUNNING state
WaitForRunningState();

while (IsRunningState()) {
while (mRunning) {
IDDLOG("SubTimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}",
mStfSizeSamples.Mean(), mStfTimeSamples.MeanStepFreq(), mNumStfs);

Expand All @@ -225,8 +236,12 @@ void StfSenderDevice::InfoThread()

/// Periodic run-loop callback of the StfSender device.
///
/// Polls the RPC server for a partition termination request. When one has been
/// received, logs a (rate-limited) message and returns false, which triggers
/// PostRun(). Otherwise the call idles for a single short interval so that the
/// termination flag is re-checked roughly every 300 ms.
///
/// @return false when partition termination was requested; true to keep running.
bool StfSenderDevice::ConditionalRun()
{
  if (mRpcServer.isTerminateRequested()) {
    IDDLOG_RL(10000, "DataDistribution partition is terminated.");
    return false; // trigger PostRun()
  }

  // Nothing else to do here: sleep exactly once per iteration. Keeping only the
  // 300ms interval (instead of an additional 1000ms sleep) bounds the latency
  // between a terminate request and the device leaving the run loop.
  std::this_thread::sleep_for(300ms);
  return true;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/StfSender/StfSenderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class StfSenderDevice : public DataDistDevice,

protected:
void PreRun() final;
void PostRun() final {};
void PostRun() final;
bool ConditionalRun() final;

void StfReceiverThread();
Expand Down Expand Up @@ -156,6 +156,7 @@ class StfSenderDevice : public DataDistDevice,
TfSchedulerRpcClient mTfSchedulerRpcClient;

/// Receiver threads
bool mRunning = false;
std::thread mReceiverThread;

/// File sink
Expand Down
20 changes: 18 additions & 2 deletions src/StfSender/StfSenderRpc.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ ::grpc::Status StfSenderRpcImpl::ConnectTfBuilderRequest(::grpc::ServerContext*
const std::string lTfBuilderId = request->tf_builder_id();
const std::string lTfBuilderEndpoint = request->endpoint();

if (mTerminateRequested) {
response->set_status(TfBuilderConnectionStatus::ERROR_PARTITION_TERMINATING);
return Status::OK;
}

// handle the request
DDDLOG("Requested to connect to TfBuilder. tfb_id={} tfb_ep={}", lTfBuilderId, lTfBuilderEndpoint);
response->set_status(OK);
Expand All @@ -80,7 +85,6 @@ ::grpc::Status StfSenderRpcImpl::DisconnectTfBuilderRequest(::grpc::ServerContex
const TfBuilderEndpoint* request,
StatusResponse* response)
{

const std::string lTfBuilderId = request->tf_builder_id();
const std::string lTfBuilderEndpoint = request->endpoint();

Expand All @@ -99,12 +103,24 @@ ::grpc::Status StfSenderRpcImpl::StfDataRequest(::grpc::ServerContext* /*context
const StfDataRequestMessage* request,
StfDataResponse* response)
{

mOutput->sendStfToTfBuilder(request->stf_id(), request->tf_builder_id(), *response/*out*/);

return Status::OK;
}

// rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { }
/// Handles the TerminatePartition RPC.
///
/// Records that termination was requested (observable through
/// isTerminateRequested()) and reports PARTITION_TERMINATING to the caller.
/// The request payload is currently ignored.
///
/// NOTE(review): mTerminateRequested is declared as a plain bool; if this
/// handler and the isTerminateRequested() callers run on different threads it
/// should probably be std::atomic<bool> — confirm.
///
/// @param response  Out: partition state is set to PARTITION_TERMINATING.
/// @return Always Status::OK.
::grpc::Status StfSenderRpcImpl::TerminatePartition(
  ::grpc::ServerContext* /*context*/,
  const PartitionInfo* /*request*/,
  PartitionResponse* response)
{
  DDDLOG("TerminatePartition request received.");

  // Latch the flag; the ConnectTfBuilderRequest handler and the device run loop consult it.
  mTerminateRequested = true;

  // TODO: verify partition id
  response->set_partition_state(PartitionState::PARTITION_TERMINATING);

  return Status::OK;
}


}
} /* o2::DataDistribution */
8 changes: 8 additions & 0 deletions src/StfSender/StfSenderRpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,18 @@ class StfSenderRpcImpl final : public StfSenderRpc::Service
const StfDataRequestMessage* request,
StfDataResponse* response) override;

// rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { }
::grpc::Status TerminatePartition(::grpc::ServerContext* context,
const PartitionInfo* request,
PartitionResponse* response) override;

void start(StfSenderOutput *pOutput, const std::string pRpcSrvBindIp, int& lRealPort /*[out]*/);
void stop();

bool isTerminateRequested() const { return mTerminateRequested; }

private:
bool mTerminateRequested = false;
std::unique_ptr<Server> mServer = nullptr;
StfSenderOutput *mOutput = nullptr;

Expand Down
13 changes: 13 additions & 0 deletions src/StfSender/runStfSenderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ int main(int argc, char* argv[])

// Install listener for Logging options
o2::DataDistribution::impl::DataDistLoggerCtx::HandleFMQOptions(r);

// Install listener for discovery partition key
r.fConfig.Subscribe<std::string>("discovery-partition", [&](const std::string& pKey, std::string pValue) {

if (pKey == "partition_id" || pKey == "partition-id" || pKey == "environment-id" || pKey == "environment_id") {

if (r.fConfig.GetProperty<std::string>("discovery-partition") == "") {
r.fConfig.SetProperty<std::string>("discovery-partition", pValue);
IDDLOG("Config::Subscribe received key-value pair. {}=<{}>", pKey, pValue);
}
}
});

// reset unsupported options
r.fConfig.SetProperty<int>("io-threads", (int) std::min(std::thread::hardware_concurrency(), 16u));
r.fConfig.SetProperty<float>("rate", 0.f);
Expand Down
58 changes: 40 additions & 18 deletions src/TfBuilder/TfBuilderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void TfBuilderDevice::Init()

/// Reset hook: stops the memory object held in mMemI and releases it.
///
/// The pointer is checked before use so that reaching Reset() without a live
/// mMemI (e.g. a repeated Reset()) is a no-op instead of a null dereference,
/// mirroring the null guards used for the other members in stop().
void TfBuilderDevice::Reset()
{
  if (mMemI) {
    mMemI->stop();
    mMemI.reset();
  }
}

Expand All @@ -69,10 +70,15 @@ void TfBuilderDevice::InitTask()

auto &lStatus = mDiscoveryConfig->status();
lStatus.mutable_info()->set_type(TfBuilder);
lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig()));
lStatus.mutable_info()->set_process_id(Config::getIdOption(TfBuilder, *GetConfig()));
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig()));

mPartitionId = Config::getPartitionOption(*GetConfig());
// wait for "partition-id"
while (!Config::getPartitionOption(*GetConfig())) {
WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter.");
std::this_thread::sleep_for(1s);
}
mPartitionId = *Config::getPartitionOption(*GetConfig());
lStatus.mutable_partition()->set_partition_id(mPartitionId);

// File sink
Expand Down Expand Up @@ -110,15 +116,14 @@ void TfBuilderDevice::InitTask()
mStandalone = true;
IDDLOG("Not sending to DPL.");
}

// start the info thread
mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this);
}
}

/// Pre-run hook: brings up the TfBuilder processing chain via start().
///
/// start() is invoked exactly once and its result is honoured: on failure
/// mShouldExit is set, which makes ConditionalRun() return false on its next
/// invocation instead of continuing with a partially initialised device.
void TfBuilderDevice::PreRun()
{
  // A single call only: a second start() would repeat the start-up sequence.
  if (!start()) {
    mShouldExit = true;
  }
}

bool TfBuilderDevice::start()
Expand All @@ -136,7 +141,6 @@ bool TfBuilderDevice::start()

// we reached the scheduler instance, initialize everything else
mRunning = true;
auto lShmTransport = this->AddTransport(fair::mq::Transport::SHM);

mTfBuilder = std::make_unique<TimeFrameBuilder>(MemI(), mTfBufferSize, 512 << 20 /* config */, dplEnabled());

Expand All @@ -154,19 +158,23 @@ bool TfBuilderDevice::start()
if (!mFlpInputHandler->start(mDiscoveryConfig)) {
mShouldExit = true;
EDDLOG("Could not initialize input connections. Exiting.");
throw "Input connection error";
return false;
}

// start file source
mFileSource.start(MemI(), mStandalone ? false : mDplEnabled);

// start the info thread
mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this);

return true;
}

void TfBuilderDevice::stop()
{
mRpc->stopAcceptingTfs();
if (mRpc) {
mRpc->stopAcceptingTfs();
}

if (mTfDplAdapter) {
mTfDplAdapter->stop();
Expand All @@ -177,41 +185,58 @@ void TfBuilderDevice::stop()
}

mRunning = false;
DDDLOG("TfBuilderDevice::stop(): mRunning is false.");

// Stop the pipeline
stopPipeline();

// stop output handlers
mFlpInputHandler->stop(mDiscoveryConfig);
if (mFlpInputHandler) {
mFlpInputHandler->stop(mDiscoveryConfig);
}
DDDLOG("TfBuilderDevice::stop(): Input handler stopped.");
// signal and wait for the output thread
mFileSink.stop();
// join on fwd thread
if (mTfFwdThread.joinable()) {
mTfFwdThread.join();
}
DDDLOG("TfBuilderDevice::stop(): Forward thread stopped.");

//wait for the info thread
if (mInfoThread.joinable()) {
mInfoThread.join();
}
DDDLOG("TfBuilderDevice::stop(): Info thread stopped.");

// stop the RPCs
mRpc->stop();
if (mRpc) {
mRpc->stop();
}
DDDLOG("TfBuilderDevice::stop(): RPC clients stopped.");

mDiscoveryConfig.reset();

DDDLOG("Reset() done... ");
DDDLOG("TfBuilderDevice() stopped... ");
}

// ResetTask state hook: runs the common stop() teardown and logs.
// NOTE(review): stop() is also invoked from PostRun(), so it may run twice for
// one device lifetime; stop() null-checks mRpc / mFlpInputHandler and tests
// joinable() before joining — confirm the remaining steps are safe to repeat.
void TfBuilderDevice::ResetTask()
{
stop();
DDDLOG("ResetTask()");
}

// Get here when ConditionalRun returns false
// (i.e. when mShouldExit is set or the RPC layer reports a terminate request).
// Runs the common stop() teardown and emits a debug log line.
void TfBuilderDevice::PostRun()
{
stop();
DDDLOG("PostRun()");
}

bool TfBuilderDevice::ConditionalRun()
{
if (mShouldExit) {
mRunning = false;
if (mShouldExit || mRpc->isTerminateRequested()) {
// mRunning = false;
return false;
}

Expand Down Expand Up @@ -279,10 +304,7 @@ void TfBuilderDevice::TfForwardThread()

void TfBuilderDevice::InfoThread()
{
// wait for the device to go into RUNNING state
WaitForRunningState();

while (IsRunningState()) {
while (mRunning) {

IDDLOG("TimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}",
mTfSizeSamples.Mean(), mTfTimeSamples.MeanStepFreq(), getPipelineSize());
Expand Down
2 changes: 2 additions & 0 deletions src/TfBuilder/TfBuilderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class TfBuilderDevice : public DataDistDevice,
bool start();
void stop();

void PostRun() override final;

void Init() override final;
void Reset() override final;

Expand Down
7 changes: 6 additions & 1 deletion src/TfBuilder/TfBuilderInput.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ bool TfBuilderInput::start(std::shared_ptr<ConsulTfBuilder> pConfig)
continue;
}

if (lConnResult.status() != 0) {
if (lConnResult.status() == ERROR_PARTITION_TERMINATING) {
WDDLOG("Partition is terminating. Stopping.");
return false;
}

if (lConnResult.status() != OK) {
EDDLOG("Request for StfSender connection failed. scheduler_error={}",
TfBuilderConnectionStatus_Name(lConnResult.status()));
return false;
Expand Down
Loading

0 comments on commit cc9282c

Please sign in to comment.