Skip to content

Commit

Permalink
Merge pull request #14 from faradayio/tokio_0_2
Browse files Browse the repository at this point in the history
BREAKING: Update to `tokio` 0.2
  • Loading branch information
emk authored Jan 14, 2020
2 parents 65e1c27 + 59a3fa0 commit f398483
Show file tree
Hide file tree
Showing 16 changed files with 140 additions and 168 deletions.
7 changes: 4 additions & 3 deletions bigml-parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ cli_test_dir = "0.1.5"

[dependencies]
bigml = { version = "=0.4.4", path = "../bigml" }
bytes = "0.4.12"
bytes = "0.5.3"
common_failures = "0.1.1"
# This makes the executable bigger, but it makes --help much nicer.
clap = { version = "2", features = ["wrap_help"] }
env_logger = "0.7"
failure = "0.1.5"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
futures = "0.3.1"
log = "0.4"
serde = { version = "1" }
serde_json = "1.0"
# This is pretty heavyweight, but it's easy to set up and nice for users.
structopt = "0.3.4"
tokio = "0.1.22"
tokio = { version = "0.2.6", features = ["fs", "io-std"] }
tokio-util = { version = "0.2.0", features = ["codec"] }
4 changes: 2 additions & 2 deletions bigml-parallel/src/line_delimited_json_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::{BufMut, BytesMut};
use failure::Error;
use serde::Serialize;
use std::marker::PhantomData;
use tokio::codec::Encoder;
use tokio_util::codec::Encoder;

/// A [`tokio::codec::Encoder`] that outputs a [line-delimited JSON
/// stream][json]. This can be used to output a `Stream` of values implementing
Expand All @@ -31,7 +31,7 @@ impl<T: Serialize> Encoder for LineDelimitedJsonCodec<T> {
fn encode(&mut self, item: T, buf: &mut BytesMut) -> Result<(), Error> {
let json = serde_json::to_vec(&item)?;
buf.reserve(json.len() + 1);
buf.put(json);
buf.put(&json[..]);
buf.put_u8(b'\n');
Ok(())
}
Expand Down
46 changes: 21 additions & 25 deletions bigml-parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ use bigml::{
use common_failures::{quick_main, Result};
use env_logger;
use failure::{Error, ResultExt};
use futures::{compat::Future01CompatExt, Future, FutureExt, TryFutureExt};
use futures::{self, stream, FutureExt, StreamExt, TryStreamExt};
use log::debug;
use std::{env, pin::Pin, sync::Arc};
use std::{env, sync::Arc};
use structopt::StructOpt;
use tokio::{
codec::{FramedRead, FramedWrite, LinesCodec},
io,
prelude::{stream, Stream},
runtime::Runtime,
};
use tokio::{io, runtime::Runtime};
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};

mod execution_input;
mod line_delimited_json_codec;
Expand All @@ -25,10 +21,10 @@ use execution_input::ExecutionInput;
use line_delimited_json_codec::LineDelimitedJsonCodec;

/// Our standard stream type, containing values of type `T`.
type BoxStream<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>;
type BoxStream<T> = futures::stream::BoxStream<'static, Result<T>>;

/// Our standard future type, yield a value of type `T`.
type BoxFuture<T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'static>>;
type BoxFuture<T> = futures::future::BoxFuture<'static, Result<T>>;

/// Our command-line arguments.
#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -88,7 +84,7 @@ fn run() -> Result<()> {
// Create a future for our async code, and pass it to an async runtime.
let fut = run_async(opt);
let mut runtime = Runtime::new().expect("Unable to create a runtime");
runtime.block_on(fut.boxed().compat())?;
runtime.block_on(fut.boxed())?;
Ok(())
}

