Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stabilize worker_total_busy_duration #6899

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

Owen-CH-Leung
Copy link
Contributor

Motivation

Stabilize worker_total_busy_duration so it can be used outside of --cfg tokio_unstable

Solution

Move the API impl out of tokio_unstable flag

Ref: #6546

@github-actions github-actions bot added R-loom-current-thread Run loom current-thread tests on this PR R-loom-multi-thread Run loom multi-thread tests on this PR R-loom-multi-thread-alt Run loom multi-thread alt tests on this PR labels Oct 11, 2024
@Darksonn Darksonn requested a review from hds October 11, 2024 07:45
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-metrics Module: tokio/runtime/metrics labels Oct 11, 2024
Comment on lines 38 to 48
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change required for the mean time stabilization or just a coincidental change?

Copy link
Contributor Author

@Owen-CH-Leung Owen-CH-Leung Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this change is required.

At master we have implemented another struct WorkerMetrics at tokio/src/runtime/metrics/mock.rs which didn't use struct Histogram defined at tokio/src/runtime/metrics/histogram.rs.

In order to stabilize the API, I have to removed this mock WorkerMetrics and instead point to the "real" WorkerMetrics at tokio/src/runtime/metrics/worker.rs

The "real" WorkerMetrics has a field poll_count_histogram which is Option<Histogram>, and thus will attempt to parse tokio/src/runtime/metrics/histogram.rs. From there, you can see Histogram and HistogramBuilder both refers to HistogramScale which is hidden inside tokio_unstable. I think this wouldn't compile.

(I might be wrong though so feel free to correct me)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to read through this PR in more detail so bare with me, but we shouldn't be making this stable and public if it isn't actually needed to to stabilize worker_total_busy_duration (and I'm mostly sure that it isn't).

In general, all the #[allow(dead_code)] that are required for this chnage give me the impression that we're exposing something that is only really used in tokio_unstable and so we should find a way to expose it only unstable tokio_unstable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comment!

I think the core issue here is that by exposing worker_total_busy_duration, we are also exposing the "real" WorkerMetrics at tokio/src/runtime/metrics/worker.rs, which in turn will attempt to parse HistogramScale.

Currently at master branch, when no tokio_unstable flag is passed in, the WorkerMetrics at tokio/src/runtime/metrics/mock.rs will be parsed (which is just an empty struct). And if the flag is passed in, the WorkerMetrics at tokio/src/runtime/metrics/worker.rs will be parsed

https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/metrics/mod.rs#L27-L40

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not actually public, it's still behind config_unstable. (Just verified via the autogenerated docs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, now I believe I understand.

I would suggest that instead of compiling with the entire worker metrics in order to access only the busy_duration_total field, we should gate all the fields that we won't be using on cfg_unstable_metrics. Otherwise users will still be paying the price for metrics which they don't have access to - and that is something that we would really like to avoid.

Have a look at the runtime Builder for an example of how we have fields and implementations that touch them gated on a cfg flag:

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,

As a general rule, if you need #[allow(dead_code)] then there is probably something that should be gated on a cfg flag instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hds . I've reverted changes to histogram.rs. Also the real Histogram, HistogramBuilder and HistogramBatch are gated behind unstable flag now, and instead I've created a mocked version of these for compilation.

For WorkerMetrics, as we target to stabilize more metrics, I'd suggest exposing all fields instead of stabilizing only busy_duration_total and putting other fields behind unstable.

Let me know what you think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Owen-CH-Leung the problem with stabilizing all of WorkerMetrics is that when tokio_unstable is not enabled, the runtime will pay the price for all those metrics, but there will be no way to access them. For this reason I think that it would be better to only stabilize what we're exposing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hds . I've made changes to hide most of the fields of WorkerMetrics behind unstable flag, except for busy_duration_total, queue_depth and thread_id. The latter 2 fields are needed in order for set_queue_depth and set_thread_id to work properly.

I've also enriched the mock MetricsBatch to have minimal implementation of batch::MetricsBatch so that the worker_total_busy_duration API can function properly under stable build

Let me know your thoughts!

tokio/src/runtime/metrics/runtime.rs Outdated Show resolved Hide resolved
tokio/src/runtime/scheduler/mod.rs Outdated Show resolved Hide resolved
tokio/src/runtime/scheduler/mod.rs Outdated Show resolved Hide resolved
tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs Outdated Show resolved Hide resolved
@Owen-CH-Leung Owen-CH-Leung changed the title stabilize worker total busy duration, bring WorkerMetrics, MetricsBat… stabilize worker_total_busy_duration Oct 11, 2024
@Owen-CH-Leung Owen-CH-Leung marked this pull request as ready for review October 11, 2024 15:33
Copy link
Contributor

