Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
onewe committed May 10, 2024
1 parent 1064a1d commit 33ce8db
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 84 deletions.
118 changes: 64 additions & 54 deletions dubbo/src/extension/invoker_extension.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,87 @@
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use crate::{
extension::{
invoker_extension::proxy::InvokerProxy, Extension, ExtensionFactories, ExtensionMetaInfo,
LoadExtensionPromise,
},
params::extension_param::{ExtensionName, ExtensionType},
url::UrlParam,
StdError, Url,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use crate::{StdError, Url};
use crate::extension::{Extension, ExtensionFactories, ExtensionMetaInfo, LoadExtensionPromise};
use crate::extension::invoker_extension::proxy::InvokerProxy;
use crate::params::extension_param::{ExtensionName, ExtensionType};
use crate::url::UrlParam;

use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};

#[async_trait]
pub trait Invoker {

async fn invoke(&self, invocation: GrpcInvocation) -> Result<Pin<Box<dyn Stream<Item= Bytes> + Send + 'static>>, StdError>;
async fn invoke(
&self,
invocation: GrpcInvocation,
) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError>;

async fn url(&self) -> Result<Url, StdError>;

}

pub enum CallType {
Unary,
ClientStream,
ServerStream,
BiStream
BiStream,
}

pub struct GrpcInvocation {
service_name: String,
method_name: String,
arguments: Vec<Argument>,
attachments: HashMap<String, String>,
call_type: CallType
call_type: CallType,
}

pub struct Argument {
name: String,
value: Box<dyn Stream<Item = Box<dyn Serializable + Send + 'static>> + Send + 'static>
value: Box<dyn Stream<Item = Box<dyn Serializable + Send + 'static>> + Send + 'static>,
}


pub trait Serializable {
fn serialize(&self, serialization_type: String) -> Result<Bytes, StdError>;
}


pub trait Deserializable {
fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result<Self, StdError> where Self: Sized;
fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result<Self, StdError>
where
Self: Sized;
}

pub mod proxy {
use std::pin::Pin;
use crate::{
extension::invoker_extension::{GrpcInvocation, Invoker},
StdError, Url,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use crate::extension::invoker_extension::{GrpcInvocation, Invoker};
use crate::{StdError, Url};
use std::pin::Pin;
use tokio::sync::{mpsc::Sender, oneshot};

pub(super) enum InvokerOpt {
Invoke(GrpcInvocation, oneshot::Sender<Result<Pin<Box<dyn Stream<Item= Bytes> + Send + 'static>>, StdError>>),
Url(oneshot::Sender<Result<Url, StdError>>)
Invoke(
GrpcInvocation,
oneshot::Sender<Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError>>,
),
Url(oneshot::Sender<Result<Url, StdError>>),
}

#[derive(Clone)]
pub struct InvokerProxy {
tx: Sender<InvokerOpt>
tx: Sender<InvokerOpt>,
}

#[async_trait]
impl Invoker for InvokerProxy {
async fn invoke(&self, invocation: GrpcInvocation) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError> {
async fn invoke(
&self,
invocation: GrpcInvocation,
) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(InvokerOpt::Invoke(invocation, tx));
let ret = rx.await?;
Expand All @@ -97,33 +105,28 @@ pub mod proxy {
InvokerOpt::Invoke(invocation, tx) => {
let result = invoker.invoke(invocation).await;
let _ = tx.send(result);
},
}
InvokerOpt::Url(tx) => {
let _ = tx.send(invoker.url().await);
}
}
}
});
InvokerProxy {
tx
}
InvokerProxy { tx }
}
}
}


#[derive(Default)]
pub(super) struct InvokerExtensionLoader {
factories: HashMap<String, InvokerExtensionFactory>
factories: HashMap<String, InvokerExtensionFactory>,
}

impl InvokerExtensionLoader {

pub fn register(&mut self, extension_name: String, factory: InvokerExtensionFactory) {
self.factories.insert(extension_name, factory);
}


pub fn remove(&mut self, extension_name: String) {
self.factories.remove(&extension_name);
}
Expand All @@ -133,63 +136,68 @@ impl InvokerExtensionLoader {
let extension_name = extension_name.value();
let factory = self.factories.get_mut(&extension_name).unwrap();
factory.create(url)

}
}



type InvokerExtensionConstructor = fn(Url) -> Pin<Box<dyn Future<Output=Result<Box<dyn Invoker + Send + 'static>, StdError>> + Send + 'static>>;
type InvokerExtensionConstructor = fn(
Url,
) -> Pin<
Box<dyn Future<Output = Result<Box<dyn Invoker + Send + 'static>, StdError>> + Send + 'static>,
>;
pub(crate) struct InvokerExtensionFactory {
constructor: InvokerExtensionConstructor,
instances: HashMap<String, LoadExtensionPromise<InvokerProxy>>
instances: HashMap<String, LoadExtensionPromise<InvokerProxy>>,
}


impl InvokerExtensionFactory {
pub fn new(constructor: InvokerExtensionConstructor) -> Self {

Self {
constructor,
instances: HashMap::default()
instances: HashMap::default(),
}
}
}


impl InvokerExtensionFactory {

pub fn create(&mut self, url: Url) -> Result<LoadExtensionPromise<InvokerProxy>, StdError> {
let key = url.to_string();

match self.instances.get(&key) {
Some(instance) => Ok(instance.clone()),
None => {
let constructor = self.constructor;
let creator = move |url: Url| {
let creator = move |url: Url| {
let invoker_future = constructor(url);
Box::pin(async move {
let invoker = invoker_future.await?;
Ok(InvokerProxy::from(invoker))
}) as Pin<Box<dyn Future<Output=Result<InvokerProxy, StdError>> + Send + 'static>>
})
as Pin<
Box<
dyn Future<Output = Result<InvokerProxy, StdError>>
+ Send
+ 'static,
>,
>
};

let promise: LoadExtensionPromise<InvokerProxy> = LoadExtensionPromise::new(Box::new(creator), url);
let promise: LoadExtensionPromise<InvokerProxy> =
LoadExtensionPromise::new(Box::new(creator), url);
self.instances.insert(key, promise.clone());
Ok(promise)
}
}
}
}


