Skip to content

Commit

Permalink
fcos-policy-engine: fetch graph from fcos-graph-builder instance
Browse files Browse the repository at this point in the history
Removes previous empty dummy graph returned by fcos-policy-engine and
fetches the graph from fcos-graph-builder instance, which is running
under the same pod as fcos-policy-engine.

Signed-off-by: Allen Bai <[email protected]>
  • Loading branch information
Allen Bai committed Jul 21, 2020
1 parent de7f3e3 commit a43cb89
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 7 deletions.
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fcos-policy-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ reqwest = { version = "^0.10.1", features = ["json"] }
serde = "^1.0.70"
serde_derive = "^1.0.70"
serde_json = "^1.0.22"
serde_qs = "0.6.1"
structopt = "^0.3.7"
14 changes: 7 additions & 7 deletions fcos-policy-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ extern crate log;
#[macro_use]
extern crate prometheus;

mod utils;

use actix_web::{web, App, HttpResponse};
use commons::{graph, metrics, policy};
use commons::{metrics, policy};
use failure::{Error, Fallible};
use prometheus::{Histogram, IntCounter, IntGauge};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -83,7 +85,7 @@ pub(crate) struct AppState {
population: Arc<cbloom::Filter>,
}

#[derive(Deserialize)]
#[derive(Serialize, Deserialize)]
pub struct GraphQuery {
basearch: Option<String>,
stream: Option<String>,
Expand All @@ -108,11 +110,9 @@ pub(crate) async fn pe_serve_graph(
let wariness = compute_wariness(&query);
ROLLOUT_WARINESS.observe(wariness);

// TODO (zonggen): remove hard-coded empty graph and use the graph fetched from fcos-graph-builder
let cached_graph = graph::Graph::default();
let cached_graph = utils::fetch_graph_from_gb(stream.clone(), basearch.clone()).await?;

let arch_graph = policy::pick_basearch(cached_graph, basearch)?;
let throttled_graph = policy::throttle_rollouts(arch_graph, wariness);
let throttled_graph = policy::throttle_rollouts(cached_graph, wariness);
let final_graph = policy::filter_deadends(throttled_graph);

let json =
Expand Down
51 changes: 51 additions & 0 deletions fcos-policy-engine/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use actix::prelude::*;
use commons::graph;
use failure::{bail, Error, Fallible, SyncFailure};
use reqwest::Method;
use std::time::Duration;

/// Default timeout for HTTP requests (30 minutes).
const DEFAULT_HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Default address of fcos-graph-builder, which is the same as fcos-policy-builer
const DEFAULT_GB_ADDR: &str = "http://127.0.0.1:5050/v1/graph";

/// Return a request builder with base URL and parameters set.
fn new_request(method: reqwest::Method, url: reqwest::Url) -> Fallible<reqwest::RequestBuilder> {
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REQ_TIMEOUT)
.build()?;
let builder = client.request(method, url);
Ok(builder)
}

/// Fetch the graph from the fcos-graph-builder instance with the query specified.
pub(crate) fn fetch_graph_from_gb(
stream: String,
basearch: String,
) -> impl Future<Output = Result<graph::Graph, Error>> {
async move {
if stream.trim().is_empty() {
bail!("unexpected missing stream");
}
if basearch.trim().is_empty() {
bail!("unexpected missing basearch");
}
let query = crate::GraphQuery {
stream: Some(stream),
basearch: Some(basearch),
rollout_wariness: None,
node_uuid: None,
};
// Cannot use `?` directly here otherwise will produce the error:
// the trait `std::marker::Sync` is not implemented for `(dyn std::error::Error + std::marker::Send + 'static)`
// Reference: https://github.com/rust-lang-nursery/failure/issues/284
let query_str = serde_qs::to_string(&query).map_err(SyncFailure::new)?;
let mut target = reqwest::Url::parse(DEFAULT_GB_ADDR)?;
target.set_query(Some(&query_str));
let req = new_request(Method::GET, target)?;
let resp = req.send().await?;
let content = resp.error_for_status()?;
let json = content.json::<graph::Graph>().await?;
Ok(json)
}
}

0 comments on commit a43cb89

Please sign in to comment.