diff --git a/dubbo/src/extension/invoker_extension.rs b/dubbo/src/extension/invoker_extension.rs index 58e5a2b6..e1e2e22f 100644 --- a/dubbo/src/extension/invoker_extension.rs +++ b/dubbo/src/extension/invoker_extension.rs @@ -1,31 +1,32 @@ -use std::collections::HashMap; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; +use crate::{ + extension::{ + invoker_extension::proxy::InvokerProxy, Extension, ExtensionFactories, ExtensionMetaInfo, + LoadExtensionPromise, + }, + params::extension_param::{ExtensionName, ExtensionType}, + url::UrlParam, + StdError, Url, +}; use async_trait::async_trait; use bytes::Bytes; use futures_core::Stream; -use crate::{StdError, Url}; -use crate::extension::{Extension, ExtensionFactories, ExtensionMetaInfo, LoadExtensionPromise}; -use crate::extension::invoker_extension::proxy::InvokerProxy; -use crate::params::extension_param::{ExtensionName, ExtensionType}; -use crate::url::UrlParam; - +use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin}; #[async_trait] pub trait Invoker { - - async fn invoke(&self, invocation: GrpcInvocation) -> Result + Send + 'static>>, StdError>; + async fn invoke( + &self, + invocation: GrpcInvocation, + ) -> Result + Send + 'static>>, StdError>; async fn url(&self) -> Result; - } pub enum CallType { Unary, ClientStream, ServerStream, - BiStream + BiStream, } pub struct GrpcInvocation { @@ -33,47 +34,54 @@ pub struct GrpcInvocation { method_name: String, arguments: Vec, attachments: HashMap, - call_type: CallType + call_type: CallType, } pub struct Argument { name: String, - value: Box> + Send + 'static> + value: Box> + Send + 'static>, } - pub trait Serializable { fn serialize(&self, serialization_type: String) -> Result; } - pub trait Deserializable { - fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result where Self: Sized; + fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result + where + Self: Sized; } pub mod proxy { - use std::pin::Pin; + use crate::{ + extension::invoker_extension::{GrpcInvocation, Invoker}, + StdError, Url, + }; use async_trait::async_trait; use bytes::Bytes; use futures_core::Stream; - use tokio::sync::mpsc::Sender; - use tokio::sync::oneshot; - use crate::extension::invoker_extension::{GrpcInvocation, Invoker}; - use crate::{StdError, Url}; + use std::pin::Pin; + use tokio::sync::{mpsc::Sender, oneshot}; pub(super) enum InvokerOpt { - Invoke(GrpcInvocation, oneshot::Sender + Send + 'static>>, StdError>>), - Url(oneshot::Sender>) + Invoke( + GrpcInvocation, + oneshot::Sender + Send + 'static>>, StdError>>, + ), + Url(oneshot::Sender>), } #[derive(Clone)] pub struct InvokerProxy { - tx: Sender + tx: Sender, } #[async_trait] impl Invoker for InvokerProxy { - async fn invoke(&self, invocation: GrpcInvocation) -> Result + Send + 'static>>, StdError> { + async fn invoke( + &self, + invocation: GrpcInvocation, + ) -> Result + Send + 'static>>, StdError> { let (tx, rx) = oneshot::channel(); let _ = self.tx.send(InvokerOpt::Invoke(invocation, tx)); let ret = rx.await?; @@ -97,33 +105,28 @@ pub mod proxy { InvokerOpt::Invoke(invocation, tx) => { let result = invoker.invoke(invocation).await; let _ = tx.send(result); - }, + } InvokerOpt::Url(tx) => { let _ = tx.send(invoker.url().await); } } } }); - InvokerProxy { - tx - } + InvokerProxy { tx } } } } - #[derive(Default)] pub(super) struct InvokerExtensionLoader { - factories: HashMap + factories: HashMap, } impl InvokerExtensionLoader { - pub fn register(&mut self, extension_name: String, factory: InvokerExtensionFactory) { self.factories.insert(extension_name, factory); } - pub fn remove(&mut self, extension_name: String) { self.factories.remove(&extension_name); } @@ -133,32 +136,29 @@ impl InvokerExtensionLoader { let extension_name = extension_name.value(); let factory = self.factories.get_mut(&extension_name).unwrap(); factory.create(url) - } } - - -type InvokerExtensionConstructor = fn(Url) -> Pin, StdError>> + Send + 'static>>; +type InvokerExtensionConstructor = fn( + Url, +) -> Pin< + Box, StdError>> + Send + 'static>, +>; pub(crate) struct InvokerExtensionFactory { constructor: InvokerExtensionConstructor, - instances: HashMap> + instances: HashMap>, } - impl InvokerExtensionFactory { pub fn new(constructor: InvokerExtensionConstructor) -> Self { - Self { constructor, - instances: HashMap::default() + instances: HashMap::default(), } } } - impl InvokerExtensionFactory { - pub fn create(&mut self, url: Url) -> Result, StdError> { let key = url.to_string(); @@ -166,15 +166,23 @@ impl InvokerExtensionFactory { Some(instance) => Ok(instance.clone()), None => { let constructor = self.constructor; - let creator = move |url: Url| { + let creator = move |url: Url| { let invoker_future = constructor(url); Box::pin(async move { let invoker = invoker_future.await?; Ok(InvokerProxy::from(invoker)) - }) as Pin> + Send + 'static>> + }) + as Pin< + Box< + dyn Future> + + Send + + 'static, + >, + > }; - let promise: LoadExtensionPromise = LoadExtensionPromise::new(Box::new(creator), url); + let promise: LoadExtensionPromise = + LoadExtensionPromise::new(Box::new(creator), url); self.instances.insert(key, promise.clone()); Ok(promise) } @@ -182,14 +190,14 @@ impl InvokerExtensionFactory { } } - -pub struct InvokerExtension(PhantomData) where T: Invoker + Send + 'static; - +pub struct InvokerExtension(PhantomData) +where + T: Invoker + Send + 'static; impl ExtensionMetaInfo for InvokerExtension where T: Invoker + Send + 'static, - T: Extension> + T: Extension>, { fn name() -> String { T::name() @@ -200,6 +208,8 @@ where } fn extension_factory() -> ExtensionFactories { - ExtensionFactories::InvokerExtensionFactory(InvokerExtensionFactory::new(::create)) + ExtensionFactories::InvokerExtensionFactory(InvokerExtensionFactory::new( + ::create, + )) } } diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs index d7020e64..f08860fa 100644 --- a/dubbo/src/extension/mod.rs +++ b/dubbo/src/extension/mod.rs @@ -15,11 +15,14 @@ * limitations under the License. */ -pub mod registry_extension; mod invoker_extension; +pub mod registry_extension; use crate::{ - extension::registry_extension::proxy::RegistryProxy, + extension::{ + invoker_extension::proxy::InvokerProxy, + registry_extension::{proxy::RegistryProxy, RegistryExtension}, + }, logger::tracing::{error, info}, params::extension_param::ExtensionType, registry::registry::StaticRegistry, @@ -29,8 +32,6 @@ use crate::{ use std::{future::Future, pin::Pin, sync::Arc}; use thiserror::Error; use tokio::sync::{oneshot, RwLock}; -use crate::extension::invoker_extension::proxy::InvokerProxy; -use crate::extension::registry_extension::RegistryExtension; pub static EXTENSIONS: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| ExtensionDirectory::init()); @@ -96,20 +97,16 @@ impl ExtensionDirectory { self.registry_extension_loader .register(extension_name, registry_extension_factory); Ok(()) - }, - _ => { - Ok(()) } + _ => Ok(()), }, ExtensionType::Invoker => match extension_factories { ExtensionFactories::InvokerExtensionFactory(invoker_extension_factory) => { self.invoker_extension_loader .register(extension_name, invoker_extension_factory); Ok(()) - }, - _ => { - Ok(()) } + _ => Ok(()), }, } } @@ -123,7 +120,7 @@ impl ExtensionDirectory { ExtensionType::Registry => { self.registry_extension_loader.remove(extension_name); Ok(()) - }, + } ExtensionType::Invoker => { self.invoker_extension_loader.remove(extension_name); Ok(()) @@ -160,24 +157,24 @@ impl ExtensionDirectory { let _ = callback.send(Err(err)); } } - }, + } ExtensionType::Invoker => { let extension = self.invoker_extension_loader.load(url); match extension { Ok(mut extension) => { tokio::spawn(async move { - let invoker = extension.resolve().await; + let invoker = extension.resolve().await; match invoker { Ok(invoker) => { let _ = callback.send(Ok(Extensions::Invoker(invoker))); - }, + } Err(err) => { error!("load invoker extension failed: {}", err); let _ = callback.send(Err(err)); } } }); - }, + } Err(err) => { error!("load invoker extension failed: {}", err); let _ = callback.send(Err(err)); @@ -418,7 +415,6 @@ enum ExtensionOpt { ), } - #[allow(private_bounds)] #[async_trait::async_trait] pub trait Extension { @@ -443,10 +439,9 @@ pub(crate) enum Extensions { pub(crate) enum ExtensionFactories { RegistryExtensionFactory(registry_extension::RegistryExtensionFactory), - InvokerExtensionFactory(invoker_extension::InvokerExtensionFactory) + InvokerExtensionFactory(invoker_extension::InvokerExtensionFactory), } - #[derive(Error, Debug)] #[error("{0}")] pub(crate) struct RegisterExtensionError(String); diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs index a4102265..2e9291b3 100644 --- a/dubbo/src/extension/registry_extension.rs +++ b/dubbo/src/extension/registry_extension.rs @@ -15,8 +15,7 @@ * limitations under the License. */ -use std::{collections::HashMap, future::Future, pin::Pin}; -use std::marker::PhantomData; +use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin}; use async_trait::async_trait; use thiserror::Error; @@ -31,8 +30,7 @@ use crate::{ use proxy::RegistryProxy; use crate::extension::{ - Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, - LoadExtensionPromise, + Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, LoadExtensionPromise, }; // extension://0.0.0.0/?extension-type=registry&extension-name=nacos®istry-url=nacos://127.0.0.1:8848 @@ -64,7 +62,9 @@ pub trait Registry { fn url(&self) -> &Url; } -pub struct RegistryExtension(PhantomData) where T: Registry + Send + Sync + 'static; +pub struct RegistryExtension(PhantomData) +where + T: Registry + Send + Sync + 'static; impl ExtensionMetaInfo for RegistryExtension where @@ -86,7 +86,6 @@ where } } - #[derive(Default)] pub(super) struct RegistryExtensionLoader { factories: HashMap, diff --git a/dubbo/src/params/extension_param.rs b/dubbo/src/params/extension_param.rs index 624d0313..08ec1c97 100644 --- a/dubbo/src/params/extension_param.rs +++ b/dubbo/src/params/extension_param.rs @@ -51,7 +51,7 @@ impl FromStr for ExtensionName { pub enum ExtensionType { Registry, - Invoker + Invoker, } impl UrlParam for ExtensionType { diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index b52abffe..f6fea892 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -22,9 +22,8 @@ pub mod protos { use dubbo::codegen::*; -use dubbo::extension; +use dubbo::{extension, extension::registry_extension::RegistryExtension}; use futures_util::StreamExt; -use dubbo::extension::registry_extension::RegistryExtension; use protos::{greeter_client::GreeterClient, GreeterRequest}; use registry_nacos::NacosRegistry; @@ -32,7 +31,9 @@ use registry_nacos::NacosRegistry; async fn main() { dubbo::logger::init(); - let _ = extension::EXTENSIONS.register::>().await; + let _ = extension::EXTENSIONS + .register::>() + .await; let builder = ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap()); diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs index 7f4c99a6..aecc1f8e 100644 --- a/examples/greeter/src/greeter/server.rs +++ b/examples/greeter/src/greeter/server.rs @@ -27,13 +27,13 @@ use dubbo::{ codegen::*, config::RootConfig, extension, + extension::registry_extension::RegistryExtension, logger::{ tracing::{info, span}, Level, }, Dubbo, }; -use dubbo::extension::registry_extension::RegistryExtension; use protos::{ greeter_server::{register_server, Greeter}, GreeterReply, GreeterRequest, @@ -61,7 +61,9 @@ async fn main() { Err(_err) => panic!("err: {:?}", _err), // response was droped }; - let _ = extension::EXTENSIONS.register::>().await; + let _ = extension::EXTENSIONS + .register::>() + .await; let mut f = Dubbo::new() .with_config(r) .add_registry("nacos://127.0.0.1:8848/");