Expand All @@ -100,11 +96,11 @@ async fn run_async(opt: Opt) -> Result<()> {
let resources: BoxStream<String> = if !opt.resources.is_empty() {
// Turn our `--resource` arguments into a stream.
let resources = opt.resources.clone();
Box::new(stream::iter_ok(resources.into_iter()))
stream::iter(resources.into_iter().map(Ok)).boxed()
} else {
// Parse standard input as a stream of dataset IDs.
let lines = FramedRead::new(io::stdin(), LinesCodec::new());
Box::new(lines.map_err(|e| -> Error { e.into() }))
lines.map_err(|e| -> Error { e.into() }).boxed()
};

// Wrap our command line arguments in a thread-safe reference counter, so
Expand All @@ -114,30 +110,30 @@ async fn run_async(opt: Opt) -> Result<()> {
// Transform our stream of IDs into a stream of _futures_, each of which will
// return an `Execution` object from BigML.
let opt2 = opt.clone();
let execution_futures: BoxStream<BoxFuture<Execution>> =
Box::new(resources.map(move |resource| {
let execution_futures: BoxStream<BoxFuture<Execution>> = resources
.map_ok(move |resource| {
resource_id_to_execution(opt2.clone(), resource).boxed()
}));
})
.boxed();

// Now turn the stream of futures into a stream of executions, using
// `buffer_unordered` to execute up to `opt.max_tasks` in parallel. This is
// basically the "payoff" for all the async code up above, and it is
// wonderful.
let executions: BoxStream<Execution> = Box::new(
execution_futures
// Convert back to legacy `tokio::Future` for `buffered_unordered`.
.map(|fut| fut.compat())
// TODO: This has weird buffering behavior, and appears to wait
// until it buffers `opt.max_tasks` items.
.buffer_unordered(opt.max_tasks),
);
//
// TODO: In tokio 0.1, this had weird buffering behavior, and
// appeared to wait until it buffered `opt.max_tasks` items. I have
// not verified this in tokio 0.2.
let executions: BoxStream<Execution> = execution_futures
.try_buffer_unordered(opt.max_tasks)
.boxed();

// Copy our stream of `Execution`s to standard output as line-delimited
// JSON.
//
// TODO: `forward` may also have weird buffering behavior.
let stdout = FramedWrite::new(io::stdout(), LineDelimitedJsonCodec::new());
let (_executions, _stdout) = executions.forward(stdout).compat().await?;
executions.forward(stdout).await?;
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions bigml-parallel/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bigml::{
};
use cli_test_dir::*;
use common_failures::prelude::*;
use futures::{FutureExt, TryFutureExt};
use futures::FutureExt;
use serde_json;
use std::{env, future::Future, io::Write};
use tokio::runtime::Runtime;
Expand All @@ -26,7 +26,7 @@ where
T: Send + 'static,
{
let mut runtime = Runtime::new().expect("Unable to create a runtime");
runtime.block_on(fut.boxed().compat())
runtime.block_on(fut.boxed())
}

#[test]
Expand Down
13 changes: 6 additions & 7 deletions bigml/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ env_logger = "0.7"

[dependencies]
bigml_derive = { version = "0.2.2", path = "../bigml_derive" }
bytes = "0.4.12"
bytes = "0.5.3"
chrono = { version = "0.4", features = ["serde"] }
failure = "0.1.1"
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
futures = "0.3.1"
lazy_static = "1.3"
log = "0.4"
mime = "0.3"
mpart-async = "0.2.1"
reqwest = "0.9"
reqwest = { version = "0.10", features = ["json", "stream"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tokio = "0.1.22"
tokio-timer = "0.1"
url = "1.2"
tokio = { version = "0.2.6", features = ["fs", "macros"] }
tokio-util = { version = "0.2.0", features = ["codec"] }
url = "2.1"
22 changes: 12 additions & 10 deletions bigml/examples/create_execution.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use bigml;
use env_logger;
use failure;
use futures::executor::block_on;
#[macro_use]
extern crate log;

use bigml::resource;
use failure::ResultExt;
use futures::{FutureExt, TryFutureExt};
use std::env;
use std::io::{self, Write};
use std::process;
use std::result;
use std::str::FromStr;
use tokio::prelude::*;
use failure::{Error, ResultExt};
use futures::FutureExt;
use std::{
env,
io::{self, Write},
process, result,
str::FromStr,
};

type Result<T> = result::Result<T, failure::Error>;
/// A custom `Result`, for convenience.
pub type Result<T, E = Error> = result::Result<T, E>;

/// A local helper function which does the real work, and which can return
/// an error (unlike `main`).
Expand Down Expand Up @@ -43,7 +45,7 @@ fn helper(
}

// Execute the script, wait for it to complete, and print the result.
let execution = client.create_and_wait(&args).boxed().compat().wait()?;
let execution = block_on(client.create_and_wait(&args).boxed())?;
println!("{:#?}", execution);

Ok(())
Expand Down
18 changes: 4 additions & 14 deletions bigml/examples/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
// `create_source_from_path`.
#![allow(deprecated)]

use bigml;
use bigml::{self, resource::Resource};
use env_logger;

use bigml::resource::Resource;
use futures::{FutureExt, TryFutureExt};
use futures::{executor::block_on, FutureExt};
use std::env;
use std::io::{self, Write};
use std::path::Path;
use std::process;
use tokio::prelude::*;

fn main() {
env_logger::init();
Expand All @@ -31,17 +29,9 @@ fn main() {

let client = bigml::Client::new(bigml_username, bigml_api_key)
.expect("can't create bigml::Client");
let initial_response = client
.create_source_from_path(&path)
.boxed()
.compat()
.wait()
let initial_response = block_on(client.create_source_from_path(path).boxed())
.expect("can't create source");
let response = client
.wait(initial_response.id())
.boxed()
.compat()
.wait()
let response = block_on(client.wait(initial_response.id()).boxed())
.expect("error waiting for resource");

println!("{:#?}", &response);
Expand Down
Loading

0 comments on commit f398483

Please sign in to comment.