pub struct InvokerExtension<T>(PhantomData<T>) where T: Invoker + Send + 'static;

pub struct InvokerExtension<T>(PhantomData<T>)
where
T: Invoker + Send + 'static;

impl<T> ExtensionMetaInfo for InvokerExtension<T>
where
T: Invoker + Send + 'static,
T: Extension<Target= Box<dyn Invoker + Send + 'static>>
T: Extension<Target = Box<dyn Invoker + Send + 'static>>,
{
fn name() -> String {
T::name()
Expand All @@ -200,6 +208,8 @@ where
}

fn extension_factory() -> ExtensionFactories {
ExtensionFactories::InvokerExtensionFactory(InvokerExtensionFactory::new(<T as Extension>::create))
ExtensionFactories::InvokerExtensionFactory(InvokerExtensionFactory::new(
<T as Extension>::create,
))
}
}
31 changes: 13 additions & 18 deletions dubbo/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

pub mod registry_extension;
mod invoker_extension;
pub mod registry_extension;

use crate::{
extension::registry_extension::proxy::RegistryProxy,
extension::{
invoker_extension::proxy::InvokerProxy,
registry_extension::{proxy::RegistryProxy, RegistryExtension},
},
logger::tracing::{error, info},
params::extension_param::ExtensionType,
registry::registry::StaticRegistry,
Expand All @@ -29,8 +32,6 @@ use crate::{
use std::{future::Future, pin::Pin, sync::Arc};
use thiserror::Error;
use tokio::sync::{oneshot, RwLock};
use crate::extension::invoker_extension::proxy::InvokerProxy;
use crate::extension::registry_extension::RegistryExtension;

pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
Expand Down Expand Up @@ -96,20 +97,16 @@ impl ExtensionDirectory {
self.registry_extension_loader
.register(extension_name, registry_extension_factory);
Ok(())
},
_ => {
Ok(())
}
_ => Ok(()),
},
ExtensionType::Invoker => match extension_factories {
ExtensionFactories::InvokerExtensionFactory(invoker_extension_factory) => {
self.invoker_extension_loader
.register(extension_name, invoker_extension_factory);
Ok(())
},
_ => {
Ok(())
}
_ => Ok(()),
},
}
}
Expand All @@ -123,7 +120,7 @@ impl ExtensionDirectory {
ExtensionType::Registry => {
self.registry_extension_loader.remove(extension_name);
Ok(())
},
}
ExtensionType::Invoker => {
self.invoker_extension_loader.remove(extension_name);
Ok(())
Expand Down Expand Up @@ -160,24 +157,24 @@ impl ExtensionDirectory {
let _ = callback.send(Err(err));
}
}
},
}
ExtensionType::Invoker => {
let extension = self.invoker_extension_loader.load(url);
match extension {
Ok(mut extension) => {
tokio::spawn(async move {
let invoker = extension.resolve().await;
let invoker = extension.resolve().await;
match invoker {
Ok(invoker) => {
let _ = callback.send(Ok(Extensions::Invoker(invoker)));
},
}
Err(err) => {
error!("load invoker extension failed: {}", err);
let _ = callback.send(Err(err));
}
}
});
},
}
Err(err) => {
error!("load invoker extension failed: {}", err);
let _ = callback.send(Err(err));
Expand Down Expand Up @@ -418,7 +415,6 @@ enum ExtensionOpt {
),
}


#[allow(private_bounds)]
#[async_trait::async_trait]
pub trait Extension {
Expand All @@ -443,10 +439,9 @@ pub(crate) enum Extensions {

pub(crate) enum ExtensionFactories {
RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
InvokerExtensionFactory(invoker_extension::InvokerExtensionFactory)
InvokerExtensionFactory(invoker_extension::InvokerExtensionFactory),
}


#[derive(Error, Debug)]
#[error("{0}")]
pub(crate) struct RegisterExtensionError(String);
Expand Down
11 changes: 5 additions & 6 deletions dubbo/src/extension/registry_extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
* limitations under the License.
*/

use std::{collections::HashMap, future::Future, pin::Pin};
use std::marker::PhantomData;
use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};

use async_trait::async_trait;
use thiserror::Error;
Expand All @@ -31,8 +30,7 @@ use crate::{
use proxy::RegistryProxy;

use crate::extension::{
Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType,
LoadExtensionPromise,
Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, LoadExtensionPromise,
};

// extension://0.0.0.0/?extension-type=registry&extension-name=nacos&registry-url=nacos://127.0.0.1:8848
Expand Down Expand Up @@ -64,7 +62,9 @@ pub trait Registry {
fn url(&self) -> &Url;
}

pub struct RegistryExtension<T>(PhantomData<T>) where T: Registry + Send + Sync + 'static;
pub struct RegistryExtension<T>(PhantomData<T>)
where
T: Registry + Send + Sync + 'static;

impl<T> ExtensionMetaInfo for RegistryExtension<T>
where
Expand All @@ -86,7 +86,6 @@ where
}
}


#[derive(Default)]
pub(super) struct RegistryExtensionLoader {
factories: HashMap<String, RegistryExtensionFactory>,
Expand Down
Loading

0 comments on commit 33ce8db

Please sign in to comment.