Skip to content

Commit

Permalink
refact(loadbalance): update ld api design
Browse files Browse the repository at this point in the history
  • Loading branch information
yang20150702 committed Jun 20, 2024
1 parent 929e6b0 commit 67addec
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
67 changes: 47 additions & 20 deletions dubbo/src/loadbalancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

pub mod random;

use crate::{invocation::Metadata, StdError};
use futures_core::future::BoxFuture;
use tower::{balance::p2c::Balance, discover::ServiceList, load::Constant, ServiceExt};
use std::error::Error;
use tokio::time::Duration;
use tower::{discover::ServiceList, ServiceExt};
use tower_service::Service;
use tracing::debug;

use crate::{
codegen::RpcInvocation,
invocation::Metadata,
invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
loadbalancer::random::RandomLoadBalancer,
param::Param,
protocol::triple::triple_invoker::TripleInvoker,
svc::NewService,
StdError,
};

use crate::protocol::triple::triple_invoker::TripleInvoker;

pub struct NewLoadBalancer<N> {
inner: N,
}
Expand Down Expand Up @@ -110,11 +113,10 @@ where
// let service_list = ServiceList::new(service_list);

// let p2c = tower::balance::p2c::Balance::new(service_list);
let p = P2cBalanceer::default();
let ivk: Balance<
ServiceList<Vec<Constant<CloneInvoker<TripleInvoker>, i32>>>,
http::Request<CloneBody>,
> = p.select_invokers(routes, metadata);
// let p: Box<dyn LoadBalancer<Invoker = BoxService<http::Request<CloneBody>, http::Response<UnsyncBoxBody<bytes::Bytes, status::Status>>, Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync> = get_loadbalancer("p2c").into();
let p = get_loadbalancer("p2c");
// let ivk = p.select_invokers(invokers, metadata);
let ivk = p.select_invokers(routes, metadata);

ivk.oneshot(req).await
};
Expand All @@ -123,27 +125,43 @@ where
}
}

type DubboBoxService = tower::util::BoxService<
http::Request<CloneBody>,
http::Response<crate::BoxBody>,
Box<dyn Error + Send + Sync>,
>;

pub trait LoadBalancer {
type Invoker;

fn select_invokers(
self,
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
metadata: Metadata,
) -> Self::Invoker;
}

fn get_loadbalancer(
loadbalancer: &str,
) -> Box<dyn LoadBalancer<Invoker = DubboBoxService> + Send + Sync + 'static> {
match loadbalancer {
"random" => {
println!("random!");
Box::new(RandomLoadBalancer::default())
}
"p2c" => Box::new(P2cBalancer::default()),
_ => Box::new(P2cBalancer::default()),
}
}
const DEFAULT_RTT: Duration = Duration::from_millis(30);
#[derive(Debug, Default)]
pub struct P2cBalanceer {}
pub struct P2cBalancer {}

impl LoadBalancer for P2cBalanceer {
type Invoker = Balance<
ServiceList<Vec<Constant<CloneInvoker<TripleInvoker>, i32>>>,
http::Request<CloneBody>,
>;
impl LoadBalancer for P2cBalancer {
type Invoker = DubboBoxService;

fn select_invokers(
self,
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
_metadata: Metadata,
) -> Self::Invoker {
Expand All @@ -152,9 +170,18 @@ impl LoadBalancer for P2cBalanceer {
.into_iter()
.map(|invoker| tower::load::Constant::new(invoker, 1))
.collect();
let service_list = ServiceList::new(service_list);

let p = tower::balance::p2c::Balance::new(service_list);
p
let decay = Duration::from_secs(10);
let service_list = ServiceList::new(service_list);
let s = tower::load::PeakEwmaDiscover::new(
service_list,
DEFAULT_RTT,
decay,
tower::load::CompleteOnResponse::default(),
);

let p = tower::balance::p2c::Balance::new(s);
let svc = DubboBoxService::new(p);
svc
}
}
15 changes: 10 additions & 5 deletions dubbo/src/loadbalancer/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use rand::prelude::SliceRandom;
use tracing::debug;

use super::LoadBalancer;
use super::{DubboBoxService, LoadBalancer};
use crate::{
invocation::Metadata, loadbalancer::CloneInvoker,
protocol::triple::triple_invoker::TripleInvoker,
Expand All @@ -28,10 +28,15 @@ use crate::{
pub struct RandomLoadBalancer {}

impl LoadBalancer for RandomLoadBalancer {
type Invoker = CloneInvoker<TripleInvoker>;
type Invoker = DubboBoxService;

fn select_invokers(self, invokers: Vec<Self::Invoker>, _metadata: Metadata) -> Self::Invoker {
debug!("random loadbalance");
invokers.choose(&mut rand::thread_rng()).unwrap().clone()
fn select_invokers(
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
metadata: Metadata,
) -> Self::Invoker {
debug!("random loadbalance {:?}", metadata);
let ivk = invokers.choose(&mut rand::thread_rng()).unwrap().clone();
DubboBoxService::new(ivk)
}
}

0 comments on commit 67addec

Please sign in to comment.