From fc3cd8151cdb87a1a9ab14561ccd099571347f9a Mon Sep 17 00:00:00 2001 From: qinzuoyan Date: Wed, 19 Nov 2014 00:05:27 +0800 Subject: [PATCH] add multi_server_sample --- sample/multi_server_sample/Makefile | 76 +++++++ sample/multi_server_sample/README | 23 ++ sample/multi_server_sample/address_list.txt | 3 + sample/multi_server_sample/client.cc | 197 ++++++++++++++++++ sample/multi_server_sample/echo_service.proto | 16 ++ sample/multi_server_sample/server.cc | 89 ++++++++ 6 files changed, 404 insertions(+) create mode 100644 sample/multi_server_sample/Makefile create mode 100644 sample/multi_server_sample/README create mode 100644 sample/multi_server_sample/address_list.txt create mode 100644 sample/multi_server_sample/client.cc create mode 100644 sample/multi_server_sample/echo_service.proto create mode 100644 sample/multi_server_sample/server.cc diff --git a/sample/multi_server_sample/Makefile b/sample/multi_server_sample/Makefile new file mode 100644 index 0000000..5421172 --- /dev/null +++ b/sample/multi_server_sample/Makefile @@ -0,0 +1,76 @@ +# Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. See the AUTHORS file for names of contributors. + +#----------------------------------------------- +## Sofa-pbrpc path containing `include'and `lib'. +## +## Check file exist: +## $(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h +## $(SOFA_PBRPC)/lib/libsofa-pbrpc.a +## +SOFA_PBRPC=../../output +#----------------------------------------------- + +#----------------------------------------------- +# Uncomment exactly one of the lines labelled (A), (B), and (C) below +# to switch between compilation modes. +# +OPT ?= -O2 # (A) Production use (optimized mode) +# OPT ?= -g2 # (B) Debug mode, w/ full line-level debugging symbols +# OPT ?= -O2 -g2 # (C) Profiling mode: opt, but w/debugging symbols +#----------------------------------------------- + +#----------------------------------------------- +# !!! Do not change the following lines !!! +#----------------------------------------------- + +include ../../depends.mk + +CXX=g++ +INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(BOOST_HEADER_DIR) -I$(PROTOBUF_DIR)/include \ + -I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include +CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH) + +LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a +LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lrt -lz + +PROTO_SRC=echo_service.proto +PROTO_OBJ=$(patsubst %.proto,%.pb.o,$(PROTO_SRC)) +PROTO_OPTIONS=--proto_path=. --proto_path=$(SOFA_PBRPC)/include --proto_path=$(PROTOBUF_DIR)/include + +BIN=server client + +all: check_depends $(BIN) + +.PHONY: check_depends clean + +check_depends: + @if [ ! -f "$(PROTOBUF_DIR)/include/google/protobuf/message.h" ]; then echo "ERROR: need protobuf header"; exit 1; fi + @if [ ! -f "$(PROTOBUF_DIR)/lib/libprotobuf.a" ]; then echo "ERROR: need protobuf lib"; exit 1; fi + @if [ ! -f "$(PROTOBUF_DIR)/bin/protoc" ]; then echo "ERROR: need protoc binary"; exit 1; fi + @if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi + @if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi + @if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi + @if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi + +clean: + @rm -f $(BIN) *.o *.pb.* + +rebuild: clean all + +server: $(PROTO_OBJ) server.o + $(CXX) $^ -o $@ $(LIBRARY) $(LDFLAGS) + +client: $(PROTO_OBJ) client.o + $(CXX) $^ -o $@ $(LIBRARY) $(LDFLAGS) + +%.pb.o: %.pb.cc + $(CXX) $(CXXFLAGS) -c $< -o $@ + +%.pb.cc: %.proto + $(PROTOBUF_DIR)/bin/protoc $(PROTO_OPTIONS) --cpp_out=. $< + +%.o: %.cc $(PROTO_OBJ) + $(CXX) $(CXXFLAGS) -c $< -o $@ + diff --git a/sample/multi_server_sample/README b/sample/multi_server_sample/README new file mode 100644 index 0000000..cb8a629 --- /dev/null +++ b/sample/multi_server_sample/README @@ -0,0 +1,23 @@ +1, make + +2, start servers: + ./server 127.0.0.1 12345 &>/dev/null & + ./server 127.0.0.1 12346 &>/dev/null & + ./server 127.0.0.1 12347 &>/dev/null & + +3, start client: + ./client address_list.txt + +4, remove one address from `address_list.txt'. + +5, signal client to reload address list from `address_list.txt': + killall -s SIGTERM client + +6, add a new address into `address_list.txt'. + +7, signal client to reload address list from `address_list.txt': + killall -s SIGTERM client + +8, test done, stop all servers: + killall server + diff --git a/sample/multi_server_sample/address_list.txt b/sample/multi_server_sample/address_list.txt new file mode 100644 index 0000000..d98cd91 --- /dev/null +++ b/sample/multi_server_sample/address_list.txt @@ -0,0 +1,3 @@ +127.0.0.1:12345 +127.0.0.1:12346 +127.0.0.1:12347 diff --git a/sample/multi_server_sample/client.cc b/sample/multi_server_sample/client.cc new file mode 100644 index 0000000..00f601b --- /dev/null +++ b/sample/multi_server_sample/client.cc @@ -0,0 +1,197 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: qinzuoyan01@baidu.com (Qin Zuoyan) + +#include +#include +#include +#include +#include +#include + +#include +#include "echo_service.pb.h" + +class ReloadableAddressProvider : public sofa::pbrpc::RpcChannel::AddressProvider +{ +public: + typedef std::vector EventHandlerVector; +public: + ReloadableAddressProvider(const std::string& addr_file) : _addr_file(addr_file) { + Reload(); + } + + virtual ~ReloadableAddressProvider() { + sofa::pbrpc::ScopedLocker _(_lock); + for (EventHandlerVector::iterator it = _handler_list.begin(); + it != _handler_list.end(); ++it) { + delete *it; + } + } + + virtual void GetInitAddress(std::vector* address_list) { + sofa::pbrpc::ScopedLocker _(_lock); + address_list->assign(_addr_set.begin(), _addr_set.end()); + } + + virtual bool RegisterEventHandler(sofa::pbrpc::RpcChannel::EventHandler* event_handler) { + sofa::pbrpc::ScopedLocker _(_lock); + _handler_list.push_back(event_handler); + return true; + } + + void Reload() { + sofa::pbrpc::ScopedLocker _(_lock); + SLOG(NOTICE, "start reloading address list from file \"%s\"", _addr_file.c_str()); + // open file + std::ifstream ifs(_addr_file.c_str(), std::ifstream::in); + if (!ifs.good()) { + SLOG(ERROR, "open address list file \"%s\" fail", _addr_file.c_str()); + return; + } + // read new addresses + std::set new_addr_set; + std::string addr; + while (std::getline(ifs, addr)) { + if (!addr.empty()) { + new_addr_set.insert(addr); + } + } + // make diff + std::vector add_list; + std::vector remove_list; + std::set::iterator old_it = _addr_set.begin(); + std::set::iterator old_end = _addr_set.end(); + std::set::iterator new_it = new_addr_set.begin(); + std::set::iterator new_end = new_addr_set.end(); + while (old_it != old_end && new_it != new_end) { + if (*old_it == *new_it) { + // keep + ++old_it; + ++new_it; + } + else if (*old_it < *new_it) { + // remove + remove_list.push_back(*old_it); + ++old_it; + } + else { + // add + add_list.push_back(*new_it); + ++new_it; + } + } + if (old_it != old_end) { + remove_list.insert(remove_list.end(), old_it, old_end); + } + if (new_it != new_end) { + add_list.insert(add_list.end(), new_it, new_end); + } + // notice handler + if (!add_list.empty() || !remove_list.empty()) { + for (EventHandlerVector::iterator it = _handler_list.begin(); + it != _handler_list.end(); ++it) { + if (!add_list.empty()) { + (*it)->OnAddressAdded(add_list); + } + if (!remove_list.empty()) { + (*it)->OnAddressRemoved(remove_list); + } + } + } + // update _addr_set + _addr_set = new_addr_set; + } + +private: + std::string _addr_file; + sofa::pbrpc::MutexLock _lock; + std::set _addr_set; + EventHandlerVector _handler_list; +}; + +static ReloadableAddressProvider* g_address_provider; + +static void sigcatcher(int sig) +{ + SLOG(NOTICE, "signal catched: %d", sig); + if (g_address_provider) { + g_address_provider->Reload(); + } +} + +int main(int argc, char** argv) +{ + SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); + + if (argc < 2) { + fprintf(stderr, "USAGE: %s \n", argv[0]); + return EXIT_FAILURE; + } + + std::string addr_file = argv[1]; + g_address_provider = new ReloadableAddressProvider(addr_file); + + signal(SIGTERM, &sigcatcher); + + // Define an rpc client. + sofa::pbrpc::RpcClientOptions client_options; + sofa::pbrpc::RpcClient* rpc_client = new sofa::pbrpc::RpcClient(client_options); + + // Define an rpc channel. + sofa::pbrpc::RpcChannelOptions channel_options; + sofa::pbrpc::RpcChannel* rpc_channel = + new sofa::pbrpc::RpcChannel(rpc_client, g_address_provider, channel_options); + + // Define an rpc stub. + sofa::pbrpc::test::EchoServer_Stub* stub = + new sofa::pbrpc::test::EchoServer_Stub(rpc_channel); + + while (true) { + // Prepare parameters. + sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController(); + cntl->SetTimeout(3000); + sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest(); + request->set_message("Hello from qinzuoyan01"); + sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse(); + + // Sync call. + stub->Echo(cntl, request, response, NULL); + + // Check if the request has been sent. + // If has been sent, then can get the sent bytes. + SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str()); + SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false"); + if (cntl->IsRequestSent()) { + SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str()); + SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes()); + } + + // Check if failed. + if (cntl->Failed()) { + SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str()); + } + else { + SLOG(NOTICE, "request succeed: %s", response->message().c_str()); + } + + // Destroy objects. + delete cntl; + delete request; + delete response; + + sleep(1); + } + + delete stub; + delete rpc_channel; + delete rpc_client; + delete g_address_provider; + g_address_provider = NULL; + + return EXIT_SUCCESS; +} + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/sample/multi_server_sample/echo_service.proto b/sample/multi_server_sample/echo_service.proto new file mode 100644 index 0000000..637ac32 --- /dev/null +++ b/sample/multi_server_sample/echo_service.proto @@ -0,0 +1,16 @@ +package sofa.pbrpc.test; + +option cc_generic_services = true; +option java_generic_services = true; + +message EchoRequest { + required string message = 1; +} + +message EchoResponse { + required string message = 1; +} + +service EchoServer { + rpc Echo(EchoRequest) returns(EchoResponse); +} diff --git a/sample/multi_server_sample/server.cc b/sample/multi_server_sample/server.cc new file mode 100644 index 0000000..d502f6a --- /dev/null +++ b/sample/multi_server_sample/server.cc @@ -0,0 +1,89 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: qinzuoyan01@baidu.com (Qin Zuoyan) + +#include +#include +#include +#include +#include "echo_service.pb.h" + +class EchoServerImpl : public sofa::pbrpc::test::EchoServer +{ +public: + EchoServerImpl() {} + virtual ~EchoServerImpl() {} + +private: + virtual void Echo(google::protobuf::RpcController* controller, + const sofa::pbrpc::test::EchoRequest* request, + sofa::pbrpc::test::EchoResponse* response, + google::protobuf::Closure* done) + { + SLOG(NOTICE, "Echo(): request message from %s: %s", + static_cast(controller)->RemoteAddress().c_str(), + request->message().c_str()); + response->set_message("echo message: " + request->message()); + done->Run(); + } +}; + +bool thread_init_func() +{ + sleep(1); + SLOG(INFO, "Init work thread succeed"); + return true; +} + +void thread_dest_func() +{ + SLOG(INFO, "Destroy work thread succeed"); +} + +int main(int argc, char** argv) +{ + SOFA_PBRPC_SET_LOG_LEVEL(NOTICE); + + if (argc < 3) { + fprintf(stderr, "USAGE: %s \n", argv[0]); + return EXIT_FAILURE; + } + + std::string addr = argv[1] + std::string(":") + argv[2]; + + // Define an rpc server. + sofa::pbrpc::RpcServerOptions options; + options.work_thread_init_func = sofa::pbrpc::NewPermanentExtClosure(&thread_init_func); + options.work_thread_dest_func = sofa::pbrpc::NewPermanentExtClosure(&thread_dest_func); + sofa::pbrpc::RpcServer rpc_server(options); + + // Start rpc server. + if (!rpc_server.Start(addr)) { + SLOG(ERROR, "start server failed"); + return EXIT_FAILURE; + } + + // Register service. + sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl(); + if (!rpc_server.RegisterService(echo_service)) { + SLOG(ERROR, "export service failed"); + return EXIT_FAILURE; + } + + // Wait signal. + rpc_server.Run(); + + // Stop rpc server. + rpc_server.Stop(); + + // Delete closures. + // Attention: should delete the closures after server stopped, or may be crash. + delete options.work_thread_init_func; + delete options.work_thread_dest_func; + + return EXIT_SUCCESS; +} + +/* vim: set ts=4 sw=4 sts=4 tw=100 */