From f897d1d923069b44a54f13d4eded1bfe3c610079 Mon Sep 17 00:00:00 2001 From: Jay Date: Tue, 16 Mar 2021 16:44:25 +0800 Subject: [PATCH] *: implements health check (#518) This PR provides simple health check implementations referring to the go version. It provides a HealthService to maintain the service statuses and wake up watchers. I use a standalone crate to avoid introducing protobuf specific code into the grpcio crate. And I don't reuse grpcio-proto to avoid the dependency of protobuf-build which just make things complicated by introducing a lot of dependencies and resulting in a dependency circle. Signed-off-by: Jay Lee --- Cargo.toml | 9 +- health/Cargo.toml | 26 + health/README.md | 8 + health/src/lib.rs | 38 ++ health/src/proto.rs | 21 + health/src/proto/grpc.health.v1.rs | 48 ++ health/src/proto/health.rs | 514 ++++++++++++++++++ health/src/proto/health_grpc.rs | 140 +++++ health/src/service.rs | 237 ++++++++ health/tests/health_check.rs | 161 ++++++ tests-and-examples/Cargo.toml | 5 +- .../tests/cases/health_check.rs | 96 ---- tests-and-examples/tests/cases/mod.rs | 1 - 13 files changed, 1204 insertions(+), 100 deletions(-) create mode 100644 health/Cargo.toml create mode 100644 health/README.md create mode 100644 health/src/lib.rs create mode 100644 health/src/proto.rs create mode 100644 health/src/proto/grpc.health.v1.rs create mode 100644 health/src/proto/health.rs create mode 100644 health/src/proto/health_grpc.rs create mode 100644 health/src/service.rs create mode 100644 health/tests/health_check.rs delete mode 100644 tests-and-examples/tests/cases/health_check.rs diff --git a/Cargo.toml b/Cargo.toml index 9d89096b9..6de989d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,14 @@ log = "0.4" parking_lot = "0.11" [workspace] -members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"] +members = [ + "proto", + "benchmark", + "compiler", + "health", + "interop", + "tests-and-examples" +] [features] default = ["protobuf-codec", "secure", "use-bindgen"] diff --git a/health/Cargo.toml b/health/Cargo.toml new file mode 100644 index 000000000..df3a48a54 --- /dev/null +++ b/health/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "grpcio-health" +version = "0.8.0" +edition = "2018" +authors = ["The TiKV Project Developers"] +license = "Apache-2.0" +keywords = ["grpc", "healthcheck"] +repository = "https://github.com/tikv/grpc-rs" +homepage = "https://github.com/tikv/grpc-rs" +documentation = "https://docs.rs/grpcio-health" +description = "Health check wrappers for grpcio" +categories = ["network-programming"] +readme = "README.md" + +[features] +default = ["protobuf-codec", "use-bindgen"] +protobuf-codec = ["grpcio/protobuf-codec", "protobuf"] +prost-codec = ["grpcio/prost-codec", "prost"] +use-bindgen = ["grpcio/use-bindgen"] + +[dependencies] +futures = "0.3" +grpcio = { path = "..", features = ["secure"], version = "0.8.0", default-features = false } +prost = { version = "0.7", optional = true } +protobuf = { version = "2", optional = true } +log = "0.4" diff --git a/health/README.md b/health/README.md new file mode 100644 index 000000000..41e56e748 --- /dev/null +++ b/health/README.md @@ -0,0 +1,8 @@ +# grpcio-healthcheck + +[![Crates.io](https://img.shields.io/crates/v/grpcio-health.svg?maxAge=2592000)](https://crates.io/crates/grpcio-health) +[![docs.rs](https://docs.rs/grpcio-health/badge.svg)](https://docs.rs/grpcio-health) + +grpcio-health provides health check protos as well as some helper structs to make +health check easily. + diff --git a/health/src/lib.rs b/health/src/lib.rs new file mode 100644 index 000000000..f1fb76711 --- /dev/null +++ b/health/src/lib.rs @@ -0,0 +1,38 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +//! grpcio-health provides health check protos as well as some helper structs to make +//! health check easily. For the detail design of health checking service, see +//! https://github.com/grpc/grpc/blob/master/doc/health-checking.md. +//! +//! ### Usage +//! +//! The crate provides a default implementation of `Health` service, you can use it +//! to maintain the service states. First, you need to register it to the server builder +//! so that it can serve health check service later. +//! ```ignore +//! use grpcio_health::{HealthService, create_health}; +//! +//! let service = HealthService::default(); +//! let builder = builder.register_service(create_health(service.clone())); +//! ``` +//! Then insert service status for query. +//! ```ignore +//! service.set_serving_status("", ServingStatus::Serving); +//! ``` +//! `""` means overall health status. You can also provide specific service name. +//! +//! Client can either use `check` to do one time query or `watch` to observe status changes. +//! ```ignore +//! use grpcio_health::proto::HealthCheckRequest; +//! +//! let client = HealthClient::new(ch); +//! let req = HealthCheckRequest { service: "".to_string(), ..Default::default() }; +//! let status_resp = client.check_async(&req).await.unwrap(); +//! assert_eq!(statuss_resp.status, ServingStatus::Serving); +//! ``` + +pub mod proto; +mod service; + +pub use self::proto::{create_health, HealthClient, ServingStatus}; +pub use self::service::HealthService; diff --git a/health/src/proto.rs b/health/src/proto.rs new file mode 100644 index 000000000..244a014ab --- /dev/null +++ b/health/src/proto.rs @@ -0,0 +1,21 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +#[cfg(feature = "prost-codec")] +mod reexports { + include!("proto/grpc.health.v1.rs"); + + pub use self::health_check_response::ServingStatus; +} + +#[cfg(feature = "protobuf-codec")] +#[allow(deprecated)] +mod health; +#[cfg(feature = "protobuf-codec")] +mod health_grpc; +#[cfg(feature = "protobuf-codec")] +mod reexports { + pub use super::health::*; + pub use HealthCheckResponseServingStatus as ServingStatus; +} + +pub use self::reexports::*; diff --git a/health/src/proto/grpc.health.v1.rs b/health/src/proto/grpc.health.v1.rs new file mode 100644 index 000000000..272356477 --- /dev/null +++ b/health/src/proto/grpc.health.v1.rs @@ -0,0 +1,48 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HealthCheckRequest { + #[prost(string, tag="1")] + pub service: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HealthCheckResponse { + #[prost(enumeration="health_check_response::ServingStatus", tag="1")] + pub status: i32, +} +/// Nested message and enum types in `HealthCheckResponse`. +pub mod health_check_response { + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum ServingStatus { + Unknown = 0, + Serving = 1, + NotServing = 2, + /// Used only by the Watch method. + ServiceUnknown = 3, + } +} +const METHOD_HEALTH_CHECK: ::grpcio::Method = ::grpcio::Method{ty: ::grpcio::MethodType::Unary, name: "/grpc.health.v1.Health/Check", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, }; +const METHOD_HEALTH_WATCH: ::grpcio::Method = ::grpcio::Method{ty: ::grpcio::MethodType::ServerStreaming, name: "/grpc.health.v1.Health/Watch", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, }; +#[derive(Clone)] +pub struct HealthClient { client: ::grpcio::Client } +impl HealthClient { +pub fn new(channel: ::grpcio::Channel) -> Self { HealthClient { client: ::grpcio::Client::new(channel) }} +pub fn check_opt(&self, req: &HealthCheckRequest, opt: ::grpcio::CallOption) -> ::grpcio::Result { self.client.unary_call(&METHOD_HEALTH_CHECK, req, opt) } +pub fn check(&self, req: &HealthCheckRequest) -> ::grpcio::Result { self.check_opt(req, ::grpcio::CallOption::default()) } +pub fn check_async_opt(&self, req: &HealthCheckRequest, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver,> { self.client.unary_call_async(&METHOD_HEALTH_CHECK, req, opt) } +pub fn check_async(&self, req: &HealthCheckRequest) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver,> { self.check_async_opt(req, ::grpcio::CallOption::default()) } +pub fn watch_opt(&self, req: &HealthCheckRequest, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver,> { self.client.server_streaming(&METHOD_HEALTH_WATCH, req, opt) } +pub fn watch(&self, req: &HealthCheckRequest) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver,> { self.watch_opt(req, ::grpcio::CallOption::default()) } +pub fn spawn(&self, f: F) where F: ::futures::Future + Send + 'static {self.client.spawn(f)} +} +pub trait Health { +fn check(&mut self, ctx: ::grpcio::RpcContext, req: HealthCheckRequest, sink: ::grpcio::UnarySink); +fn watch(&mut self, ctx: ::grpcio::RpcContext, req: HealthCheckRequest, sink: ::grpcio::ServerStreamingSink); +} +pub fn create_health(s: S) -> ::grpcio::Service { +let mut builder = ::grpcio::ServiceBuilder::new(); +let mut instance = s.clone(); +builder = builder.add_unary_handler(&METHOD_HEALTH_CHECK, move |ctx, req, resp| instance.check(ctx, req, resp)); +let mut instance = s; +builder = builder.add_server_streaming_handler(&METHOD_HEALTH_WATCH, move |ctx, req, resp| instance.watch(ctx, req, resp)); +builder.build() +} diff --git a/health/src/proto/health.rs b/health/src/proto/health.rs new file mode 100644 index 000000000..229967061 --- /dev/null +++ b/health/src/proto/health.rs @@ -0,0 +1,514 @@ +// This file is generated by rust-protobuf 2.8.0. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![cfg_attr(rustfmt, rustfmt_skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `grpc/health/v1/health.proto` + +use protobuf::Message as Message_imported_for_functions; +use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_8_0; + +#[derive(PartialEq,Clone,Default)] +pub struct HealthCheckRequest { + // message fields + pub service: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HealthCheckRequest { + fn default() -> &'a HealthCheckRequest { + ::default_instance() + } +} + +impl HealthCheckRequest { + pub fn new() -> HealthCheckRequest { + ::std::default::Default::default() + } + + // string service = 1; + + + pub fn get_service(&self) -> &str { + &self.service + } + pub fn clear_service(&mut self) { + self.service.clear(); + } + + // Param is passed by value, moved + pub fn set_service(&mut self, v: ::std::string::String) { + self.service = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_service(&mut self) -> &mut ::std::string::String { + &mut self.service + } + + // Take field + pub fn take_service(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.service, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for HealthCheckRequest { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.service)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.service.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.service); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if !self.service.is_empty() { + os.write_string(1, &self.service)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HealthCheckRequest { + HealthCheckRequest::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "service", + |m: &HealthCheckRequest| { &m.service }, + |m: &mut HealthCheckRequest| { &mut m.service }, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "HealthCheckRequest", + fields, + file_descriptor_proto() + ) + }) + } + } + + fn default_instance() -> &'static HealthCheckRequest { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const HealthCheckRequest, + }; + unsafe { + instance.get(HealthCheckRequest::new) + } + } +} + +impl ::protobuf::Clear for HealthCheckRequest { + fn clear(&mut self) { + self.service.clear(); + self.unknown_fields.clear(); + } +} +impl ::std::fmt::Debug for HealthCheckRequest { + #[allow(unused_variables)] + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HealthCheckRequest { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +#[derive(PartialEq,Clone,Default)] +pub struct HealthCheckResponse { + // message fields + pub status: HealthCheckResponseServingStatus, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HealthCheckResponse { + fn default() -> &'a HealthCheckResponse { + ::default_instance() + } +} + +impl HealthCheckResponse { + pub fn new() -> HealthCheckResponse { + ::std::default::Default::default() + } + + // .grpc.health.v1.HealthCheckResponse.ServingStatus status = 1; + + + pub fn get_status(&self) -> HealthCheckResponseServingStatus { + self.status + } + pub fn clear_status(&mut self) { + self.status = HealthCheckResponseServingStatus::Unknown; + } + + // Param is passed by value, moved + pub fn set_status(&mut self, v: HealthCheckResponseServingStatus) { + self.status = v; + } +} + +impl ::protobuf::Message for HealthCheckResponse { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + if wire_type == ::protobuf::wire_format::WireTypeVarint {self.status = is.read_enum()?;} else {return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));} + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if self.status != HealthCheckResponseServingStatus::Unknown { + my_size += ::protobuf::rt::enum_size(1, self.status); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if self.status != HealthCheckResponseServingStatus::Unknown { + os.write_enum(1, self.status.value())?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HealthCheckResponse { + HealthCheckResponse::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "status", + |m: &HealthCheckResponse| { &m.status }, + |m: &mut HealthCheckResponse| { &mut m.status }, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "HealthCheckResponse", + fields, + file_descriptor_proto() + ) + }) + } + } + + fn default_instance() -> &'static HealthCheckResponse { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const HealthCheckResponse, + }; + unsafe { + instance.get(HealthCheckResponse::new) + } + } +} + +impl ::protobuf::Clear for HealthCheckResponse { + fn clear(&mut self) { + self.status = HealthCheckResponseServingStatus::Unknown; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for HealthCheckResponse { + #[allow(unused_variables)] + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HealthCheckResponse { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +#[derive(Clone,PartialEq,Eq,Debug,Hash)] +pub enum HealthCheckResponseServingStatus { + Unknown = 0, + Serving = 1, + NotServing = 2, + ServiceUnknown = 3, +} + +impl ::protobuf::ProtobufEnum for HealthCheckResponseServingStatus { + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(HealthCheckResponseServingStatus::Unknown), + 1 => ::std::option::Option::Some(HealthCheckResponseServingStatus::Serving), + 2 => ::std::option::Option::Some(HealthCheckResponseServingStatus::NotServing), + 3 => ::std::option::Option::Some(HealthCheckResponseServingStatus::ServiceUnknown), + _ => ::std::option::Option::None + } + } + + fn values() -> &'static [Self] { + static values: &'static [HealthCheckResponseServingStatus] = &[ + HealthCheckResponseServingStatus::Unknown, + HealthCheckResponseServingStatus::Serving, + HealthCheckResponseServingStatus::NotServing, + HealthCheckResponseServingStatus::ServiceUnknown, + ]; + values + } + + fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::EnumDescriptor, + }; + unsafe { + descriptor.get(|| { + ::protobuf::reflect::EnumDescriptor::new("HealthCheckResponseServingStatus", file_descriptor_proto()) + }) + } + } +} + +impl ::std::marker::Copy for HealthCheckResponseServingStatus { +} + +impl ::std::default::Default for HealthCheckResponseServingStatus { + fn default() -> Self { + HealthCheckResponseServingStatus::Unknown + } +} + +impl ::protobuf::reflect::ProtobufValue for HealthCheckResponseServingStatus { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor()) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x1bgrpc/health/v1/health.proto\x12\x0egrpc.health.v1\".\n\x12HealthCh\ + eckRequest\x12\x18\n\x07service\x18\x01\x20\x01(\tR\x07service\"\xb1\x01\ + \n\x13HealthCheckResponse\x12I\n\x06status\x18\x01\x20\x01(\x0e21.grpc.h\ + ealth.v1.HealthCheckResponse.ServingStatusR\x06status\"O\n\rServingStatu\ + s\x12\x0b\n\x07UNKNOWN\x10\0\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bN\ + OT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x032\xae\x01\n\x06He\ + alth\x12P\n\x05Check\x12\".grpc.health.v1.HealthCheckRequest\x1a#.grpc.h\ + ealth.v1.HealthCheckResponse\x12R\n\x05Watch\x12\".grpc.health.v1.Health\ + CheckRequest\x1a#.grpc.health.v1.HealthCheckResponse0\x01Ba\n\x11io.grpc\ + .health.v1B\x0bHealthProtoP\x01Z,google.golang.org/grpc/health/grpc_heal\ + th_v1\xaa\x02\x0eGrpc.Health.V1J\xb5\x12\n\x06\x12\x04\x11\0>\x01\n\xc6\ + \x05\n\x01\x0c\x12\x03\x11\0\x122\xb7\x04\x20Copyright\x202015\x20The\ + \x20gRPC\x20Authors\n\n\x20Licensed\x20under\x20the\x20Apache\x20License\ + ,\x20Version\x202.0\x20(the\x20\"License\");\n\x20you\x20may\x20not\x20u\ + se\x20this\x20file\x20except\x20in\x20compliance\x20with\x20the\x20Licen\ + se.\n\x20You\x20may\x20obtain\x20a\x20copy\x20of\x20the\x20License\x20at\ + \n\n\x20\x20\x20\x20\x20http://www.apache.org/licenses/LICENSE-2.0\n\n\ + \x20Unless\x20required\x20by\x20applicable\x20law\x20or\x20agreed\x20to\ + \x20in\x20writing,\x20software\n\x20distributed\x20under\x20the\x20Licen\ + se\x20is\x20distributed\x20on\x20an\x20\"AS\x20IS\"\x20BASIS,\n\x20WITHO\ + UT\x20WARRANTIES\x20OR\x20CONDITIONS\x20OF\x20ANY\x20KIND,\x20either\x20\ + express\x20or\x20implied.\n\x20See\x20the\x20License\x20for\x20the\x20sp\ + ecific\x20language\x20governing\x20permissions\x20and\n\x20limitations\ + \x20under\x20the\x20License.\n2\x81\x01\x20The\x20canonical\x20version\ + \x20of\x20this\x20proto\x20can\x20be\x20found\x20at\n\x20https://github.\ + com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto\n\n\x08\n\ + \x01\x02\x12\x03\x13\0\x17\n\x08\n\x01\x08\x12\x03\x15\0+\n\t\n\x02\x08%\ + \x12\x03\x15\0+\n\x08\n\x01\x08\x12\x03\x16\0C\n\t\n\x02\x08\x0b\x12\x03\ + \x16\0C\n\x08\n\x01\x08\x12\x03\x17\0\"\n\t\n\x02\x08\n\x12\x03\x17\0\"\ + \n\x08\n\x01\x08\x12\x03\x18\0,\n\t\n\x02\x08\x08\x12\x03\x18\0,\n\x08\n\ + \x01\x08\x12\x03\x19\0*\n\t\n\x02\x08\x01\x12\x03\x19\0*\n\n\n\x02\x04\0\ + \x12\x04\x1b\0\x1d\x01\n\n\n\x03\x04\0\x01\x12\x03\x1b\x08\x1a\n\x0b\n\ + \x04\x04\0\x02\0\x12\x03\x1c\x02\x15\n\r\n\x05\x04\0\x02\0\x04\x12\x04\ + \x1c\x02\x1b\x1c\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x1c\x02\x08\n\x0c\n\ + \x05\x04\0\x02\0\x01\x12\x03\x1c\t\x10\n\x0c\n\x05\x04\0\x02\0\x03\x12\ + \x03\x1c\x13\x14\n\n\n\x02\x04\x01\x12\x04\x1f\0'\x01\n\n\n\x03\x04\x01\ + \x01\x12\x03\x1f\x08\x1b\n\x0c\n\x04\x04\x01\x04\0\x12\x04\x20\x02%\x03\ + \n\x0c\n\x05\x04\x01\x04\0\x01\x12\x03\x20\x07\x14\n\r\n\x06\x04\x01\x04\ + \0\x02\0\x12\x03!\x04\x10\n\x0e\n\x07\x04\x01\x04\0\x02\0\x01\x12\x03!\ + \x04\x0b\n\x0e\n\x07\x04\x01\x04\0\x02\0\x02\x12\x03!\x0e\x0f\n\r\n\x06\ + \x04\x01\x04\0\x02\x01\x12\x03\"\x04\x10\n\x0e\n\x07\x04\x01\x04\0\x02\ + \x01\x01\x12\x03\"\x04\x0b\n\x0e\n\x07\x04\x01\x04\0\x02\x01\x02\x12\x03\ + \"\x0e\x0f\n\r\n\x06\x04\x01\x04\0\x02\x02\x12\x03#\x04\x14\n\x0e\n\x07\ + \x04\x01\x04\0\x02\x02\x01\x12\x03#\x04\x0f\n\x0e\n\x07\x04\x01\x04\0\ + \x02\x02\x02\x12\x03#\x12\x13\n/\n\x06\x04\x01\x04\0\x02\x03\x12\x03$\ + \x04\x18\"\x20\x20Used\x20only\x20by\x20the\x20Watch\x20method.\n\n\x0e\ + \n\x07\x04\x01\x04\0\x02\x03\x01\x12\x03$\x04\x13\n\x0e\n\x07\x04\x01\ + \x04\0\x02\x03\x02\x12\x03$\x16\x17\n\x0b\n\x04\x04\x01\x02\0\x12\x03&\ + \x02\x1b\n\r\n\x05\x04\x01\x02\0\x04\x12\x04&\x02%\x03\n\x0c\n\x05\x04\ + \x01\x02\0\x06\x12\x03&\x02\x0f\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03&\ + \x10\x16\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03&\x19\x1a\n\n\n\x02\x06\0\ + \x12\x04)\0>\x01\n\n\n\x03\x06\0\x01\x12\x03)\x08\x0e\n^\n\x04\x06\0\x02\ + \0\x12\x03,\x02>\x1aQ\x20If\x20the\x20requested\x20service\x20is\x20unkn\ + own,\x20the\x20call\x20will\x20fail\x20with\x20status\n\x20NOT_FOUND.\n\ + \n\x0c\n\x05\x06\0\x02\0\x01\x12\x03,\x06\x0b\n\x0c\n\x05\x06\0\x02\0\ + \x02\x12\x03,\x0c\x1e\n\x0c\n\x05\x06\0\x02\0\x03\x12\x03,)<\n\xde\x06\n\ + \x04\x06\0\x02\x01\x12\x03=\x02E\x1a\xd0\x06\x20Performs\x20a\x20watch\ + \x20for\x20the\x20serving\x20status\x20of\x20the\x20requested\x20service\ + .\n\x20The\x20server\x20will\x20immediately\x20send\x20back\x20a\x20mess\ + age\x20indicating\x20the\x20current\n\x20serving\x20status.\x20\x20It\ + \x20will\x20then\x20subsequently\x20send\x20a\x20new\x20message\x20whene\ + ver\n\x20the\x20service's\x20serving\x20status\x20changes.\n\n\x20If\x20\ + the\x20requested\x20service\x20is\x20unknown\x20when\x20the\x20call\x20i\ + s\x20received,\x20the\n\x20server\x20will\x20send\x20a\x20message\x20set\ + ting\x20the\x20serving\x20status\x20to\n\x20SERVICE_UNKNOWN\x20but\x20wi\ + ll\x20*not*\x20terminate\x20the\x20call.\x20\x20If\x20at\x20some\n\x20fu\ + ture\x20point,\x20the\x20serving\x20status\x20of\x20the\x20service\x20be\ + comes\x20known,\x20the\n\x20server\x20will\x20send\x20a\x20new\x20messag\ + e\x20with\x20the\x20service's\x20serving\x20status.\n\n\x20If\x20the\x20\ + call\x20terminates\x20with\x20status\x20UNIMPLEMENTED,\x20then\x20client\ + s\n\x20should\x20assume\x20this\x20method\x20is\x20not\x20supported\x20a\ + nd\x20should\x20not\x20retry\x20the\n\x20call.\x20\x20If\x20the\x20call\ + \x20terminates\x20with\x20any\x20other\x20status\x20(including\x20OK),\n\ + \x20clients\x20should\x20retry\x20the\x20call\x20with\x20appropriate\x20\ + exponential\x20backoff.\n\n\x0c\n\x05\x06\0\x02\x01\x01\x12\x03=\x06\x0b\ + \n\x0c\n\x05\x06\0\x02\x01\x02\x12\x03=\x0c\x1e\n\x0c\n\x05\x06\0\x02\ + \x01\x06\x12\x03=)/\n\x0c\n\x05\x06\0\x02\x01\x03\x12\x03=0Cb\x06proto3\ +"; + +static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto, +}; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + unsafe { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) + } +} +pub use super::health_grpc::*; diff --git a/health/src/proto/health_grpc.rs b/health/src/proto/health_grpc.rs new file mode 100644 index 000000000..ec6a8b4e4 --- /dev/null +++ b/health/src/proto/health_grpc.rs @@ -0,0 +1,140 @@ +// This file is generated. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] + +const METHOD_HEALTH_CHECK: ::grpcio::Method< + super::health::HealthCheckRequest, + super::health::HealthCheckResponse, +> = ::grpcio::Method { + ty: ::grpcio::MethodType::Unary, + name: "/grpc.health.v1.Health/Check", + req_mar: ::grpcio::Marshaller { + ser: ::grpcio::pb_ser, + de: ::grpcio::pb_de, + }, + resp_mar: ::grpcio::Marshaller { + ser: ::grpcio::pb_ser, + de: ::grpcio::pb_de, + }, +}; + +const METHOD_HEALTH_WATCH: ::grpcio::Method< + super::health::HealthCheckRequest, + super::health::HealthCheckResponse, +> = ::grpcio::Method { + ty: ::grpcio::MethodType::ServerStreaming, + name: "/grpc.health.v1.Health/Watch", + req_mar: ::grpcio::Marshaller { + ser: ::grpcio::pb_ser, + de: ::grpcio::pb_de, + }, + resp_mar: ::grpcio::Marshaller { + ser: ::grpcio::pb_ser, + de: ::grpcio::pb_de, + }, +}; + +#[derive(Clone)] +pub struct HealthClient { + client: ::grpcio::Client, +} + +impl HealthClient { + pub fn new(channel: ::grpcio::Channel) -> Self { + HealthClient { + client: ::grpcio::Client::new(channel), + } + } + + pub fn check_opt( + &self, + req: &super::health::HealthCheckRequest, + opt: ::grpcio::CallOption, + ) -> ::grpcio::Result { + self.client.unary_call(&METHOD_HEALTH_CHECK, req, opt) + } + + pub fn check( + &self, + req: &super::health::HealthCheckRequest, + ) -> ::grpcio::Result { + self.check_opt(req, ::grpcio::CallOption::default()) + } + + pub fn check_async_opt( + &self, + req: &super::health::HealthCheckRequest, + opt: ::grpcio::CallOption, + ) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver> { + self.client.unary_call_async(&METHOD_HEALTH_CHECK, req, opt) + } + + pub fn check_async( + &self, + req: &super::health::HealthCheckRequest, + ) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver> { + self.check_async_opt(req, ::grpcio::CallOption::default()) + } + + pub fn watch_opt( + &self, + req: &super::health::HealthCheckRequest, + opt: ::grpcio::CallOption, + ) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver> { + self.client.server_streaming(&METHOD_HEALTH_WATCH, req, opt) + } + + pub fn watch( + &self, + req: &super::health::HealthCheckRequest, + ) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver> { + self.watch_opt(req, ::grpcio::CallOption::default()) + } + pub fn spawn(&self, f: F) + where + F: ::futures::Future + Send + 'static, + { + self.client.spawn(f) + } +} + +pub trait Health { + fn check( + &mut self, + ctx: ::grpcio::RpcContext, + req: super::health::HealthCheckRequest, + sink: ::grpcio::UnarySink, + ); + fn watch( + &mut self, + ctx: ::grpcio::RpcContext, + req: super::health::HealthCheckRequest, + sink: ::grpcio::ServerStreamingSink, + ); +} + +pub fn create_health(s: S) -> ::grpcio::Service { + let mut builder = ::grpcio::ServiceBuilder::new(); + let mut instance = s.clone(); + builder = builder.add_unary_handler(&METHOD_HEALTH_CHECK, move |ctx, req, resp| { + instance.check(ctx, req, resp) + }); + let mut instance = s; + builder = builder.add_server_streaming_handler(&METHOD_HEALTH_WATCH, move |ctx, req, resp| { + instance.watch(ctx, req, resp) + }); + builder.build() +} diff --git a/health/src/service.rs b/health/src/service.rs new file mode 100644 index 000000000..956b0e11d --- /dev/null +++ b/health/src/service.rs @@ -0,0 +1,237 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::proto::{Health, HealthCheckRequest, HealthCheckResponse, ServingStatus}; +use futures::{FutureExt, SinkExt, Stream, StreamExt}; +use grpcio::{RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, UnarySink, WriteFlags}; +use log::info; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +#[cfg(feature = "protobuf-codec")] +use protobuf::ProtobufEnum; + +const VERSION_STEP: usize = 8; +const STATUS_MASK: usize = 7; + +fn state_to_status(state: usize) -> ServingStatus { + ServingStatus::from_i32((state & STATUS_MASK) as i32).unwrap() +} + +/// Struct that stores the state of a service and wake all subscribers when there +/// is any updates. +struct StatusCast { + state: AtomicUsize, + subscribers: Mutex>, +} + +impl StatusCast { + fn new(status: ServingStatus) -> StatusCast { + StatusCast { + state: AtomicUsize::new(VERSION_STEP | (status as usize)), + subscribers: Mutex::default(), + } + } + + /// Updates the status to specified one and update version. + fn broadcast(&self, status: ServingStatus) { + let mut subscribers = self.subscribers.lock().unwrap(); + let state = self.state.load(Ordering::Relaxed); + let new_state = ((state + VERSION_STEP) & !STATUS_MASK) | (status as usize); + self.state.store(new_state, Ordering::Relaxed); + + for (_, s) in subscribers.drain() { + s.wake(); + } + } +} + +/// Struct that gets notified when service status changes. +struct StatusSubscriber { + cast: Arc, + last_state: usize, + id: u64, +} + +impl StatusSubscriber { + fn new(id: u64, cast: Arc) -> StatusSubscriber { + StatusSubscriber { + cast, + last_state: 0, + id, + } + } +} + +impl Stream for StatusSubscriber { + type Item = ServingStatus; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let s = &mut *self; + let cur_state = s.cast.state.load(Ordering::Relaxed); + if cur_state != s.last_state { + let status = state_to_status(cur_state); + s.last_state = cur_state; + return Poll::Ready(Some(status)); + } + + let mut subscribers = s.cast.subscribers.lock().unwrap(); + + let cur_state = s.cast.state.load(Ordering::Relaxed); + if cur_state != s.last_state { + let status = state_to_status(cur_state); + s.last_state = cur_state; + return Poll::Ready(Some(status)); + } + + match subscribers.entry(s.id) { + Entry::Occupied(mut e) => { + if !e.get().will_wake(cx.waker()) { + e.insert(cx.waker().clone()); + } + } + Entry::Vacant(v) => { + v.insert(cx.waker().clone()); + } + } + Poll::Pending + } +} + +impl Drop for StatusSubscriber { + fn drop(&mut self) { + let mut subscribers = self.cast.subscribers.lock().unwrap(); + subscribers.remove(&self.id); + } +} + +#[derive(Default)] +struct Inner { + id: u64, + shutdown: bool, + status: HashMap, + casts: HashMap>, +} + +/// Simple implementation for `Health` service. +#[derive(Clone, Default)] +pub struct HealthService { + inner: Arc>, +} + +impl HealthService { + /// Resets the serving status of a service or inserts a new service status. + pub fn set_serving_status(&self, service: &str, status: ServingStatus) { + let cast = { + let mut inner = self.inner.lock().unwrap(); + if inner.shutdown { + info!("health: status changing for {} to {:?} is ignored because health service is shutdown", service, status); + return; + } + + if let Some(val) = inner.status.get_mut(service) { + *val = status; + } else { + inner.status.insert(service.to_string(), status); + } + + if let Some(cast) = inner.casts.get(service) { + cast.clone() + } else { + return; + } + }; + cast.broadcast(status); + } + + /// Sets all serving status to NotServing, and configures the server to + /// ignore all future status changes. + /// + /// This changes serving status for all services. + pub fn shutdown(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.shutdown = true; + for val in inner.status.values_mut() { + *val = ServingStatus::NotServing; + } + for cast in inner.casts.values() { + cast.broadcast(ServingStatus::NotServing); + } + } +} + +#[allow(clippy::useless_conversion)] +fn build_response(status: ServingStatus) -> HealthCheckResponse { + HealthCheckResponse { + status: status.into(), + ..Default::default() + } +} + +impl Health for HealthService { + fn check( + &mut self, + ctx: RpcContext, + req: HealthCheckRequest, + sink: UnarySink, + ) { + let status = { + let inner = self.inner.lock().unwrap(); + inner.status.get(&req.service).cloned() + }; + if let Some(status) = status { + let resp = build_response(status); + ctx.spawn(sink.success(resp).map(|_| ())); + return; + } + ctx.spawn( + sink.fail(RpcStatus::new( + RpcStatusCode::NOT_FOUND, + Some("unknown service".to_owned()), + )) + .map(|_| ()), + ) + } + + fn watch( + &mut self, + ctx: RpcContext, + req: HealthCheckRequest, + mut sink: ServerStreamingSink, + ) { + let name = req.service; + let (id, v) = { + let mut inner = self.inner.lock().unwrap(); + inner.id += 1; + if let Some(c) = inner.casts.get(&name) { + (inner.id, c.clone()) + } else { + let status = match inner.status.get(&name) { + Some(s) => *s, + None => ServingStatus::ServiceUnknown, + }; + let c = Arc::new(StatusCast::new(status)); + inner.casts.insert(name.clone(), c.clone()); + (inner.id, c) + } + }; + let sub = StatusSubscriber::new(id, v); + let inner = self.inner.clone(); + ctx.spawn(async move { + let _ = sink + .send_all(&mut sub.map(|s| Ok((build_response(s), WriteFlags::default())))) + .await; + let mut inner = inner.lock().unwrap(); + if let Some(c) = inner.casts.get(&name) { + // If there is any subscriber, then cast reference count should not be 1 as + // it's referenced by all subscriber. + if Arc::strong_count(c) == 1 { + inner.casts.remove(&name); + } + } + }) + } +} diff --git a/health/tests/health_check.rs b/health/tests/health_check.rs new file mode 100644 index 000000000..0cd0b72fa --- /dev/null +++ b/health/tests/health_check.rs @@ -0,0 +1,161 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use futures::executor::block_on; +use futures::prelude::*; +use grpcio::*; +use grpcio_health::proto::*; +use grpcio_health::*; +use std::sync::*; +use std::time::Duration; + +const TEST_SERVICE: &str = "grpc.test.TestService"; + +#[track_caller] +fn assert_status(status: ServingStatus, client: &HealthClient, name: &str) { + let mut req = HealthCheckRequest::default(); + req.service = name.to_string(); + let resp = client.check(&req).unwrap(); + assert_eq!(resp.status, status.into()) +} + +#[track_caller] +fn watch(client: &HealthClient, name: &str) -> ClientSStreamReceiver { + let mut req = HealthCheckRequest::default(); + req.service = name.to_string(); + let opt = CallOption::default().timeout(Duration::from_millis(500)); + client.watch_opt(&req, opt).unwrap() +} + +#[track_caller] +fn assert_code(code: RpcStatusCode, client: &HealthClient, name: &str) { + let mut req = HealthCheckRequest::default(); + req.service = name.to_string(); + match client.check(&req) { + Err(Error::RpcFailure(s)) if s.status == code => return, + r => panic!("{} != {:?}", code, r), + } +} + +#[track_caller] +fn assert_next(status: ServingStatus, ss: &mut ClientSStreamReceiver) { + let resp = block_on(ss.next()).unwrap().unwrap(); + assert_eq!(resp.status, status.into()); +} + +fn setup() -> (Server, HealthService, HealthClient) { + let env = Arc::new(Environment::new(1)); + let service = HealthService::default(); + let health_service = create_health(service.clone()); + let mut server = ServerBuilder::new(env.clone()) + .register_service(health_service) + .bind("127.0.0.1", 0) + .build() + .unwrap(); + server.start(); + let (_, port) = server.bind_addrs().next().unwrap(); + + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let client = HealthClient::new(ch); + (server, service, client) +} + +#[test] +fn test_health_check() { + let (_server, service, client) = setup(); + + // Not exist service should return NOT_FOUND. + assert_code(RpcStatusCode::NOT_FOUND, &client, ""); + assert_code(RpcStatusCode::NOT_FOUND, &client, TEST_SERVICE); + + // Service status can be updated + service.set_serving_status("", ServingStatus::Serving); + assert_status(ServingStatus::Serving, &client, ""); + service.set_serving_status("", ServingStatus::NotServing); + assert_status(ServingStatus::NotServing, &client, ""); + service.set_serving_status("", ServingStatus::Unknown); + assert_status(ServingStatus::Unknown, &client, ""); + service.set_serving_status(TEST_SERVICE, ServingStatus::Serving); + assert_status(ServingStatus::Serving, &client, TEST_SERVICE); + assert_status(ServingStatus::Unknown, &client, ""); + + // After shutdown, further updates will be abandonded. + service.shutdown(); + service.set_serving_status(TEST_SERVICE, ServingStatus::Serving); + assert_status(ServingStatus::NotServing, &client, TEST_SERVICE); + assert_status(ServingStatus::NotServing, &client, ""); +} + +#[test] +fn test_health_watch() { + let (_server, service, client) = setup(); + + // Not existed service should return ServiceUnknown. + let mut statuses = watch(&client, ""); + assert_next(ServingStatus::ServiceUnknown, &mut statuses); + service.set_serving_status("", ServingStatus::Serving); + assert_next(ServingStatus::Serving, &mut statuses); + service.set_serving_status("", ServingStatus::NotServing); + assert_next(ServingStatus::NotServing, &mut statuses); + service.set_serving_status("", ServingStatus::Unknown); + assert_next(ServingStatus::Unknown, &mut statuses); + + // Updating other service should not notify the stream. + service.set_serving_status(TEST_SERVICE, ServingStatus::NotServing); + match block_on(statuses.next()).unwrap() { + Err(Error::RpcFailure(r)) if r.status == RpcStatusCode::DEADLINE_EXCEEDED => (), + r => panic!("unexpected status {:?}", r), + } + + // Watch should fetch init status immediately. + statuses = watch(&client, TEST_SERVICE); + assert_next(ServingStatus::NotServing, &mut statuses); + + // Only latest state can be watched. + service.set_serving_status(TEST_SERVICE, ServingStatus::Serving); + service.set_serving_status(TEST_SERVICE, ServingStatus::NotServing); + service.set_serving_status(TEST_SERVICE, ServingStatus::ServiceUnknown); + service.set_serving_status(TEST_SERVICE, ServingStatus::Unknown); + let mut seen = 0; + loop { + let resp = block_on(statuses.next()).unwrap().unwrap(); + if resp.status != ServingStatus::Unknown.into() { + seen += 1; + continue; + } + break; + } + assert!(seen <= 1); +} + +#[test] +fn test_health_watch_multiple() { + let (_server, service, client) = setup(); + + // Watch should fetch service status immediately. + let mut statuses0 = vec![watch(&client, "")]; + assert_next(ServingStatus::ServiceUnknown, &mut statuses0[0]); + + service.set_serving_status("", ServingStatus::Serving); + statuses0.push(watch(&client, "")); + for s in &mut statuses0 { + assert_next(ServingStatus::Serving, s); + } + + service.set_serving_status("", ServingStatus::NotServing); + statuses0.push(watch(&client, "")); + for s in &mut statuses0 { + assert_next(ServingStatus::NotServing, s); + } + + // Multiple watchers for multiple service should work correctly. + let mut statuses1 = vec![watch(&client, TEST_SERVICE)]; + assert_next(ServingStatus::ServiceUnknown, &mut statuses1[0]); + service.set_serving_status(TEST_SERVICE, ServingStatus::NotServing); + service.set_serving_status("", ServingStatus::Serving); + for s in &mut statuses0 { + assert_next(ServingStatus::Serving, s); + } + for s in &mut statuses1 { + assert_next(ServingStatus::NotServing, s); + } +} diff --git a/tests-and-examples/Cargo.toml b/tests-and-examples/Cargo.toml index 456a20b05..2fda2a1d8 100644 --- a/tests-and-examples/Cargo.toml +++ b/tests-and-examples/Cargo.toml @@ -7,8 +7,8 @@ publish = false [features] default = ["protobuf-codec"] -protobuf-codec = ["protobuf", "grpcio/protobuf-codec", "grpcio-proto/protobuf-codec"] -prost-codec = ["prost", "bytes", "grpcio/prost-codec", "grpcio-proto/prost-codec"] +protobuf-codec = ["protobuf", "grpcio/protobuf-codec", "grpcio-proto/protobuf-codec", "grpcio-health/protobuf-codec"] +prost-codec = ["prost", "bytes", "grpcio/prost-codec", "grpcio-proto/prost-codec", "grpcio-health/prost-codec"] [dependencies] grpcio-sys = { path = "../grpc-sys", version = "0.8" } @@ -20,6 +20,7 @@ prost = { version = "0.7", optional = true } bytes = { version = "1.0", optional = true } log = "0.4" grpcio = { path = "..", version = "0.8", default-features = false, features = ["secure"] } +grpcio-health = { path = "../health", version = "0.8", default-features = false } [dev-dependencies] serde_json = "1.0" diff --git a/tests-and-examples/tests/cases/health_check.rs b/tests-and-examples/tests/cases/health_check.rs deleted file mode 100644 index dc9031bfc..000000000 --- a/tests-and-examples/tests/cases/health_check.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - -use futures::prelude::*; -use grpcio::*; -use grpcio_proto::health::v1::health::*; -use std::collections::*; -use std::sync::*; - -type StatusRegistry = HashMap; - -#[derive(Clone)] -struct HealthService { - status: Arc>, -} - -impl Health for HealthService { - fn check( - &mut self, - ctx: RpcContext<'_>, - req: HealthCheckRequest, - sink: UnarySink, - ) { - let status = self.status.read().unwrap(); - let res = match status.get(req.get_service()) { - None => sink.fail(RpcStatus::new(RpcStatusCode::NOT_FOUND, None)), - Some(s) => { - let mut resp = HealthCheckResponse::default(); - resp.set_status(*s); - sink.success(resp) - } - }; - ctx.spawn( - res.map_err(|e| println!("failed to report result: {:?}", e)) - .map(|_| ()), - ); - } -} - -fn check_health( - client: &HealthClient, - status: &Arc>, - service: &str, - exp: HealthCheckResponse_ServingStatus, -) { - status.write().unwrap().insert(service.to_owned(), exp); - let mut req = HealthCheckRequest::default(); - req.set_service(service.to_owned()); - let status = client.check(&req).unwrap().get_status(); - assert_eq!(status, exp); -} - -#[test] -fn test_health_check() { - let env = Arc::new(Environment::new(1)); - let status: Arc> = Arc::default(); - let service = create_health(HealthService { - status: status.clone(), - }); - let mut server = ServerBuilder::new(env.clone()) - .register_service(service) - .bind("127.0.0.1", 0) - .build() - .unwrap(); - server.start(); - let (_, port) = server.bind_addrs().next().unwrap(); - - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); - let client = HealthClient::new(ch); - - check_health( - &client, - &status, - "test", - HealthCheckResponse_ServingStatus::SERVING, - ); - check_health( - &client, - &status, - "test", - HealthCheckResponse_ServingStatus::NOT_SERVING, - ); - check_health( - &client, - &status, - "test", - HealthCheckResponse_ServingStatus::UNKNOWN, - ); - - let mut req = HealthCheckRequest::default(); - req.set_service("not-exist".to_owned()); - let err = client.check(&req).unwrap_err(); - match err { - Error::RpcFailure(s) => assert_eq!(s.status, RpcStatusCode::NOT_FOUND), - e => panic!("unexpected error: {:?}", e), - } -} diff --git a/tests-and-examples/tests/cases/mod.rs b/tests-and-examples/tests/cases/mod.rs index ec3b375d3..fa07d202a 100644 --- a/tests-and-examples/tests/cases/mod.rs +++ b/tests-and-examples/tests/cases/mod.rs @@ -3,7 +3,6 @@ mod auth_context; mod cancel; mod credential; -mod health_check; mod kick; mod metadata; mod misc;