Skip to content

Commit

Permalink
channel: fix send issue in state change API (#516)
Browse files Browse the repository at this point in the history
async/await doesn't work well with temporary variables, this PR changes
it to `impl Future` to get around Send requirement.

Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay authored Mar 10, 2021
1 parent f9974a8 commit c640299
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
6 changes: 3 additions & 3 deletions src/call/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ impl From<i32> for RpcStatusCode {
}
}

impl Into<i32> for RpcStatusCode {
fn into(self) -> i32 {
self.0
impl From<RpcStatusCode> for i32 {
fn from(code: RpcStatusCode) -> i32 {
code.0
}
}

Expand Down
33 changes: 17 additions & 16 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, i32, ptr};
Expand Down Expand Up @@ -601,30 +602,30 @@ impl Channel {
///
/// `check_connectivity_state` needs to be called to get the current state. Returns false
/// means deadline excceeds before observing any state changes.
pub async fn wait_for_state_change(
pub fn wait_for_state_change(
&self,
last_observed: ConnectivityState,
deadline: impl Into<Deadline>,
) -> bool {
) -> impl Future<Output = bool> {
let (cq_f, prom) = CallTag::action_pair();
let prom_box = Box::new(prom);
let tag = Box::into_raw(prom_box);
let cq_ref = match self.cq.borrow() {
Ok(r) => r,
let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
unsafe {
grpcio_sys::grpc_channel_watch_connectivity_state(
self.inner.channel,
last_observed,
deadline.into().spec(),
cq_ref.as_ptr(),
tag as *mut _,
)
}
true
} else {
// It's already shutdown.
Err(_) => return false,
false
};

unsafe {
grpcio_sys::grpc_channel_watch_connectivity_state(
self.inner.channel,
last_observed,
deadline.into().spec(),
cq_ref.as_ptr(),
tag as *mut _,
)
}
cq_f.await.unwrap()
async move { should_wait && cq_f.await.unwrap() }
}

/// Wait for this channel to be connected.
Expand Down
5 changes: 4 additions & 1 deletion tests-and-examples/tests/cases/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ fn test_connectivity() {
ch.check_connectivity_state(false),
ConnectivityState::GRPC_CHANNEL_READY
);
let client = GreeterClient::new(ch);
let client = GreeterClient::new(ch.clone());
let req = HelloRequest::default();
let resp = client.say_hello(&req).unwrap();
assert!(!resp.get_message().is_empty());
client.spawn(async move {
ch.wait_for_connected(Duration::from_secs(3)).await;
});
}

0 comments on commit c640299

Please sign in to comment.