Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sofa-pbrpc cookie #81

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
16 changes: 13 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ OPT ?= -O2 # (A) Production use (optimized mode)
include depends.mk

LIB=libsofa-pbrpc.a
LIB_FULL=libsofa-pbrpc-full.a
LIB_SRC=$(wildcard src/sofa/pbrpc/*.cc)
PLUGIN_SRC=$(wildcard src/sofa/pbrpc/plugin/*/*.cc)
LIB_OBJ=$(patsubst %.cc,%.o,$(LIB_SRC))
PLUGIN_OBJ=$(patsubst %.cc,%.o,$(PLUGIN_SRC))
PROTO=$(wildcard src/sofa/pbrpc/*.proto)
PROTO_SRC=$(patsubst %.proto,%.pb.cc,$(PROTO))
PROTO_HEADER=$(patsubst %.proto,%.pb.h,$(PROTO))
Expand All @@ -45,6 +48,7 @@ PUB_INC=src/sofa/pbrpc/pbrpc.h src/sofa/pbrpc/closure_helper.h src/sofa/pbrpc/cl
src/sofa/pbrpc/fast_lock.h src/sofa/pbrpc/rw_lock.h src/sofa/pbrpc/scoped_locker.h \
src/sofa/pbrpc/condition_variable.h src/sofa/pbrpc/wait_event.h src/sofa/pbrpc/http.h \
src/sofa/pbrpc/buffer.h src/sofa/pbrpc/buf_handle.h src/sofa/pbrpc/profiling_linker.h \
src/sofa/pbrpc/rpc_attachment.h \
$(PROTO) $(PROTO_HEADER)

#-----------------------------------------------
Expand Down Expand Up @@ -84,7 +88,7 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi

clean:
rm -f $(LIB) $(BIN) $(LIB_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)
rm -f $(LIB) $(LIB_FULL) $(BIN) $(LIB_OBJ) $(PLUGIN_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)

rebuild: clean all

Expand All @@ -95,6 +99,9 @@ $(LIB_OBJ): $(PROTO_HEADER)
$(LIB): $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(LIB_OBJ) $(PROTO_OBJ)

$(LIB_FULL): $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)

$(BIN): $(LIB) $(BIN_OBJ)
$(CXX) $(BIN_OBJ) -o $@ $(LIB) $(LDFLAGS)

Expand All @@ -104,19 +111,22 @@ $(BIN): $(LIB) $(BIN_OBJ)
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@

build: $(LIB) $(BIN)
build: $(LIB) $(LIB_FULL) $(BIN)
@echo
@echo 'Build succeed, run "make install" to install sofa-pbrpc to "'$(PREFIX)'".'

install: $(LIB) $(BIN)
install: $(LIB) $(LIB_FULL) $(BIN)
mkdir -p $(PREFIX)/include/sofa/pbrpc
cp -r $(PUB_INC) $(TARGET_DIRECTORY) $(PREFIX)/include/sofa/pbrpc/
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr
cp src/sofa/pbrpc/smart_ptr/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
cp src/sofa/pbrpc/smart_ptr/detail/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
mkdir -p $(PREFIX)/include/sofa/pbrpc/plugin/cookie
cp src/sofa/pbrpc/plugin/cookie/*.h $(PREFIX)/include/sofa/pbrpc/plugin/cookie
mkdir -p $(PREFIX)/lib
cp $(LIB) $(PREFIX)/lib/
cp $(LIB_FULL) $(PREFIX)/lib/
mkdir -p $(PREFIX)/bin
cp $(BIN) $(PREFIX)/bin/
@echo
Expand Down
4 changes: 2 additions & 2 deletions sample/echo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(PROTOBUF_DIR)/include \
-I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include
CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH)

LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lz

UNAME_S := $(shell uname -s)
Expand All @@ -57,7 +57,7 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a" ]; then echo "ERROR: need sofa-pbrpc-full lib"; exit 1; fi

clean:
@rm -f $(BIN) *.o *.pb.*
Expand Down
21 changes: 20 additions & 1 deletion sample/echo/client_async.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;
sofa::pbrpc::RpcCookieManager g_cookie_manager;

void EchoCallback(sofa::pbrpc::RpcController* cntl,
sofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
Expand All @@ -24,8 +28,17 @@ void EchoCallback(sofa::pbrpc::RpcController* cntl,
if (cntl->Failed()) {
SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
}
else {
else
{
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

delete cntl;
Expand All @@ -46,9 +59,15 @@ int main()
sofa::pbrpc::RpcChannelOptions channel_options;
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321", channel_options);

RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
cookie->Load();
cookie->Set("type", "async");
cookie->Set("logid", "123456");

// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
Expand Down
17 changes: 17 additions & 0 deletions sample/echo/client_sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
// Author: [email protected] (Qin Zuoyan)

#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;
sofa::pbrpc::RpcCookieManager g_cookie_manager;

// Using global RpcClient object can help share resources such as threads and buffers.
sofa::pbrpc::RpcClient g_rpc_client;

Expand All @@ -21,6 +25,11 @@ int main()
// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
cookie->Load();
cookie->Set("type", "sync");
cookie->Set("logid", "123456");
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request =
new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
Expand Down Expand Up @@ -50,6 +59,14 @@ int main()
else
{
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
cookie.reset(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

// Destroy objects.
Expand Down
14 changes: 14 additions & 0 deletions sample/echo/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <signal.h>
#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;

bool WebServlet(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response)
{
SLOG(INFO, "WebServlet(): request message from %s:%u",
Expand Down Expand Up @@ -53,6 +56,17 @@ class EchoServerImpl : public sofa::pbrpc::test::EchoServer
SLOG(INFO, "Header[\"%s\"]=\"%s\"", it->first.c_str(), it->second.c_str());
}
}
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie());
if (cntl->GetRequestAttachment(cookie.get()))
{
std::string type;
std::string logid;
cookie->Get("type", type);
cookie->Get("logid", logid);
SLOG(INFO, "cookie info : type=%s, logid=%s", type.c_str(), logid.c_str());
}
cookie->Set("version", "1.00");
cntl->SetResponseAttachment(cookie.get());
response->set_message("echo message: " + request->message());
done->Run();
}
Expand Down
48 changes: 45 additions & 3 deletions src/sofa/pbrpc/binary_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,48 @@ void BinaryRpcRequest::ProcessRequest(
bool parse_request_return = false;
if (compress_type == CompressTypeNone)
{
parse_request_return = request->ParseFromZeroCopyStream(_req_body.get());
parse_request_return =
request->ParseFromBoundedZeroCopyStream(_req_body.get(), _req_header.data_size);
}
else
{
ReadBufferPtr read_buffer(new ReadBuffer());
int bytes_read = 0;
while (bytes_read < _req_header.data_size)
{
const char* read_pos = NULL;
int cur_size;
int bytes_remain = _req_header.data_size - bytes_read;
int handle_offset = _req_body->CurrentHandleOffset();
if (!_req_body->Next(reinterpret_cast<const void**>(&read_pos), &cur_size))
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << _req_meta.sequence_id() << "}"
<< ": bad request buffer";
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: bad request buffer",
RpcEndpointToString(_remote_endpoint).c_str(),
_req_meta.sequence_id());
#endif
SendFailedResponse(stream, RPC_ERROR_PARSE_REQUEST_MESSAGE, "bad request buffer");
return;
}
char* handle_data = const_cast<char*>(read_pos) - handle_offset;
if (bytes_remain >= cur_size)
{
read_buffer->Append(BufHandle(handle_data, cur_size, handle_offset));
bytes_read += cur_size;
}
else
{
_req_body->BackUp(cur_size - bytes_remain);
read_buffer->Append(BufHandle(handle_data, bytes_remain, handle_offset));
bytes_read += bytes_remain;
}
}
sofa::pbrpc::scoped_ptr<AbstractCompressedInputStream> is(
get_compressed_input_stream(_req_body.get(), compress_type));
get_compressed_input_stream(read_buffer.get(), compress_type));
parse_request_return = request->ParseFromZeroCopyStream(is.get());
}
if (!parse_request_return)
Expand Down Expand Up @@ -123,6 +159,9 @@ void BinaryRpcRequest::ProcessRequest(
cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ?
_req_meta.expected_response_compress_type() : CompressTypeNone);

cntl->SetRequestSize(_req_header.data_size);
cntl->SetRequestAttachBuffer(_req_body);

CallMethod(method_board, controller, request, response);
}

Expand Down Expand Up @@ -170,11 +209,14 @@ ReadBufferPtr BinaryRpcRequest::AssembleSucceedResponse(
return ReadBufferPtr();
}
header.data_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size;
header.message_size = header.meta_size + header.data_size;
ReadBufferPtr response_attach_buffer = cntl->GetResponseAttachBuffer();
header.message_size = header.meta_size + header.data_size + response_attach_buffer->TotalCount();

write_buffer.SetData(header_pos, reinterpret_cast<const char*>(&header), header_size);

ReadBufferPtr read_buffer(new ReadBuffer());
write_buffer.SwapOut(read_buffer.get());
read_buffer->Append(response_attach_buffer.get());

return read_buffer;
}
Expand Down
5 changes: 5 additions & 0 deletions src/sofa/pbrpc/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ bool ReadBuffer::Next(const void** data, int* size)
}
}

int ReadBuffer::CurrentHandleOffset()
{
return _cur_it->offset + _cur_pos;
}

// BackUp() can only be called after a successful Next().
// "count" should be greater than or equal to 0.
void ReadBuffer::BackUp(int count)
Expand Down
4 changes: 4 additions & 0 deletions src/sofa/pbrpc/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class ReadBuffer : public google::protobuf::io::ZeroCopyInputStream

// implements ZeroCopyInputStream ----------------------------------
bool Next(const void** data, int* size);

// Get the offset of current buffer handle.
int CurrentHandleOffset();

void BackUp(int count);
bool Skip(int count);
int64 ByteCount() const;
Expand Down
Loading