@hds hds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please bare with me as I haven't finished the review, but my first impression is that this PR is making more things public than is strictly necessary for the stabilization of worker_total_busy_duration and we should try to avoid that.

This is especially true since #6897 is also touching the histogram (although not taking it out of tokio_unstable) and that would be a breaking change if this PR is released first.

Comment on lines 38 to 48
/// Whether the histogram used to aggregate a metric uses a linear or
/// logarithmic scale.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub enum HistogramScale {
/// Linear bucket scale
Linear,

/// Logarithmic bucket scale
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to read through this PR in more detail so bare with me, but we shouldn't be making this stable and public if it isn't actually needed to to stabilize worker_total_busy_duration (and I'm mostly sure that it isn't).

In general, all the #[allow(dead_code)] that are required for this chnage give me the impression that we're exposing something that is only really used in tokio_unstable and so we should find a way to expose it only unstable tokio_unstable.

@@ -533,7 +533,7 @@ impl Handle {
self.shared.inject.len()
}

#[allow(dead_code)]
// #[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// #[allow(dead_code)]

@@ -18,7 +18,7 @@ impl Handle {
self.shared.injection_queue_depth()
}

#[allow(dead_code)]
// #[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// #[allow(dead_code)]

.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
cfg_unstable_metrics! {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we grouped all the unstable functions together at the bottom of the impl block, instead of spreading them out.

@@ -15,40 +18,60 @@ use std::thread::ThreadId;
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? Since this isn't a public method, it won't appear in the documentation.

}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that this function duplicates part of the submit function in batch::MetricsBatch?

I think this is a problematic way of gradually stabilizing metrics, as it opens the possibility of having divirging implementations if a change is made to the "real" MetricsBatch by someone who doesn't realise that there is another one.

This is additionally confusing because this effectively becomes the "stable" implementation, but it lives in a module called mock.

