Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fcos-policy-engine: fetch graph from fcos-graph-builder instance #21

Merged
merged 1 commit into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fcos-policy-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ reqwest = { version = "^0.10.1", features = ["json"] }
serde = "^1.0.70"
serde_derive = "^1.0.70"
serde_json = "^1.0.22"
serde_qs = "0.6.1"
structopt = "^0.3.7"
14 changes: 7 additions & 7 deletions fcos-policy-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ extern crate log;
#[macro_use]
extern crate prometheus;

mod utils;

use actix_web::{web, App, HttpResponse};
use commons::{graph, metrics, policy};
use commons::{metrics, policy};
use failure::{Error, Fallible};
use prometheus::{Histogram, IntCounter, IntGauge};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -83,7 +85,7 @@ pub(crate) struct AppState {
population: Arc<cbloom::Filter>,
}

#[derive(Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct GraphQuery {
basearch: Option<String>,
stream: Option<String>,
Expand All @@ -108,11 +110,9 @@ pub(crate) async fn pe_serve_graph(
let wariness = compute_wariness(&query);
ROLLOUT_WARINESS.observe(wariness);

// TODO (zonggen): remove hard-coded empty graph and use the graph fetched from fcos-graph-builder
let cached_graph = graph::Graph::default();
let cached_graph = utils::fetch_graph_from_gb(stream.clone(), basearch.clone()).await?;

let arch_graph = policy::pick_basearch(cached_graph, basearch)?;
let throttled_graph = policy::throttle_rollouts(arch_graph, wariness);
let throttled_graph = policy::throttle_rollouts(cached_graph, wariness);
let final_graph = policy::filter_deadends(throttled_graph);

let json =
Expand Down
51 changes: 51 additions & 0 deletions fcos-policy-engine/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use actix::prelude::*;
use commons::graph;
use failure::{bail, Error, Fallible, SyncFailure};
use reqwest::Method;
use std::time::Duration;

/// Default timeout for HTTP requests (30 minutes).
const DEFAULT_HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Default address of fcos-graph-builder, which is the same as fcos-policy-builer
const DEFAULT_GB_ADDR: &str = "http://127.0.0.1:5050/v1/graph";
lucab marked this conversation as resolved.
Show resolved Hide resolved

/// Return a request builder with base URL and parameters set.
fn new_request(method: reqwest::Method, url: reqwest::Url) -> Fallible<reqwest::RequestBuilder> {
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REQ_TIMEOUT)
.build()?;
let builder = client.request(method, url);
Ok(builder)
}

/// Fetch the graph from the fcos-graph-builder instance with the query specified.
pub(crate) fn fetch_graph_from_gb(
stream: String,
basearch: String,
) -> impl Future<Output = Result<graph::Graph, Error>> {
async move {
if stream.trim().is_empty() {
bail!("unexpected missing stream");
}
if basearch.trim().is_empty() {
bail!("unexpected missing basearch");
}
let query = crate::GraphQuery {
stream: Some(stream),
basearch: Some(basearch),
rollout_wariness: None,
node_uuid: None,
};
// Cannot use `?` directly here otherwise will produce the error:
// the trait `std::marker::Sync` is not implemented for `(dyn std::error::Error + std::marker::Send + 'static)`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because errors coming from serde_qs are not Sync (due to their use of error-chain).
Can you please open a bug to the upstream repo and ask them to consider using a better library for error-handling (i.e. thiserror or snafu)?

// Reference: https://github.com/rust-lang-nursery/failure/issues/284
let query_str = serde_qs::to_string(&query).map_err(SyncFailure::new)?;
let mut target = reqwest::Url::parse(DEFAULT_GB_ADDR)?;
target.set_query(Some(&query_str));
let req = new_request(Method::GET, target)?;
let resp = req.send().await?;
let content = resp.error_for_status()?;
let json = content.json::<graph::Graph>().await?;
Ok(json)
}
}