diff --git a/src/StfSender/StfSenderDevice.cxx b/src/StfSender/StfSenderDevice.cxx index 98cf51d..07c2f75 100644 --- a/src/StfSender/StfSenderDevice.cxx +++ b/src/StfSender/StfSenderDevice.cxx @@ -58,9 +58,16 @@ void StfSenderDevice::InitTask() auto& lStatus = mDiscoveryConfig->status(); lStatus.mutable_info()->set_type(StfSender); - lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig())); + lStatus.mutable_info()->set_process_id(Config::getIdOption(StfSender, *GetConfig())); lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig())); - lStatus.mutable_partition()->set_partition_id(Config::getPartitionOption(*GetConfig())); + + // wait for "partition-id" + while (!Config::getPartitionOption(*GetConfig())) { + WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter."); + std::this_thread::sleep_for(1s); + } + + lStatus.mutable_partition()->set_partition_id(*Config::getPartitionOption(*GetConfig())); mDiscoveryConfig->write(); } @@ -94,13 +101,12 @@ void StfSenderDevice::InitTask() if (mStandalone && !mFileSink.enabled()) { WDDLOG("Running in standalone mode and with STF file sink disabled. Data will be lost."); } - - // Info thread - mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this); } void StfSenderDevice::PreRun() { + mRunning = true; + if (!mStandalone) { // Start output handler mOutputHandler.start(mDiscoveryConfig); @@ -126,15 +132,21 @@ void StfSenderDevice::PreRun() if (mFileSink.enabled()) { mFileSink.start(); } + + // Info thread + mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this); + // start the receiver thread mReceiverThread = create_thread_member("stfs_recv", &StfSenderDevice::StfReceiverThread, this); } -void StfSenderDevice::ResetTask() +void StfSenderDevice::PostRun() { // Stop the pipeline stopPipeline(); + mRunning = false; + // stop the receiver thread if (mReceiverThread.joinable()) { mReceiverThread.join(); @@ -161,6 +173,11 @@ void StfSenderDevice::ResetTask() mInfoThread.join(); } + DDDLOG("PostRun() done."); +} + +void StfSenderDevice::ResetTask() +{ DDDLOG("ResetTask() done."); } @@ -175,12 +192,9 @@ void StfSenderDevice::StfReceiverThread() DplToStfAdapter lStfReceiver; std::unique_ptr lStf; - // wait for the device to go into RUNNING state - WaitForRunningState(); - const auto lStfStartTime = hres_clock::now(); - while (IsRunningState()) { + while (mRunning) { lStf = lStfReceiver.deserialize(lInputChan); if (!lStf) { @@ -211,10 +225,7 @@ void StfSenderDevice::StfReceiverThread() void StfSenderDevice::InfoThread() { - // wait for the device to go into RUNNING state - WaitForRunningState(); - - while (IsRunningState()) { + while (mRunning) { IDDLOG("SubTimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}", mStfSizeSamples.Mean(), mStfTimeSamples.MeanStepFreq(), mNumStfs); @@ -225,8 +236,12 @@ void StfSenderDevice::InfoThread() bool StfSenderDevice::ConditionalRun() { + if (mRpcServer.isTerminateRequested()) { + IDDLOG_RL(10000, "DataDistribution partition is terminated."); + return false; // trigger PostRun() + } // nothing to do here sleep for awhile - std::this_thread::sleep_for(1000ms); + std::this_thread::sleep_for(300ms); return true; } } diff --git a/src/StfSender/StfSenderDevice.h b/src/StfSender/StfSenderDevice.h index 0d426e0..de68315 100644 --- a/src/StfSender/StfSenderDevice.h +++ b/src/StfSender/StfSenderDevice.h @@ -76,7 +76,7 @@ class StfSenderDevice : public DataDistDevice, protected: void PreRun() final; - void PostRun() final {}; + void PostRun() final; bool ConditionalRun() final; void StfReceiverThread(); @@ -156,6 +156,7 @@ class StfSenderDevice : public DataDistDevice, TfSchedulerRpcClient mTfSchedulerRpcClient; /// Receiver threads + bool mRunning = false; std::thread mReceiverThread; /// File sink diff --git a/src/StfSender/StfSenderRpc.cxx b/src/StfSender/StfSenderRpc.cxx index 493e6f3..9827e40 100644 --- a/src/StfSender/StfSenderRpc.cxx +++ b/src/StfSender/StfSenderRpc.cxx @@ -56,6 +56,11 @@ ::grpc::Status StfSenderRpcImpl::ConnectTfBuilderRequest(::grpc::ServerContext* const std::string lTfBuilderId = request->tf_builder_id(); const std::string lTfBuilderEndpoint = request->endpoint(); + if (mTerminateRequested) { + response->set_status(TfBuilderConnectionStatus::ERROR_PARTITION_TERMINATING); + return Status::OK; + } + // handle the request DDDLOG("Requested to connect to TfBuilder. tfb_id={} tfb_ep={}", lTfBuilderId, lTfBuilderEndpoint); response->set_status(OK); @@ -80,7 +85,6 @@ ::grpc::Status StfSenderRpcImpl::DisconnectTfBuilderRequest(::grpc::ServerContex const TfBuilderEndpoint* request, StatusResponse* response) { - const std::string lTfBuilderId = request->tf_builder_id(); const std::string lTfBuilderEndpoint = request->endpoint(); @@ -99,12 +103,24 @@ ::grpc::Status StfSenderRpcImpl::StfDataRequest(::grpc::ServerContext* /*context const StfDataRequestMessage* request, StfDataResponse* response) { - mOutput->sendStfToTfBuilder(request->stf_id(), request->tf_builder_id(), *response/*out*/); return Status::OK; } +// rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { } +::grpc::Status StfSenderRpcImpl::TerminatePartition(::grpc::ServerContext* /*context*/, + const PartitionInfo* /*request*/, PartitionResponse* response) +{ + DDDLOG("TerminatePartition request received."); + // TODO: verify partition id + response->set_partition_state(PartitionState::PARTITION_TERMINATING); + + mTerminateRequested = true; + + return Status::OK; +} + } } /* o2::DataDistribution */ diff --git a/src/StfSender/StfSenderRpc.h b/src/StfSender/StfSenderRpc.h index 64c46f5..da52312 100644 --- a/src/StfSender/StfSenderRpc.h +++ b/src/StfSender/StfSenderRpc.h @@ -62,10 +62,18 @@ class StfSenderRpcImpl final : public StfSenderRpc::Service const StfDataRequestMessage* request, StfDataResponse* response) override; + // rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { } + ::grpc::Status TerminatePartition(::grpc::ServerContext* context, + const PartitionInfo* request, + PartitionResponse* response) override; + void start(StfSenderOutput *pOutput, const std::string pRpcSrvBindIp, int& lRealPort /*[out]*/); void stop(); + bool isTerminateRequested() const { return mTerminateRequested; } + private: + bool mTerminateRequested = false; std::unique_ptr mServer = nullptr; StfSenderOutput *mOutput = nullptr; diff --git a/src/StfSender/runStfSenderDevice.cxx b/src/StfSender/runStfSenderDevice.cxx index 382907c..2e6e003 100644 --- a/src/StfSender/runStfSenderDevice.cxx +++ b/src/StfSender/runStfSenderDevice.cxx @@ -70,6 +70,19 @@ int main(int argc, char* argv[]) // Install listener for Logging options o2::DataDistribution::impl::DataDistLoggerCtx::HandleFMQOptions(r); + + // Install listener for discovery partition key + r.fConfig.Subscribe("discovery-partition", [&](const std::string& pKey, std::string pValue) { + + if (pKey == "partition_id" || pKey == "partition-id" || pKey == "environment-id" || pKey == "environment_id") { + + if (r.fConfig.GetProperty("discovery-partition") == "") { + r.fConfig.SetProperty("discovery-partition", pValue); + IDDLOG("Config::Subscribe received key-value pair. {}=<{}>", pKey, pValue); + } + } + }); + // reset unsupported options r.fConfig.SetProperty("io-threads", (int) std::min(std::thread::hardware_concurrency(), 16u)); r.fConfig.SetProperty("rate", 0.f); diff --git a/src/TfBuilder/TfBuilderDevice.cxx b/src/TfBuilder/TfBuilderDevice.cxx index aa1e714..f2f0276 100644 --- a/src/TfBuilder/TfBuilderDevice.cxx +++ b/src/TfBuilder/TfBuilderDevice.cxx @@ -51,6 +51,7 @@ void TfBuilderDevice::Init() void TfBuilderDevice::Reset() { + mMemI->stop(); mMemI.reset(); } @@ -69,10 +70,15 @@ void TfBuilderDevice::InitTask() auto &lStatus = mDiscoveryConfig->status(); lStatus.mutable_info()->set_type(TfBuilder); - lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig())); + lStatus.mutable_info()->set_process_id(Config::getIdOption(TfBuilder, *GetConfig())); lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig())); - mPartitionId = Config::getPartitionOption(*GetConfig()); + // wait for "partition-id" + while (!Config::getPartitionOption(*GetConfig())) { + WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter."); + std::this_thread::sleep_for(1s); + } + mPartitionId = *Config::getPartitionOption(*GetConfig()); lStatus.mutable_partition()->set_partition_id(mPartitionId); // File sink @@ -110,15 +116,14 @@ void TfBuilderDevice::InitTask() mStandalone = true; IDDLOG("Not sending to DPL."); } - - // start the info thread - mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this); } } void TfBuilderDevice::PreRun() { - start(); + if (!start()) { + mShouldExit = true; + } } bool TfBuilderDevice::start() @@ -136,7 +141,6 @@ bool TfBuilderDevice::start() // we reached the scheduler instance, initialize everything else mRunning = true; - auto lShmTransport = this->AddTransport(fair::mq::Transport::SHM); mTfBuilder = std::make_unique(MemI(), mTfBufferSize, 512 << 20 /* config */, dplEnabled()); @@ -154,19 +158,23 @@ bool TfBuilderDevice::start() if (!mFlpInputHandler->start(mDiscoveryConfig)) { mShouldExit = true; EDDLOG("Could not initialize input connections. Exiting."); - throw "Input connection error"; return false; } // start file source mFileSource.start(MemI(), mStandalone ? false : mDplEnabled); + // start the info thread + mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this); + return true; } void TfBuilderDevice::stop() { - mRpc->stopAcceptingTfs(); + if (mRpc) { + mRpc->stopAcceptingTfs(); + } if (mTfDplAdapter) { mTfDplAdapter->stop(); @@ -177,41 +185,58 @@ void TfBuilderDevice::stop() } mRunning = false; + DDDLOG("TfBuilderDevice::stop(): mRunning is false."); // Stop the pipeline stopPipeline(); // stop output handlers - mFlpInputHandler->stop(mDiscoveryConfig); + if (mFlpInputHandler) { + mFlpInputHandler->stop(mDiscoveryConfig); + } + DDDLOG("TfBuilderDevice::stop(): Input handler stopped."); // signal and wait for the output thread mFileSink.stop(); // join on fwd thread if (mTfFwdThread.joinable()) { mTfFwdThread.join(); } + DDDLOG("TfBuilderDevice::stop(): Forward thread stopped."); //wait for the info thread if (mInfoThread.joinable()) { mInfoThread.join(); } + DDDLOG("TfBuilderDevice::stop(): Info thread stopped."); // stop the RPCs - mRpc->stop(); + if (mRpc) { + mRpc->stop(); + } + DDDLOG("TfBuilderDevice::stop(): RPC clients stopped."); mDiscoveryConfig.reset(); - DDDLOG("Reset() done... "); + DDDLOG("TfBuilderDevice() stopped... "); } void TfBuilderDevice::ResetTask() { stop(); + DDDLOG("ResetTask()"); +} + +// Get here when ConditionalRun returns false +void TfBuilderDevice::PostRun() +{ + stop(); + DDDLOG("PostRun()"); } bool TfBuilderDevice::ConditionalRun() { - if (mShouldExit) { - mRunning = false; + if (mShouldExit || mRpc->isTerminateRequested()) { + // mRunning = false; return false; } @@ -279,10 +304,7 @@ void TfBuilderDevice::TfForwardThread() void TfBuilderDevice::InfoThread() { - // wait for the device to go into RUNNING state - WaitForRunningState(); - - while (IsRunningState()) { + while (mRunning) { IDDLOG("TimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}", mTfSizeSamples.Mean(), mTfTimeSamples.MeanStepFreq(), getPipelineSize()); diff --git a/src/TfBuilder/TfBuilderDevice.h b/src/TfBuilder/TfBuilderDevice.h index b959628..664f9b2 100644 --- a/src/TfBuilder/TfBuilderDevice.h +++ b/src/TfBuilder/TfBuilderDevice.h @@ -70,6 +70,8 @@ class TfBuilderDevice : public DataDistDevice, bool start(); void stop(); + void PostRun() override final; + void Init() override final; void Reset() override final; diff --git a/src/TfBuilder/TfBuilderInput.cxx b/src/TfBuilder/TfBuilderInput.cxx index 4e99801..0f9fe53 100644 --- a/src/TfBuilder/TfBuilderInput.cxx +++ b/src/TfBuilder/TfBuilderInput.cxx @@ -120,7 +120,12 @@ bool TfBuilderInput::start(std::shared_ptr pConfig) continue; } - if (lConnResult.status() != 0) { + if (lConnResult.status() == ERROR_PARTITION_TERMINATING) { + WDDLOG("Partition is terminating. Stopping."); + return false; + } + + if (lConnResult.status() != OK) { EDDLOG("Request for StfSender connection failed. scheduler_error={}", TfBuilderConnectionStatus_Name(lConnResult.status())); return false; diff --git a/src/TfBuilder/TfBuilderRpc.cxx b/src/TfBuilder/TfBuilderRpc.cxx index 64b1cc4..70ae8f6 100644 --- a/src/TfBuilder/TfBuilderRpc.cxx +++ b/src/TfBuilder/TfBuilderRpc.cxx @@ -41,6 +41,7 @@ void TfBuilderRpcImpl::initDiscovery(const std::string pRpcSrvBindIp, int &lReal bool TfBuilderRpcImpl::start(const std::uint64_t pBufferSize) { mCurrentTfBufferSize = pBufferSize; + mTerminateRequested = false; // Interact with the scheduler if (!mTfSchedulerRpcClient.start(mDiscoveryConfig)) { @@ -114,9 +115,14 @@ void TfBuilderRpcImpl::UpdateSendingThread() DDDLOG("Starting TfBuilder Update sending thread."); while (mRunning) { - std::unique_lock lLock(mUpdateLock); - sendTfBuilderUpdate(); - mUpdateCondition.wait_for(lLock, 500ms); + if (!mTerminateRequested) { + std::unique_lock lLock(mUpdateLock); + sendTfBuilderUpdate(); + mUpdateCondition.wait_for(lLock, 500ms); + } else { + stopAcceptingTfs(); + std::this_thread::sleep_for(1s); + } } // send disconnect update @@ -274,7 +280,7 @@ bool TfBuilderRpcImpl::recordTfForwarded(const std::uint64_t &pTfId) ::grpc::Status TfBuilderRpcImpl::BuildTfRequest(::grpc::ServerContext* /*context*/, const TfBuildingInformation* request, BuildTfResponse* response) { - if(!mRunning) { + if (!mRunning || mTerminateRequested) { response->set_status(BuildTfResponse::ERROR_NOT_RUNNING); return ::grpc::Status::OK; } @@ -287,8 +293,7 @@ ::grpc::Status TfBuilderRpcImpl::BuildTfRequest(::grpc::ServerContext* /*context std::scoped_lock lLock(mTfIdSizesLock); if (lTfSize > mCurrentTfBufferSize) { - EDDLOG( - "Request to build a TimeFrame: Not enough free memory! tf_id={} tf_size={} buffer_size={}", + EDDLOG("Request to build a TimeFrame: Not enough free memory! tf_id={} tf_size={} buffer_size={}", lTfId,lTfSize, mCurrentTfBufferSize); response->set_status(BuildTfResponse::ERROR_NOMEM); @@ -302,5 +307,16 @@ ::grpc::Status TfBuilderRpcImpl::BuildTfRequest(::grpc::ServerContext* /*context return ::grpc::Status::OK; } +::grpc::Status TfBuilderRpcImpl::TerminatePartition(::grpc::ServerContext* /*context*/, + const ::o2::DataDistribution::PartitionInfo* /*request*/, ::o2::DataDistribution::PartitionResponse* response) +{ + IDDLOG("TerminatePartition request received."); + mTerminateRequested = true; + + response->set_partition_state(PartitionState::PARTITION_TERMINATING); + + return ::grpc::Status::OK; +} + } } /* o2::DataDistribution */ diff --git a/src/TfBuilder/TfBuilderRpc.h b/src/TfBuilder/TfBuilderRpc.h index bc6ee97..d800708 100644 --- a/src/TfBuilder/TfBuilderRpc.h +++ b/src/TfBuilder/TfBuilderRpc.h @@ -78,13 +78,18 @@ class TfBuilderRpcImpl final : public TfBuilderRpc::Service StfSenderRpcClientCollection& StfSenderRpcClients() { return mStfSenderRpcClients; } + bool isTerminateRequested() const { return mTerminateRequested; } + // rpc BuildTfRequest(TfBuildingInformation) returns (BuildTfResponse) { } ::grpc::Status BuildTfRequest(::grpc::ServerContext* context, const TfBuildingInformation* request, BuildTfResponse* response) override; + ::grpc::Status TerminatePartition(::grpc::ServerContext* context, const ::o2::DataDistribution::PartitionInfo* request, ::o2::DataDistribution::PartitionResponse* response) override; + private: static constexpr const std::int64_t sMaxStfRequestsInFlight = 10; std::atomic_bool mRunning = false; + std::atomic_bool mTerminateRequested = false; std::atomic_bool mAcceptingTfs = false; std::mutex mUpdateLock; diff --git a/src/TfBuilder/runTfBuilderDevice.cxx b/src/TfBuilder/runTfBuilderDevice.cxx index d447976..8aaf696 100644 --- a/src/TfBuilder/runTfBuilderDevice.cxx +++ b/src/TfBuilder/runTfBuilderDevice.cxx @@ -72,6 +72,19 @@ int main(int argc, char* argv[]) // Install listener for Logging options o2::DataDistribution::impl::DataDistLoggerCtx::HandleFMQOptions(r); + + // Install listener for discovery partition key + r.fConfig.Subscribe("discovery-partition", [&](const std::string& pKey, std::string pValue) { + + if (pKey == "partition_id" || pKey == "partition-id" || pKey == "environment-id" || pKey == "environment_id") { + + if (r.fConfig.GetProperty("discovery-partition") == "") { + r.fConfig.SetProperty("discovery-partition", pValue); + IDDLOG("Config::Subscribe received key-value pair. {}=<{}>", pKey, pValue); + } + } + }); + // reset unsupported options r.fConfig.SetProperty("io-threads", (int) std::min(std::thread::hardware_concurrency(), 16u)); r.fConfig.SetProperty("rate", 0.f); diff --git a/src/common/base/DataDistLogger.h b/src/common/base/DataDistLogger.h index 587d3ec..b5d33cb 100644 --- a/src/common/base/DataDistLogger.h +++ b/src/common/base/DataDistLogger.h @@ -398,6 +398,10 @@ struct DataDistLoggerCtx { fair::mq::ProgOptions& lFMQConfig = pFMQRunner.fConfig; + try { + pFMQRunner.UnsubscribeFromConfigChange(); + } catch(...) { } + // disable fairlogger file backend lFMQConfig.SetProperty("file-severity", "nolog"); lFMQConfig.SetProperty("log-to-file", ""); diff --git a/src/common/discovery/Config.h b/src/common/discovery/Config.h index 6725ce7..5cf1692 100644 --- a/src/common/discovery/Config.h +++ b/src/common/discovery/Config.h @@ -206,43 +206,15 @@ class Config { } static - std::string getPartitionOption(const FairMQProgOptions& pFMQProgOpt) + std::optional getPartitionOption(const FairMQProgOptions& pFMQProgOpt) { // check cmdline first - { - std::string lPartId = pFMQProgOpt.GetValue(OptionKeyDiscoveryPartition); - if (!lPartId.empty()) { - IDDLOG("Parameter <{}> provided on command line. value={}", - OptionKeyDiscoveryPartition, lPartId); - return lPartId; - } - } - - // setup for ODC - std::promise mPartIdPromise; - std::future mPartIdFuture = mPartIdPromise.get_future(); - - pFMQProgOpt.Subscribe("o2.datadist", [&](const std::string& pKey, std::string pValue) { - IDDLOG("Config::Subscribe received key-value pair <{}>=<{}>", pKey, pValue); - - // partition key for tf builder - if (pKey == OptionKeyDiscoveryPartition) { - mPartIdPromise.set_value(pValue); - } else { - EDDLOG("Config::Subscribe unrecognized key value pushed {}={}", pKey, pValue); - } - }); - - // wait for an ODC value - mPartIdFuture.wait(); - const auto lDiscId = mPartIdFuture.get(); - if (lDiscId.length() == 0) { - EDDLOG("Parameter {} provided from ODC. zero length", Config::OptionKeyDiscoveryPartition); - throw std::invalid_argument(fmt::format("Invalid ODC parameter <{}>: zero length", Config::OptionKeyDiscoveryPartition)); + std::string lPartId = pFMQProgOpt.GetValue(OptionKeyDiscoveryPartition); + if (!lPartId.empty()) { + IDDLOG("Parameter <{}> provided on command line. value={}", OptionKeyDiscoveryPartition, lPartId); + return lPartId; } - - IDDLOG("Parameter {} provided from ODC. value={}", Config::OptionKeyDiscoveryPartition, lDiscId); - return lDiscId; + return std::nullopt; } Config() = delete; diff --git a/src/common/discovery/TfSchedulerRpcClient.cxx b/src/common/discovery/TfSchedulerRpcClient.cxx index 9250250..efb487e 100644 --- a/src/common/discovery/TfSchedulerRpcClient.cxx +++ b/src/common/discovery/TfSchedulerRpcClient.cxx @@ -23,7 +23,7 @@ namespace o2 namespace DataDistribution { -bool TfSchedulerRpcClient::is_ready() { +bool TfSchedulerRpcClient::is_ready() const { if (!mChannel) { return false; } @@ -39,13 +39,13 @@ void TfSchedulerRpcClient::updateTimeInformation(BasicInfo &pInfo) { // rpc NumStfSendersInPartitionRequest(google.protobuf.Empty) returns (NumStfSendersInPartitionResponse) { } bool TfSchedulerRpcClient::NumStfSendersInPartitionRequest(std::uint32_t &pNumStfSenders) { - if (!mStub) { + if (!mStub || !is_alive()) { EDDLOG_GRL(1000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); return false; } using namespace std::chrono_literals; - do { + while (is_alive()) { ClientContext lContext; pNumStfSenders = 0; @@ -57,16 +57,16 @@ bool TfSchedulerRpcClient::NumStfSendersInPartitionRequest(std::uint32_t &pNumSt pNumStfSenders = lRet.num_stf_senders(); return true; } - if (lStatus.error_code() == grpc::StatusCode::UNAVAILABLE) { - WDDLOG("NumStfSendersInPartitionRequest: Scheduler gRPC server UNAVAILABLE. Retrying..."); - std::this_thread::sleep_for(250ms); - continue; // retry + + if (!is_ready()) { + WDDLOG("NumStfSendersInPartitionRequest: Scheduler gRPC server is not ready. Retrying..."); + std::this_thread::sleep_for(250ms); + continue; // retry } EDDLOG_GRL(1000, "gRPC request error. code={} message={}", lStatus.error_code(), lStatus.error_message()); break; - - } while (true); + } return false; } @@ -74,44 +74,69 @@ bool TfSchedulerRpcClient::NumStfSendersInPartitionRequest(std::uint32_t &pNumSt // rpc TfBuilderConnectionRequest(TfBuilderConfigStatus) returns (TfBuilderConnectionResponse) { } bool TfSchedulerRpcClient::TfBuilderConnectionRequest(TfBuilderConfigStatus &pParam, TfBuilderConnectionResponse &pRet /*out*/) { - if (!mStub) { - EDDLOG_GRL(1000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); + if (!mStub || !is_alive()) { + EDDLOG_GRL(1000, "TfBuilderConnectionRequest: no gRPC connection to scheduler"); return false; } using namespace std::chrono_literals; - do { + while (is_alive()) { ClientContext lContext; // update timestamp updateTimeInformation(*pParam.mutable_info()); auto lStatus = mStub->TfBuilderConnectionRequest(&lContext, pParam, &pRet); + if (lStatus.ok()) { - return true; - } - if (lStatus.error_code() == grpc::StatusCode::UNAVAILABLE) { - std::this_thread::sleep_for(250ms); - continue; // retry + switch (pRet.status()) { + case TfBuilderConnectionStatus::OK: + break; + case TfBuilderConnectionStatus::ERROR_DISCOVERY: + break; + case TfBuilderConnectionStatus::ERROR_SOCKET_COUNT: + EDDLOG("TfBuilderConnectionRequest: TfBuilder socket count is not equal to number of StfSenders."); + break; + case TfBuilderConnectionStatus::ERROR_STF_SENDERS_NOT_READY: + break; + case TfBuilderConnectionStatus::ERROR_GRPC_STF_SENDER: + break; + case TfBuilderConnectionStatus::ERROR_GRPC_TF_BUILDER: + break; + case TfBuilderConnectionStatus::ERROR_STF_SENDER_CONNECTING: + break; + case TfBuilderConnectionStatus::ERROR_STF_SENDER_EXISTS: + EDDLOG("TfBuilderConnectionRequest: TfBuilder with the same id already registered with StfSenders."); + break; + case TfBuilderConnectionStatus::ERROR_PARTITION_TERMINATING: + EDDLOG("TfBuilderConnectionRequest: Partition is terminating."); + break; + default: + EDDLOG("TfBuilderConnectionRequest: Unknown error!"); + break; + } + + return true; } pRet.Clear(); - } while (false); - + std::this_thread::sleep_for(500ms); + } return false; } // rpc TfBuilderDisconnectionRequest(TfBuilderConfigStatus) returns (StatusResponse) { } bool TfSchedulerRpcClient::TfBuilderDisconnectionRequest(TfBuilderConfigStatus &pParam, StatusResponse &pRet /*out*/) { - if (!mStub) { + if (!mStub || !is_alive()) { EDDLOG_GRL(1000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); return false; } using namespace std::chrono_literals; - do { + + while(is_alive()) { ClientContext lContext; // update timestamp @@ -122,22 +147,17 @@ bool TfSchedulerRpcClient::TfBuilderDisconnectionRequest(TfBuilderConfigStatus & return true; } - if (lStatus.error_code() == grpc::StatusCode::UNAVAILABLE) { - std::this_thread::sleep_for(500ms); - continue; // retry - } - pRet.Clear(); - } while (false); - + std::this_thread::sleep_for(500ms); + } return false; } // rpc TfBuilderUpdate(TfBuilderUpdateMessage) returns (google.protobuf.Empty) { } bool TfSchedulerRpcClient::TfBuilderUpdate(TfBuilderUpdateMessage &pMsg) { - if (!mStub) { - EDDLOG_GRL(1000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); + if (!mStub || !is_alive()) { + EDDLOG_GRL(1000, "TfBuilderUpdate: no gRPC connection to scheduler"); return false; } @@ -154,16 +174,15 @@ bool TfSchedulerRpcClient::TfBuilderUpdate(TfBuilderUpdateMessage &pMsg) { return true; } - EDDLOG_GRL(1000, "gRPC: TfBuilderUpdate error. code={} message={}", - lStatus.error_code(), lStatus.error_message()); + EDDLOG_GRL(2000, "gRPC: TfBuilderUpdate error. code={} message={}", lStatus.error_code(), lStatus.error_message()); return false; } // rpc StfSenderStfUpdate(StfSenderStfInfo) returns (SchedulerStfInfoResponse) { } bool TfSchedulerRpcClient::StfSenderStfUpdate(StfSenderStfInfo &pMsg, SchedulerStfInfoResponse &pRet) { - if (!mStub) { - EDDLOG_GRL(1000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); + if (!mStub || !is_alive()) { + EDDLOG_GRL(2000, "NumStfSendersInPartitionRequest: no gRPC connection to scheduler"); return false; } @@ -177,8 +196,7 @@ bool TfSchedulerRpcClient::StfSenderStfUpdate(StfSenderStfInfo &pMsg, SchedulerS return true; } - EDDLOG_GRL(1000, "gRPC: StfSenderStfUpdate error. code={} message={}", - lStatus.error_code(), lStatus.error_message()); + EDDLOG_GRL(2000, "gRPC: StfSenderStfUpdate error. code={} message={}", lStatus.error_code(), lStatus.error_message()); return false; } diff --git a/src/common/discovery/TfSchedulerRpcClient.h b/src/common/discovery/TfSchedulerRpcClient.h index a5759cb..1a84359 100644 --- a/src/common/discovery/TfSchedulerRpcClient.h +++ b/src/common/discovery/TfSchedulerRpcClient.h @@ -66,6 +66,7 @@ class TfSchedulerRpcClient { void stop() { mTfSchedulerConf.Clear(); mStub.reset(nullptr); + mChannel.reset(); } void updateTimeInformation(BasicInfo &pInfo); @@ -87,7 +88,13 @@ class TfSchedulerRpcClient { std::string getEndpoint() { return mTfSchedulerConf.rpc_endpoint(); } - bool is_ready(); + bool is_ready() const; + bool is_alive() const { + if (mChannel) { + return (mChannel->GetState(true) != grpc_connectivity_state::GRPC_CHANNEL_SHUTDOWN); + } + return false; + } private: TfSchedulerInstanceConfigStatus mTfSchedulerConf;