Skip to content

Commit

Permalink
Merge pull request #3955 from hove-io/add_rt_metrics
Browse files Browse the repository at this point in the history
Kraken: Add metrics around RT handling
  • Loading branch information
Pierre-Etienne Bougué authored Mar 28, 2023
2 parents 5d3d547 + 64b8191 commit 937d0e0
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
47 changes: 45 additions & 2 deletions source/kraken/maintenance_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,26 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
bool autocomplete_rebuilding_activated = false;
auto rt_action = RTAction::chaos;

size_t applied_entity_count = 0u;
pt::ptime oldest_message_time{pt::max_date_time};
pt::ptime youngest_message_time{pt::min_date_time};
uint64_t sum_message_age_until_begin_microseconds = 0u;
size_t dated_message_count = 0u;
for (auto& envelope : envelopes) {
const auto routing_key = envelope->RoutingKey();
LOG4CPLUS_DEBUG(logger, "realtime info received from " << routing_key);
assert(envelope);
transit_realtime::FeedMessage feed_message;
if (!feed_message.ParseFromString(envelope->Message()->Body())) {
LOG4CPLUS_WARN(logger, "protobuf not valid!");
return;
continue;
}
if (feed_message.header().has_timestamp()) {
auto message_time = navitia::from_posix_timestamp(feed_message.header().timestamp());
oldest_message_time = std::min(oldest_message_time, message_time);
youngest_message_time = std::max(youngest_message_time, message_time);
++dated_message_count;
sum_message_age_until_begin_microseconds += pt::time_duration(begin - message_time).total_microseconds();
}
LOG4CPLUS_TRACE(logger, "received entity: " << feed_message.DebugString());
for (const auto& entity : feed_message.entity()) {
Expand All @@ -384,6 +396,7 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
this->metrics.observe_data_cloning(duration.total_seconds());
LOG4CPLUS_INFO(logger, "data copied (cloned) in " << duration);
}
++applied_entity_count;
if (entity.is_deleted()) {
LOG4CPLUS_DEBUG(logger, "deletion of disruption " << entity.id());
rt_action = RTAction::deletion;
Expand All @@ -401,9 +414,15 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
autocomplete_rebuilding_activated = autocomplete_rebuilding_needed(entity);
} else {
LOG4CPLUS_WARN(logger, "unsupported gtfs rt feed");
--applied_entity_count;
}
}
}
if (!envelopes.empty()) {
// messages may contain multiple entities, and some may be skipped
this->metrics.observe_applied_rt_entity_count(applied_entity_count);
LOG4CPLUS_DEBUG(logger, "Number of RT entity really applied in this message batch: " << applied_entity_count);
}
if (data) {
LOG4CPLUS_INFO(logger, "rebuilding relations");
data->build_relations();
Expand All @@ -421,7 +440,8 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
data_manager.set_data(std::move(data));

// Feed metrics
auto duration = pt::microsec_clock::universal_time() - begin;
auto end = pt::microsec_clock::universal_time();
auto duration = end - begin;
if (rt_action == RTAction::deletion) {
this->metrics.observe_delete_disruption(duration.total_milliseconds() / 1000.0);
LOG4CPLUS_INFO(logger, "Data updated after deleting disruption, "
Expand All @@ -435,6 +455,21 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
LOG4CPLUS_INFO(logger, "Data updated with realtime from kirin, "
<< envelopes.size() << " disruption(s) applied in " << duration);
}
if (dated_message_count > 0) {
auto min_age = end - youngest_message_time;
auto max_age = end - oldest_message_time;
auto sum_message_age_until_end_microseconds =
sum_message_age_until_begin_microseconds + (dated_message_count * (end - begin).total_microseconds());
auto average_age_microseconds = sum_message_age_until_end_microseconds / dated_message_count;
this->metrics.observe_rt_message_age_min(double(min_age.total_milliseconds()) / 1000.0);
this->metrics.observe_rt_message_age_max(double(max_age.total_milliseconds()) / 1000.0);
this->metrics.observe_rt_message_age_average(double(average_age_microseconds) / 1000000.0);
LOG4CPLUS_DEBUG(logger, "Known ages of RT message(s) in batch: min="
<< min_age << ", average=" << pt::microseconds(average_age_microseconds)
<< ", max=" << max_age);
} else {
LOG4CPLUS_DEBUG(logger, "All ages of RT message(s) in batch are unknown");
}
} else if (!envelopes.empty()) {
// we didn't have to update Data because there was no change, but we want to track that realtime data
// is being processed as it should, because "nothing has changed" isn't the same thing
Expand Down Expand Up @@ -508,7 +543,15 @@ void MaintenanceWorker::listen_rabbitmq() {
size_t max_batch_nb = conf.broker_max_batch_nb();

try {
auto begin_rt_retrieval = pt::microsec_clock::universal_time();
auto rt_envelopes = consume_in_batch(rt_tag, max_batch_nb, timeout_ms, no_ack);
auto duration_rt_retrieval = pt::microsec_clock::universal_time() - begin_rt_retrieval;
this->metrics.observe_retrieve_rt_message_duration(double(duration_rt_retrieval.total_milliseconds())
/ 1000.0);
this->metrics.observe_retrieved_rt_message_count(rt_envelopes.size());
LOG4CPLUS_DEBUG(logger, "Retrieval of RT messages from RabbitMQ, " << rt_envelopes.size()
<< " messages(s) retrieved in "
<< duration_rt_retrieval);
handle_rt_in_batch(rt_envelopes);

auto task_envelopes = consume_in_batch(task_tag, 1, timeout_ms, no_ack);
Expand Down
84 changes: 84 additions & 0 deletions source/kraken/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,48 @@ Metrics::Metrics(const boost::optional<std::string>& endpoint, const std::string
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->retrieve_rt_message_duration_histogram = &prometheus::BuildHistogram()
.Name("kraken_retrieve_rt_message_duration_seconds")
.Help("duration of RT messages retrieval from RabbitMQ")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->retrieved_rt_message_count_histogram = &prometheus::BuildHistogram()
.Name("kraken_retrieve_rt_message_count")
.Help("number of RT messages retrieved from RabbitMQ")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->applied_rt_entity_count_histogram = &prometheus::BuildHistogram()
.Name("kraken_applied_rt_entity_count")
.Help("number of applied RT entity from a message batch")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->rt_message_age_min_histogram = &prometheus::BuildHistogram()
.Name("kraken_rt_message_age_min_seconds")
.Help("Minimum age of RT message from a batch")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->rt_message_age_average_histogram = &prometheus::BuildHistogram()
.Name("kraken_rt_message_age_average_seconds")
.Help("Average age of RT message from a batch")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));

this->rt_message_age_max_histogram = &prometheus::BuildHistogram()
.Name("kraken_rt_message_age_max_seconds")
.Help("Maximum age of RT message from a batch")
.Labels({{"coverage", coverage}})
.Register(*registry)
.Add({}, create_exponential_buckets(0.5, 2, 10));
}

InFlightGuard Metrics::start_in_flight() const {
Expand Down Expand Up @@ -208,6 +250,48 @@ void Metrics::observe_delete_disruption(double duration) const {
this->delete_disruption_histogram->Observe(duration);
}

void Metrics::observe_retrieve_rt_message_duration(double duration) const {
if (!registry) {
return;
}
this->retrieve_rt_message_duration_histogram->Observe(duration);
}

void Metrics::observe_retrieved_rt_message_count(size_t count) const {
if (!registry) {
return;
}
this->retrieved_rt_message_count_histogram->Observe(double(count));
}

void Metrics::observe_applied_rt_entity_count(size_t count) const {
if (!registry) {
return;
}
this->applied_rt_entity_count_histogram->Observe(double(count));
}

void Metrics::observe_rt_message_age_min(double duration) const {
if (!registry) {
return;
}
this->rt_message_age_min_histogram->Observe(duration);
}

void Metrics::observe_rt_message_age_average(double duration) const {
if (!registry) {
return;
}
this->rt_message_age_average_histogram->Observe(duration);
}

void Metrics::observe_rt_message_age_max(double duration) const {
if (!registry) {
return;
}
this->rt_message_age_max_histogram->Observe(duration);
}

void Metrics::set_raptor_cache_miss(size_t nb_cache_miss) const {
if (!registry) {
return;
Expand Down
12 changes: 12 additions & 0 deletions source/kraken/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ class Metrics : boost::noncopyable {
prometheus::Histogram* handle_rt_histogram;
prometheus::Histogram* handle_disruption_histogram;
prometheus::Histogram* delete_disruption_histogram;
prometheus::Histogram* retrieve_rt_message_duration_histogram;
prometheus::Histogram* retrieved_rt_message_count_histogram;
prometheus::Histogram* applied_rt_entity_count_histogram;
prometheus::Histogram* rt_message_age_min_histogram;
prometheus::Histogram* rt_message_age_average_histogram;
prometheus::Histogram* rt_message_age_max_histogram;
prometheus::Gauge* next_st_cache_miss;

public:
Expand All @@ -86,6 +92,12 @@ class Metrics : boost::noncopyable {
void observe_handle_rt(double duration) const;
void observe_handle_disruption(double duration) const;
void observe_delete_disruption(double duration) const;
void observe_retrieve_rt_message_duration(double duration) const;
void observe_retrieved_rt_message_count(size_t count) const;
void observe_applied_rt_entity_count(size_t count) const;
void observe_rt_message_age_min(double duration) const;
void observe_rt_message_age_average(double duration) const;
void observe_rt_message_age_max(double duration) const;
void set_raptor_cache_miss(size_t nb_cache_miss) const;
};

Expand Down

0 comments on commit 937d0e0

Please sign in to comment.