Skip to content

Commit

Permalink
update codes
Browse files Browse the repository at this point in the history
  • Loading branch information
holmes1412 committed Nov 16, 2021
1 parent 1a7adc6 commit c9194e8
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 33 deletions.
8 changes: 5 additions & 3 deletions benchmark/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using namespace srpc;

#define TEST_SECOND 20
#define GET_CURRENT_MS std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
#define GET_CURRENT_NS std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()

std::atomic<int> query_count(0);
Expand Down Expand Up @@ -40,7 +39,8 @@ static void do_echo_pb(CLIENT *client)
}
else
{
printf("status[%d] error[%d] errmsg:%s\n", ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
printf("status[%d] error[%d] errmsg:%s\n",
ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
++error_count;
}

Expand Down Expand Up @@ -69,7 +69,8 @@ static void do_echo_thrift(CLIENT *client)
}
else
{
printf("status[%d] error[%d] errmsg:%s \n", ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
printf("status[%d] error[%d] errmsg:%s \n",
ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
++error_count;
}

Expand Down Expand Up @@ -108,6 +109,7 @@ int main(int argc, char* argv[])
int REQUEST_BYTES = atoi(argv[6]);

request_msg.resize(REQUEST_BYTES, 'r');

//for (int i = 0; i < REQUEST_BYTES; i++)
// request_msg[i] = (unsigned char)(rand() % 256);

Expand Down
9 changes: 5 additions & 4 deletions src/module/rpc_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace srpc
{

bool RPCModule::client_task_begin(SubTask *task, RPCModuleData& data)
bool RPCModule::client_task_begin(SubTask *task, const RPCModuleData& data)
{
bool ret = this->client_begin(task, data);

Expand All @@ -32,7 +32,7 @@ bool RPCModule::client_task_begin(SubTask *task, RPCModuleData& data)
return ret;
}

bool RPCModule::server_task_begin(SubTask *task, RPCModuleData& data)
bool RPCModule::server_task_begin(SubTask *task, const RPCModuleData& data)
{
bool ret = this->server_begin(task, data);

Expand Down Expand Up @@ -107,9 +107,10 @@ bool SnowFlake::get_id(long long group_id, long long machine_id, long long *uid)
{
if (++this->sequence > this->sequence_max)
return false; // too many sequence_id in one millie second
} else {
this->sequence = 0L;
}
else
this->sequence = 0L;

seq_id = this->sequence++;

this->last_timestamp = timestamp;
Expand Down
10 changes: 5 additions & 5 deletions src/module/rpc_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ static constexpr char const *SRPC_SPAN_MESSAGE = "message";
class RPCModule
{
protected:
virtual bool client_begin(SubTask *task, RPCModuleData& data) = 0;
virtual bool server_begin(SubTask *task, RPCModuleData& data) = 0;
virtual bool client_begin(SubTask *task, const RPCModuleData& data) = 0;
virtual bool server_begin(SubTask *task, const RPCModuleData& data) = 0;
virtual bool client_end(SubTask *task, RPCModuleData& data) = 0;
virtual bool server_end(SubTask *task, RPCModuleData& data) = 0;

Expand All @@ -49,13 +49,13 @@ class RPCModule

size_t get_filters_size() const { return this->filters.size(); }
RPCModuleType get_module_type() const { return this->module_type; }
RPCModule(RPCModuleType module_type) { this->module_type = module_type; }

bool client_task_begin(SubTask *task, RPCModuleData& data);
bool server_task_begin(SubTask *task, RPCModuleData& data);
bool client_task_begin(SubTask *task, const RPCModuleData& data);
bool server_task_begin(SubTask *task, const RPCModuleData& data);
bool client_task_end(SubTask *task, RPCModuleData& data);
bool server_task_end(SubTask *task, RPCModuleData& data);

RPCModule(RPCModuleType module_type) { this->module_type = module_type; }
virtual ~RPCModule() {}

private:
Expand Down
16 changes: 8 additions & 8 deletions src/module/rpc_module_span.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ class RPCSpanModule : public RPCModule
using SPAN_CONTEXT = RPCSpanContext<typename RPCTYPE::REQ,
typename RPCTYPE::RESP>;

bool client_begin(SubTask *task, RPCModuleData& data) override;
bool client_begin(SubTask *task, const RPCModuleData& data) override;
bool client_end(SubTask *task, RPCModuleData& data) override;
bool server_begin(SubTask *task, RPCModuleData& data) override;
bool server_begin(SubTask *task, const RPCModuleData& data) override;
bool server_end(SubTask *task, RPCModuleData& data) override;

public:
Expand All @@ -105,15 +105,15 @@ template<class RPCTYPE>
class RPCMonitorModule : public RPCModule
{
public:
bool client_begin(SubTask *task, RPCModuleData& data) override
bool client_begin(SubTask *task, const RPCModuleData& data) override
{
return true;
}
bool client_end(SubTask *task, RPCModuleData& data) override
{
return true;
}
bool server_begin(SubTask *task, RPCModuleData& data) override
bool server_begin(SubTask *task, const RPCModuleData& data) override
{
return true;
}
Expand All @@ -130,15 +130,15 @@ template<class RPCTYPE>
class RPCEmptyModule : public RPCModule
{
public:
bool client_begin(SubTask *task, RPCModuleData& data) override
bool client_begin(SubTask *task, const RPCModuleData& data) override
{
return true;
}
bool client_end(SubTask *task, RPCModuleData& data) override
{
return true;
}
bool server_begin(SubTask *task, RPCModuleData& data) override
bool server_begin(SubTask *task, const RPCModuleData& data) override
{
return true;
}
Expand All @@ -155,7 +155,7 @@ class RPCEmptyModule : public RPCModule

template<class RPCTYPE>
bool RPCSpanModule<RPCTYPE>::client_begin(SubTask *task,
RPCModuleData& data)
const RPCModuleData& data)
{
auto *client_task = static_cast<CLIENT_TASK *>(task);
auto *req = client_task->get_req();
Expand Down Expand Up @@ -224,7 +224,7 @@ bool RPCSpanModule<RPCTYPE>::client_end(SubTask *task,

template<class RPCTYPE>
bool RPCSpanModule<RPCTYPE>::server_begin(SubTask *task,
RPCModuleData& data)
const RPCModuleData& data)
{
std::string ip;
unsigned short port;
Expand Down
1 change: 0 additions & 1 deletion src/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class RPCServer : public WFServer<typename RPCTYPE::REQ,
RPCServer();
RPCServer(const struct RPCServerParams *params);


int add_service(RPCService *service);
const RPCService* find_service(const std::string& name) const;
void add_filter(RPCFilter *filter);
Expand Down
23 changes: 11 additions & 12 deletions src/rpc_task.inl
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ public:
protected:
using user_done_t = std::function<int (int, RPCWorker&)>;

// using WFComplexClientTask<RPCREQ, RPCRESP>::get_req;
// using WFComplexClientTask<RPCREQ, RPCRESP>::get_resp;
using WFComplexClientTask<RPCREQ, RPCRESP>::set_callback;

void init_failed() override;
Expand All @@ -138,7 +136,7 @@ public:
RPCClientTask(const std::string& service_name,
const std::string& method_name,
const RPCTaskParams *params,
std::list<RPCModule *>&& modules,
const std::list<RPCModule *>&& modules,
user_done_t&& user_done);

bool get_remote(std::string& ip, unsigned short *port) const;
Expand All @@ -164,7 +162,7 @@ class RPCServerTask : public WFServerTask<RPCREQ, RPCRESP>
public:
RPCServerTask(CommService *service,
std::function<void (WFNetworkTask<RPCREQ, RPCRESP> *)>& process,
std::list<RPCModule *>&& modules) :
const std::list<RPCModule *>&& modules) :
WFServerTask<RPCREQ, RPCRESP>(service, WFGlobal::get_scheduler(), process),
worker(new RPCContextImpl<RPCREQ, RPCRESP>(this), &this->req, &this->resp),
modules_(modules)
Expand Down Expand Up @@ -371,9 +369,9 @@ inline int RPCClientTask<RPCREQ, RPCRESP>::__serialize_input(const IDL *in)
return -1;
}

static inline bool addr_to_string(const struct sockaddr *addr,
char *ip_str, socklen_t len,
unsigned short *port)
static bool addr_to_string(const struct sockaddr *addr,
char *ip_str, socklen_t len,
unsigned short *port)
{
const char *ret = NULL;

Expand Down Expand Up @@ -402,7 +400,7 @@ inline RPCClientTask<RPCREQ, RPCRESP>::RPCClientTask(
const std::string& service_name,
const std::string& method_name,
const RPCTaskParams *params,
std::list<RPCModule *>&& modules,
const std::list<RPCModule *>&& modules,
user_done_t&& user_done):
WFComplexClientTask<RPCREQ, RPCRESP>(0, nullptr),
user_done_(std::move(user_done)),
Expand Down Expand Up @@ -607,14 +605,15 @@ bool RPCServerTask<RPCREQ, RPCRESP>::get_remote(std::string& ip,

return false;
}
static inline void log_format(std::string& key, std::string& value,
const RPCLogVector& fields)

static void log_format(std::string& key, std::string& value,
const RPCLogVector& fields)
{
if (fields.size() == 0)
return;

char buffer[100];
snprintf(buffer, 100, "%s%c%lld", SRPC_SPAN_LOG, ' ', GET_CURRENT_MS);
snprintf(buffer, 100, "%s%c%ld", SRPC_SPAN_LOG, ' ', GET_CURRENT_MS);
key = std::move(buffer);
value = "{\"";

Expand All @@ -623,8 +622,8 @@ static inline void log_format(std::string& key, std::string& value,
value = value + std::move(field.first) + "\":\""
+ std::move(field.second) + "\",";
}
value[value.length() - 1] = '}';

value[value.length() - 1] = '}';
}

template<class RPCREQ, class RPCRESP>
Expand Down

0 comments on commit c9194e8

Please sign in to comment.