Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio 多线程并发的最小单位 #73

Open
xiaoxiaojx opened this issue Jul 19, 2024 · 0 comments
Open

Tokio 多线程并发的最小单位 #73

xiaoxiaojx opened this issue Jul 19, 2024 · 0 comments
Labels

Comments

@xiaoxiaojx
Copy link
Owner

image

在调试 tokio 代码时发现一个有意思的点, 我在 examples/tinyhttp.rs 的代码基础上添加了3个日志打点分别是[UserFuc1]、[UserFuc2]、[UserFuc3], 其打印的 thread_id 竟然各不相同

意味着基于 Rust 🦀️ 实现多线程并发的单位可以细化到函数块, 一个函数能够在多个线程并发接替着执行下去 🤔️。这个特性在其他语言库中比如 C 的 libuv 中也未曾见到

// examples/tinyhttp.rs

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Parse the arguments, bind the TCP socket we'll be listening to, spin up
    // our worker threads, and start shipping sockets to those worker threads.
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let server = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (stream, _) = server.accept().await?;
        tokio::spawn(async move {
            let thread_id = std::thread::current().id();
            println!("[UserFuc1] Thread ID in spawned thread: {:?}", thread_id);

            if let Err(e) = process(stream).await {
                println!("failed to process connection; error = {}", e);
            }
            let thread_id = std::thread::current().id();
            println!("[UserFuc2] Thread ID in spawned thread: {:?}", thread_id);

            test_fn().await;
            let thread_id = std::thread::current().id();
            println!("[UserFuc3] Thread ID in spawned thread: {:?}", thread_id);
        });
    }
}

还好之前有阅读过 Rust 的异步编程文档, 其中就有提到 async transforms a block of code into a state machine that implements a trait called Future

这表明 async 函数在 Rust 中类似于语法糖, 就好比在 JavaScript 中当你写了 async 函数又想兼容低版本浏览器环境, 通常会使用 babel 在其打包阶段编译为 es5 的代码

// Source.js

async function spawn_cb() {
  console.log(1)
  
  await process()
  
  console.log(2)
}

如下编译后的代码 case 0, 3, 4, end 也就相当于是状态机中的 4种函数运行状态了, _context.next 就记录了下一个步骤的状态

// Transpiled.js

function spawn_cb() {
  return _spawn_cb.apply(this, arguments);
}

function _spawn_cb() {
  _spawn_cb = _asyncToGenerator(
  /*#__PURE__*/
  regeneratorRuntime.mark(function _callee() {
    return regeneratorRuntime.wrap(function _callee$(_context) {
      while (1) {
        switch (_context.prev = _context.next) {
          case 0:
            console.log(1);
            _context.next = 3;
            return process();

          case 3:
            console.log(2);

          case 4:
          case "end":
            return _context.stop();
        }
      }
    }, _callee);
  }));
  return _spawn_cb.apply(this, arguments);
}

回过头来说为什么一个函数能够在多个线程并发接替着执行下去不太好实现, 其原因就是当函数在线程1运行到了第几行代码, 其他线程 2,3,4 又怎会知道何谈接着往下运行了

这里最大的阻碍是函数运行的状态与上下文变量需要记录下来, 而 Rust 会把 async 函数编译为状态机代码, 似乎一下就正好解决了这个问题

最后让 ChatGPT 试着打印 Rust 把 async 函数编译为状态机的代码, 此时就比较清楚了

编译后的代码有了记录当前运行的状态 State 与上下文变量 MyFuture, 当不同线程调用 poll 时, 如果在线程 1运行到了什么阶段然后通过 state 来记录下来, 其他线程调用 poll 时就能顺着最新 state 继续往下执行了~

// examples/tinyhttp.rs

enum State {
    Start,
    AfterProcess,
    AfterTestFn,
    Done,
}

struct MyFuture {
    state: State,
    stream: StreamType, // 假设 process 函数的参数类型
    process_future: Option<ProcessFuture>, // 假设 process 函数返回的 Future 类型
    test_fn_future: Option<TestFnFuture>, // 假设 test_fn 函数返回的 Future 类型
}

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match self.state {
                State::Start => {
                    let thread_id = std::thread::current().id();
                    println!("[UserFuc1] Thread ID in spawned thread: {:?}", thread_id);

                    self.process_future = Some(process(self.stream));
                    self.state = State::AfterProcess;
                }
                State::AfterProcess => {
                    if let Some(fut) = &mut self.process_future {
                        match Pin::new(fut).poll(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(Err(e)) => {
                                println!("failed to process connection; error = {}", e);
                                self.process_future = None;
                            }
                            Poll::Ready(Ok(_)) => {
                                self.process_future = None;
                            }
                        }
                    }

                    let thread_id = std::thread::current().id();
                    println!("[UserFuc2] Thread ID in spawned thread: {:?}", thread_id);

                    self.test_fn_future = Some(test_fn());
                    self.state = State::AfterTestFn;
                }
                State::AfterTestFn => {
                    if let Some(fut) = &mut self.test_fn_future {
                        match Pin::new(fut).poll(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(_) => {
                                self.test_fn_future = None;
                            }
                        }
                    }

                    let thread_id = std::thread::current().id();
                    println!("[UserFuc3] Thread ID in spawned thread: {:?}", thread_id);

                    self.state = State::Done;
                }
                State::Done => {
                    return Poll::Ready(());
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant