Skip to content

Commit

Permalink
20240514
Browse files Browse the repository at this point in the history
  • Loading branch information
kwsc98 committed May 14, 2024
1 parent 633515a commit 8c86537
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 226 deletions.
28 changes: 13 additions & 15 deletions dubbo-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,30 @@
*/

use proc_macro::TokenStream;
use quote::{ToTokens};
use quote::ToTokens;
use syn::parse::Parser;

mod server_macro;
mod trait_macro;



#[proc_macro_attribute]
pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream {
let attr = DubboAttr::from_attr(attr);
match attr {
Ok(attr) => { trait_macro::dubbo_trait(attr, item) }
Err(err) => { err.into_compile_error().into() }
Ok(attr) => trait_macro::dubbo_trait(attr, item),
Err(err) => err.into_compile_error().into(),
}
}

#[proc_macro_attribute]
pub fn dubbo_server(attr: TokenStream, item: TokenStream) -> TokenStream {
let attr = DubboAttr::from_attr(attr);
match attr {
Ok(attr) => { server_macro::dubbo_server(attr, item) }
Err(err) => { err.into_compile_error().into() }
Ok(attr) => server_macro::dubbo_server(attr, item),
Err(err) => err.into_compile_error().into(),
}
}


#[derive(Default)]
struct DubboAttr {
package: Option<String>,
Expand All @@ -56,7 +53,9 @@ impl DubboAttr {
.and_then(|args| Self::build_attr(args))
}

fn build_attr(args: syn::punctuated::Punctuated::<syn::Meta, syn::Token![,]>) -> Result<DubboAttr, syn::Error> {
fn build_attr(
args: syn::punctuated::Punctuated<syn::Meta, syn::Token![,]>,
) -> Result<DubboAttr, syn::Error> {
let mut package = None;
let mut version = None;
for arg in args {
Expand All @@ -71,10 +70,12 @@ impl DubboAttr {
.to_string()
.to_lowercase();
let lit = match &namevalue.value {
syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit.to_token_stream().to_string(),
syn::Expr::Lit(syn::ExprLit { lit, .. }) => {
lit.to_token_stream().to_string()
}
expr => expr.to_token_stream().to_string(),
}
.replace("\"", "");
.replace("\"", "");
match ident.as_str() {
"package" => {
let _ = package.insert(lit);
Expand All @@ -99,9 +100,6 @@ impl DubboAttr {
}
}
}
Ok(DubboAttr {
package,
version,
})
Ok(DubboAttr { package, version })
}
}
35 changes: 18 additions & 17 deletions dubbo-macro/src/server_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* limitations under the License.
*/


use crate::DubboAttr;
use proc_macro::TokenStream;
use quote::{quote, ToTokens};
use syn::{parse_macro_input, FnArg, ItemImpl, ImplItem};
use syn::{parse_macro_input, FnArg, ImplItem, ItemImpl};

pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
let version = match attr.version {
Expand Down Expand Up @@ -48,7 +47,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
let token = quote! {
let result : Result<#req_type,_> = serde_json::from_slice(param_req[idx].as_bytes());
if let Err(err) = result {
param.res = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string()));
param.result = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string()));
return param;
}
let #req : #req_type = result.unwrap();
Expand All @@ -62,7 +61,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
);
vec.push(quote! {
if &param.method_name[..] == stringify!(#method) {
let param_req = &param.req;
let param_req = &param.args;
let mut idx = 0;
#(
#req
Expand All @@ -72,7 +71,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
#req_pat,
)*
).await;
param.res = match res {
param.result = match res {
Ok(res) => {
let res = serde_json::to_string(&res).unwrap();
Ok(res)
Expand All @@ -87,20 +86,23 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
vec
});
let service_unique = match &attr.package {
None => { quote!(stringify!(#item_trait)) }
Some(attr) => { quote!(#attr.to_owned() + "." + stringify!(#item_trait)) }
None => {
quote!(stringify!(#item_trait))
}
Some(attr) => {
let service_unique = attr.to_owned() + "." + &item_trait.to_string();
quote!(&#service_unique)
}
};
let expanded = quote! {

#server_item
use dubbo::triple::server::support::RpcServer;
use dubbo::triple::server::support::RpcFuture;
use dubbo::triple::server::support::RpcMsg;
impl RpcServer for #item_self {
fn invoke (&self, param : RpcMsg) -> RpcFuture<RpcMsg> {
impl dubbo::triple::server::support::RpcServer for #item_self {
fn invoke (&self, param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcFuture<dubbo::triple::server::support::RpcContext> {
let mut rpc = self.clone();
Box::pin(async move {rpc.prv_invoke(param).await})
}
fn get_info(&self) -> (&str , &str , Option<&str> , Vec<String>) {
fn get_info(&self) -> (&str, Option<&str>, Vec<String>) {
let mut methods = vec![];
#(
methods.push(stringify!(#items_ident_fn).to_string());
Expand All @@ -110,9 +112,9 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
}

impl #item_self {
async fn prv_invoke (&self, mut param : RpcMsg) -> RpcMsg {
async fn prv_invoke (&self, mut param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcContext {
#(#items_fn)*
param.res = Err(
param.result = Err(
dubbo::status::Status::new(dubbo::status::Code::NotFound,format!("not find method by {}",param.method_name))
);
return param;
Expand All @@ -122,7 +124,6 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream {
expanded.into()
}


fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream {
let impl_item = item.impl_token;
let trait_ident = item.trait_.unwrap().1;
Expand All @@ -141,4 +142,4 @@ fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream {
)*
}
}
}
}
44 changes: 20 additions & 24 deletions dubbo-macro/src/trait_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream {
vec.push(token);
vec
});
let package = stringify!(#trait_ident);
let package = trait_ident.to_string();
let service_unique = match &attr.package {
None => { quote!(#package) }
Some(attr) => { quote!(#attr.to_owned() + "." + #package) }
None => package.to_owned(),
Some(attr) => attr.to_owned() + "." + &package,
};
let path = "/".to_string() + &service_unique + "/" + &ident.to_string();
fn_quote.push(
quote! {
#[allow(non_snake_case)]
Expand All @@ -82,22 +83,26 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream {
req_vec.push(req_poi_str.unwrap());
)*
let _version : Option<&str> = #version;
let request = Request::new(TripleRequestWrapper::new(req_vec));
let request = dubbo::invocation::Request::new(dubbo::triple::triple_wrapper::TripleRequestWrapper::new(req_vec));
let service_unique = #service_unique;
let method_name = stringify!(#ident).to_string();
let invocation = dubbo::invocation::RpcInvocation::default()
.with_service_unique_name(service_unique.to_owned())
.with_method_name(method_name.clone());
let path = "/".to_string() + service_unique + "/" + &method_name;
let path = http::uri::PathAndQuery::from_str(
&path,
).unwrap();
let res = self.inner.unary::<TripleRequestWrapper,TripleResponseWrapper>(request, path, invocation).await;
let path = http::uri::PathAndQuery::from_static(
#path,
);
let res = self.inner.unary::<dubbo::triple::triple_wrapper::TripleRequestWrapper,dubbo::triple::triple_wrapper::TripleResponseWrapper>(request, path, invocation).await;
match res {
Ok(res) => {
let response_wrapper = res.into_parts().1;
let res: #output_type = serde_json::from_slice(&response_wrapper.data).unwrap();
Ok(res)
let data = &response_wrapper.data;
if data.starts_with(b"null") {
Err(dubbo::status::Status::new(dubbo::status::Code::DataLoss,"null".to_string()))
} else {
let res: #output_type = serde_json::from_slice(data).unwrap();
Ok(res)
}
},
Err(err) => Err(err)
}
Expand All @@ -107,33 +112,24 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream {
}
let rpc_client = syn::Ident::new(&format!("{}Client", trait_ident), trait_ident.span());
let expanded = quote! {
use dubbo::triple::client::TripleClient;
use dubbo::triple::triple_wrapper::TripleRequestWrapper;
use dubbo::triple::triple_wrapper::TripleResponseWrapper;
use dubbo::triple::codec::prost::ProstCodec;
use dubbo::invocation::Request;
use dubbo::invocation::Response;
use dubbo::triple::client::builder::ClientBuilder;
use std::str::FromStr;

#item_trait

#vis struct #rpc_client {
inner: TripleClient
inner: dubbo::triple::client::TripleClient
}
impl #rpc_client {
#(
#fn_quote
)*
pub fn new(builder: ClientBuilder) -> #rpc_client {
#rpc_client {inner: TripleClient::new(builder),}
pub fn new(builder: dubbo::triple::client::builder::ClientBuilder) -> #rpc_client {
#rpc_client {inner: dubbo::triple::client::TripleClient::new(builder),}
}
}
};
TokenStream::from(expanded)
}


fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream {
let trait_ident = &item.ident;
let item_fn = item.items.iter().fold(vec![], |mut vec, e| {
Expand Down Expand Up @@ -162,4 +158,4 @@ fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream {
)*
}
}
}
}
7 changes: 7 additions & 0 deletions dubbo/src/config/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ServiceConfig {
pub group: String,
pub protocol: String,
pub interface: String,
pub serialization: Option<String>,
}

impl ServiceConfig {
Expand All @@ -41,4 +42,10 @@ impl ServiceConfig {
pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
pub fn serialization(self, serialization: Option<String>) -> Self {
Self {
serialization,
..self
}
}
}
15 changes: 10 additions & 5 deletions dubbo/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use crate::{
logger::tracing::{debug, info},
protocol::{BoxExporter, Protocol},
registry::protocol::RegistryProtocol,
triple::server::support::{RpcHttp2Server, RpcServer},
Url,
};
use futures::{future, Future};
use crate::triple::server::support::{RpcHttp2Server, RpcServer};

// Invoker是否可以基于hyper写一个通用的

Expand Down Expand Up @@ -93,12 +93,17 @@ impl Dubbo {
.protocols
.get_protocol_or_default(service_config.protocol.as_str());
let interface_name = service_config.interface.clone();
let protocol_url = format!(
"{}/{}?interface={}",
let mut protocol_url = format!(
"{}/{}?interface={}&category={}&protocol={}",
protocol.to_url(),
interface_name,
interface_name
interface_name,
"providers",
"tri"
);
if let Some(serialization) = &service_config.serialization {
protocol_url.push_str(&format!("&prefer.serialization={}", serialization));
}
info!("protocol_url: {:?}", protocol_url);
protocol_url.parse().ok()
} else {
Expand Down Expand Up @@ -142,7 +147,7 @@ impl Dubbo {
.with_registries(registry_extensions.clone())
.with_services(self.service_registry.clone()),
);
let mut async_vec: Vec<Pin<Box<dyn Future<Output=BoxExporter> + Send>>> = Vec::new();
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
info!("base: {:?}, service url: {:?}", name, url);
Expand Down
Loading

0 comments on commit 8c86537

Please sign in to comment.