I would propose that we instead split the metrics::MetricsBatch implementation into stable (always compiles) and unstable (gated by cfg option), the same way we've done elsewhere in this PR. The same as with another comment, we would group all the unstable functions into a single cfg_unstable_metrics! block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed spliting metrics::MetricsBatch is a much viable way of stabilising. I've adopted your suggestion and split it into stable & unstable (and group unstable functions into a single unstable block. Thanks a lot for reviewing!

Copy link
Contributor

@hds hds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation is looking good now. Just a few organizational things so that we do conditional compilation the same as the rest of the code base.

Comment on lines 130 to 132
worker
.busy_duration_total
.store(self.busy_duration_total, Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move the stablized items up to the top of the function.

Comment on lines 151 to 156
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
#[cfg(tokio_unstable)]
{
self.park_unpark_count += 1;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the rest of the Tokio code base, we do this a different way, so let's stick to that convension. Instead of gating functionality within a function, we have a separate empty function definition when the cfg flag isn't enabled. So this function would become:

cfg_unstable_metrics! {
    /// The worker was unparked.
    pub(crate) fn unparked(&mut self) {
        self.park_unpark_count += 1;
    }
}

cfg_not_unstable_metrics! {
    /// The worker was unparked.
    pub(crate) fn unparked(&mut self) {}
}

Please do the same here. Keep a single cfg_unstable_metrics block (and a single cfg_not_unstable_metrics block) for all the functions that require this behavior, so that they're grouped together.

For the more complex functions above that have a mix of stablized and unstablized implementation, split the unstablized part out into a separate function with an impl in each of the macro blocks (see example in the comment on submit).

.store(self.steal_operations, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);

pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the comment on unparked, let's follow the convension elsewhere in the Tokio codebase where we need different implementations for stablized vs. unstablized functionality. Here that would mean the following:

pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
    worker
        .busy_duration_total
        .store(self.busy_duration_total, Relaxed);

    self.submit_unstable(worker, mean_poll_time);
}

cfg_not_unstable_metrics! {
    #[inline(always)]
    fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {}
}

cfg_unstable_metrics! {
    #[inline(always)]
    fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64){
        worker.mean_poll_time.store(_mean_poll_time, Relaxed);
        worker.park_count.store(self.park_count, Relaxed);
        worker
            .park_unpark_count
            .store(self.park_unpark_count, Relaxed);
        worker.noop_count.store(self.noop_count, Relaxed);
        worker.steal_count.store(self.steal_count, Relaxed);
        worker
            .steal_operations
            .store(self.steal_operations, Relaxed);
        worker.poll_count.store(self.poll_count, Relaxed);

        worker
            .local_schedule_count
            .store(self.local_schedule_count, Relaxed);
        worker.overflow_count.store(self.overflow_count, Relaxed);

        if let Some(poll_timer) = &self.poll_timer {
            let dst = worker.poll_count_histogram.as_ref().unwrap();
            poll_timer.poll_counts.submit(dst);
        }
    }
}

Use the same cfg_unstable_metrics and cfg_not_unstable_metrics blocks as for the other functions that need them (so there is only one of each in impl MetricsBatch.

}

impl MetricsBatch {
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
pub(crate) fn new(_worker_metrics: &WorkerMetrics) -> MetricsBatch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split this into 2 implementations each gated by cfg_(not_)unstable_metrics!.

Comment on lines 38 to 39
mod worker;
pub(crate) use worker::WorkerMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the modules and imports that are in both the macro blocks out above them, we don't need to gate them at all.

.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
cfg_not_unstable_metrics! {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this down to be right above the cfg_unstable_metrics! block so that we keep the conditionally compiled implementations together.

@Owen-CH-Leung
Copy link
Contributor Author

@hds Thanks for your detailed review!

I've revised the PR to gate code based on cfg_(not_)unstable_metrics! macros, and also re-organize code in the mod.rs and worker.rs. Let me know if you have any other feedback.

@Owen-CH-Leung Owen-CH-Leung requested review from hds and rcoh December 30, 2024 17:17
Copy link
Contributor

@rcoh rcoh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks good to me. I think incremental stabilization is inherently annoying to do—I think it might be better if we invested in some primitives. I think we should also endeavor to standardize the way functions are made to be no-ops.

/// Number of tasks that were scheduled locally on this worker.
local_schedule_count: u64,

#[cfg(tokio_unstable)]
/// Number of tasks moved to the global queue to make space in the local
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider reorganizing to put the stable fields on the top to make it clearer

Comment on lines 206 to 220
cfg_not_unstable_metrics! {
/// Start polling an individual task
pub(crate) fn start_poll(&mut self) {}
}

if let Some(poll_timer) = &mut self.poll_timer {
poll_timer.poll_started_at = Instant::now();
cfg_unstable_metrics! {
/// Stop polling an individual task
pub(crate) fn end_poll(&mut self) {
#[cfg(tokio_unstable)]
if let Some(poll_timer) = &mut self.poll_timer {
let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
poll_timer.poll_counts.measure(elapsed, 1);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading through this, I wonder if we should make a macro specifically for this pattern, something like:

cfg_metrics! {
  stable: {
     ...
   },
   unstable: { ... }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are going to have a lot of a/b code when metrics or stable or not, could be helpful to avoid bugs

Comment on lines 241 to 243
#[cfg(tokio_unstable)] {
self.steal_count += _by as u64;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should probably pick a pattern—either cfg-ing the body of the function or the function entirely instead of using both

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rcoh . I've added a new macro and put all the ab code implementation into this macro. I also recorganize fields so that stable fields come first. Please can I have your review again ? Thanks!

Comment on lines 252 to 254
pub(crate) fn incr_steal_count(&mut self, _by: u16) {
self.steal_count += _by as u64;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub(crate) fn incr_steal_count(&mut self, _by: u16) {
self.steal_count += _by as u64;
}
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += by as u64;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks & revised

@@ -15,40 +18,50 @@ use std::thread::ThreadId;
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same—please move stable fields to top

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Moved stable fields to top

Comment on lines 60 to 62
/// Number of tasks currently in the local queue. Used only by the
/// current-thread scheduler.
pub(crate) queue_depth: MetricAtomicUsize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this intentionally stabilized?

Copy link
Contributor Author

@Owen-CH-Leung Owen-CH-Leung Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe queue_depth is already stabilised at master and is used by the current thread scheduler :

https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/scheduler/current_thread/mod.rs#L341

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-metrics Module: tokio/runtime/metrics R-loom-current-thread Run loom current-thread tests on this PR R-loom-multi-thread Run loom multi-thread tests on this PR R-loom-multi-thread-alt Run loom multi-thread alt tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants