Skip to content

Commit

Permalink
JsonRpcConnection: Log message processing stats
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Aug 29, 2024
1 parent be98b1b commit 8e7a3d1
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
38 changes: 36 additions & 2 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ void JsonRpcConnection::Start()

void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
namespace ch = std::chrono;

m_Stream->next_layer().SetSeen(&m_Seen);

for (;;) {
Expand All @@ -78,9 +80,37 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
m_Seen = Utility::GetTime();

try {
auto start (ch::steady_clock::now());
ch::steady_clock::duration cpuBoundDuration;

String rpcMethod;

Defer addLogStats ([this, &rpcMethod, &start, &cpuBoundDuration]() {
auto severity = LogDebug;

auto duration = ch::steady_clock::now() - start;
if (duration >= ch::seconds(5)) {
// Processing that RPC message seems to take an unexpectedly long time,
// so promote the log entry from debug to warning.
severity = LogWarning;
}

Log statsLog(severity, "JsonRpcConnection");
statsLog << "Processing JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity << "'";

if (cpuBoundDuration >= ch::seconds(1)) {
statsLog << " waited '" << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count() << "ms' on semaphore and";
}

statsLog << " took total " << ch::duration_cast<ch::milliseconds>(duration).count() << "ms.";
});

CpuBoundWork handleMessage (yc);

MessageHandler(message);
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

MessageHandler(message, rpcMethod);

l_TaskStats.InsertValue(Utility::GetTime(), 1);
} catch (const std::exception& ex) {
Expand Down Expand Up @@ -245,9 +275,13 @@ void JsonRpcConnection::Disconnect()
});
}

void JsonRpcConnection::MessageHandler(const String& jsonString)
void JsonRpcConnection::MessageHandler(const String& jsonString, String& rpcMethod)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
rpcMethod = message->Get("method");
if (rpcMethod.IsEmpty()) {
rpcMethod = "UNKNOWN";
}

if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");
Expand Down
18 changes: 17 additions & 1 deletion lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,23 @@ class JsonRpcConnection final : public Object
void CheckLiveness(boost::asio::yield_context yc);

bool ProcessMessage();
void MessageHandler(const String& jsonString);

/**
* MessageHandler routes the provided message to its corresponding handler (if any).
*
* This will first JSON decode the specified message and verifies the timestamp of that RPC message (if any).
* Subsequently, any message whose timestamp is less than the remote log position of the Endpoint will be rejected;
* otherwise, the endpoint's remote log position is updated to that timestamp. It is not expected to happen, but any
* message lacking an RPC method or referring to a non-existent one is also discarded. Afterwards, the RPC handler
* is then called for that message and sends it's result back to the sender if the message contains an ID.
*
* Note: The "rpcMethod" param is used as an output variable and is set to the RPC method of the given
* message if any, otherwise to "UNKNOWN".
*
* @param jsonString String The RPC message you want to process.
* @param rpcMethod String The output parameter for the RPC method.
*/
void MessageHandler(const String& jsonString, String& rpcMethod);

void CertificateRequestResponseHandler(const Dictionary::Ptr& message);

Expand Down

0 comments on commit 8e7a3d1

Please sign in to comment.