Skip to content

Commit

Permalink
Update: Net client surpported coroutine async request
Browse files Browse the repository at this point in the history
  • Loading branch information
i0gan committed Mar 31, 2024
1 parent 4555b26 commit d47374b
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void DefaultStartUp(std::string strArgvList, std::vector<std::shared_ptr<PluginS
AddPluginServer(serverList, "type=login id=2 area=0 ip=127.0.0.1 port=10301 web_port=80 master_ip=127.0.0.1 master_port=10001");
AddPluginServer(serverList, "type=lobby id=1000 area=0 ip=127.0.0.1 port=10401 master_ip=127.0.0.1 master_port=10001");
AddPluginServer(serverList, "type=lobby id=1001 area=0 ip=127.0.0.1 port=10402 master_ip=127.0.0.1 master_port=10001");
AddPluginServer(serverList, "type=proxy id=500 area=0 ip=127.0.0.1 port=10501 master_ip=127.0.0.1 master_port=10001");
AddPluginServer(serverList, "type=proxy id=500 area=0 ip=127.0.0.1 port=10501 ws_port=10502 master_ip=127.0.0.1 master_port=10001");
}

void TutorialStartUp(std::string strArgvList, std::vector<std::shared_ptr<PluginServer>>& serverList) {
Expand Down
24 changes: 22 additions & 2 deletions src/node/login/http/http_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Guid HttpModule::CreatePlayerGUID() {
return xID;
}

bool HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {
Coroutine<bool> HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {
std::string res_str;
ReqLogin req;
AckLogin ack;
Expand All @@ -83,7 +83,23 @@ bool HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {
break;
}
#endif
// test
dout << "async query.... master ...";
rpc::NnReqMinWorkloadNodeInfo req;
req.add_type_list(ST_PROXY);
auto data = co_await m_net_client_->RequestPB(DEFAULT_NODE_MASTER_ID, rpc::MasterRPC::NN_REQ_MIN_WORKLOAD_NODE_INFO, req, rpc::MasterRPC::NN_ACK_MIN_WORKLOAD_NODE_INFO);
if (data.error) {
co_return;
}

string guid;
rpc::NnAckMinWorkloadNodeInfo ack;
if (!INetModule::ReceivePB(data.ack_msg_id, data.data, data.length, ack, guid)) {
co_return;
}
dout << "master response.... min proxy id: " << ack.list()[0].id() << " name: " << ack.list()[0].name() << endl;

/*
if (!m_mysql_->IsHave("account", req.account)) {
dout << "AccountPasswordLogin 注册账号: account: " << req.account << " " << req.password << std::endl;
// 注册该账号
Expand All @@ -98,6 +114,7 @@ bool HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {
ack.msg = "server error, this player is not exsited!\n";
}
}
*/
} else if (req.type == LoginType::PhonePasswordLogin) {
}
dout << "AccountPasswordLogin: account: " << req.account << " " << req.password << " AccountID: " << account_id << std::endl;
Expand All @@ -111,12 +128,14 @@ bool HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {

time_t login_time = SquickGetTimeS();

/*
// 缓存到redis
m_redis_->HashSet(account_id, "account", req.account);
m_redis_->HashSet(account_id, "token", token);
m_redis_->HashSet(account_id, "account_id", account_id);
m_redis_->HashSet(account_id, "login_limit_time", std::to_string(1209600)); // 14天
m_redis_->HashSet(account_id, "login_time", std::to_string(login_time));
*/

json j;
j["account"] = req.account;
Expand All @@ -132,7 +151,8 @@ bool HttpModule::OnLogin(std::shared_ptr<HttpRequest> request) {

ajson::save_to(rep_ss, ack);

return m_http_server_->ResponseMsg(request, rep_ss.str(), WebStatus::WEB_OK);
m_http_server_->ResponseMsg(request, rep_ss.str(), WebStatus::WEB_OK);
co_return;
}

nlohmann::json HttpModule::GetUser(std::shared_ptr<HttpRequest> req) {
Expand Down
2 changes: 1 addition & 1 deletion src/node/login/http/http_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class HttpModule : public IHttpModule {
virtual bool Update();

protected:
bool OnLogin(std::shared_ptr<HttpRequest> request);
Coroutine<bool> OnLogin(std::shared_ptr<HttpRequest> request);
bool OnWorldList(std::shared_ptr<HttpRequest> request);
bool OnWorldEnter(std::shared_ptr<HttpRequest> request);
WebStatus Middleware(std::shared_ptr<HttpRequest> request);
Expand Down
27 changes: 23 additions & 4 deletions src/node/master/node/node_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ void NodeModule::OnNnReqNodeRegister(const socket_t sock, const int msg_id, cons
NtfSubscribNode(new_node_id);
}



void NodeModule::AddSubscribeNode(int new_node_id, vector<int> types) {
for (auto t : types) {
if (t != 0) {
Expand Down Expand Up @@ -164,10 +162,30 @@ void NodeModule::OnNnNtfNodeReport(const socket_t sock, const int msg_id, const
}

void NodeModule::OnNnReqMinWorkNodeInfo(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {

rpc::NnReqMinWorkloadNodeInfo req;
rpc::NnAckMinWorkloadNodeInfo ack;
string guid;

rpc::MsgBase msg_base;
if (!msg_base.ParseFromArray(msg, len)) {
return;
}
if (!req.ParseFromString(msg_base.msg_data())) {
return;
}
dout << "find ....\n";
for (auto type : req.type_list()) {
int id = GetLoadBanlanceNode((ServerType)type);
if (id == -1) continue;
dout << " added min id: " << id << endl;
auto p = ack.add_list();
auto iter = node_map_.find(id);
*p = *iter->second.info;
}
reqid_t req_id = msg_base.req_id();
m_net_->SendMsgPB(rpc::MasterRPC::NN_ACK_MIN_WORKLOAD_NODE_INFO, ack, sock, "", req_id);
}


int NodeModule::GetLoadBanlanceNode(ServerType type) {
int node_id = -1;
int min_workload = 99999;
Expand Down Expand Up @@ -207,6 +225,7 @@ std::string NodeModule::GetServersStatus() {
n["cpu_count"] = sd->cpu_count();
n["status"] = sd->state();
n["workload"] = sd->workload();
n["max_online"] = sd->max_online();
n["update_time"] = sd->update_time();
statusRoot["node_list"][to_string(sd->id())] = n;
}
Expand Down
2 changes: 1 addition & 1 deletion src/node/proxy/logic/logic_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bool LogicModule::AfterStart() {
m_ws_->AddReceiveCallBack(rpc::ProxyRPC::REQ_CONNECT_PROXY, this, &LogicModule::OnReqConnectWithWS);
m_ws_->AddReceiveCallBack(this, &LogicModule::OnOtherMessage);

m_ws_->Startialization(100, 8888);
m_ws_->Startialization(DEFAULT_NODE_MAX_SERVER_CONNECTION, pm_->GetArg("ws_port=", 10502));
m_ws_->AddEventCallBack(this, &LogicModule::OnWebSocketClientEvent);
return true;
}
Expand Down
10 changes: 7 additions & 3 deletions src/proto/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ message Server {
int32 cpu_count = 6;
ServerState state = 7;
int32 type = 8;
int32 area = 9; // 区
int32 area = 9;
int32 connections = 10;
int32 workload = 11;
bytes key = 12; //密钥
bytes key = 12;
bytes public_ip = 13;
int32 update_time = 14; // 更新时间
int32 update_time = 14;
repeated int32 added_node_id_list = 15;
int32 ws_port = 16;
int32 http_port = 17;
int32 https_port = 18;
}

//message ServerList { repeated Server list = 1; }
Expand Down
9 changes: 5 additions & 4 deletions src/proto/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import "base.proto";

// only be used in inner node
message MsgBase {
int32 id = 1;
bytes guid = 2;
bytes msg_data = 3;
repeated bytes broadcast = 4;
int32 id = 1; // node id
bytes guid = 2; // player guid
bytes msg_data = 3; // data
repeated bytes broadcast = 4; // broadcast guid list
int64 req_id = 5; // request id
}

// Node RPC 0 ~ 500
Expand Down
28 changes: 21 additions & 7 deletions src/squick/imodule/i_node_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <squick/plugin/net/export.h>
#include <struct/struct.h>

#define DEFAULT_MASTER_ID 1
#define DEFAULT_NODE_MASTER_ID 1
#define DEFAULT_NODE_CPUT_COUNT 100
#define DEFAULT_NODE_MAX_SERVER_CONNECTION 100000

class INodeBaseModule : public IModule {
public:
Expand Down Expand Up @@ -46,6 +48,19 @@ class INodeBaseModule : public IModule {

static std::string EnumNodeTypeToString(ServerType type)
{
switch (type) {
case ServerType::ST_MASTER: return "master";
case ServerType::ST_LOGIN: return "login";
case ServerType::ST_WORLD: return "world";
case ServerType::ST_DB_PROXY: return "db_proxy";
case ServerType::ST_PROXY: return "proxy";
case ServerType::ST_LOBBY: return "lobby";
case ServerType::ST_GAME_MGR: return "game_mgr";
case ServerType::ST_GAME: return "game";
case ServerType::ST_MICRO: return "micro";
case ServerType::ST_CDN: return "cdn";
case ServerType::ST_ROBOT: return "robot";
}
return "";
}

Expand All @@ -70,7 +85,6 @@ class INodeBaseModule : public IModule {
m_net_->AddEventCallBack(this, &INodeBaseModule::OnServerSocketEvent);
m_net_->ExpandBufferSize();


node_info_.info->set_id(pm_->GetArg("id=", 0));
std::string name = pm_->GetArg("type=", "proxy") + pm_->GetArg("id=", "0");
node_info_.info->set_type(StringNodeTypeToEnum(pm_->GetArg("type=", "proxy")));
Expand All @@ -81,8 +95,8 @@ class INodeBaseModule : public IModule {
node_info_.info->set_public_ip(pm_->GetArg("public_ip=", "127.0.0.1"));
node_info_.info->set_area(pm_->GetArg("area=", 0));
node_info_.info->set_update_time(SquickGetTimeS());
node_info_.info->set_max_online(10000);
node_info_.info->set_cpu_count(8);
node_info_.info->set_max_online(DEFAULT_NODE_MAX_SERVER_CONNECTION);
node_info_.info->set_cpu_count(DEFAULT_NODE_CPUT_COUNT);

pm_->SetAppType(node_info_.info->type());
pm_->SetArea(node_info_.info->area());
Expand Down Expand Up @@ -115,7 +129,7 @@ class INodeBaseModule : public IModule {
bool ret = false;
node_info_.listen_types = types;
ConnectData s;
s.id = DEFAULT_MASTER_ID;
s.id = DEFAULT_NODE_MASTER_ID;
s.type = StringNodeTypeToEnum("master");
s.ip = pm_->GetArg("master_ip=", "127.0.0.1");
s.port = pm_->GetArg("master_port=", 10001);
Expand Down Expand Up @@ -170,7 +184,7 @@ class INodeBaseModule : public IModule {
req.set_id(pm_->GetAppID());
auto s = req.add_list();
*s = *node_info_.info.get();
m_net_client_->SendPBByID(DEFAULT_MASTER_ID, rpc::MasterRPC::NN_NTF_NODE_REPORT, req);
m_net_client_->SendPBByID(DEFAULT_NODE_MASTER_ID, rpc::MasterRPC::NN_NTF_NODE_REPORT, req);
}
}

Expand Down Expand Up @@ -247,7 +261,7 @@ class INodeBaseModule : public IModule {
AddNodes(ack.node_add_list());
}
else {
dout << "注册失败!";
m_log_->LogError(Guid(0, pm_->GetAppID()), "Register faild!");
}
}

Expand Down
16 changes: 9 additions & 7 deletions src/squick/plugin/net/i_net_client_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ struct ConnectData {
};

struct NetClientResponseData {
bool error;
reqid_t req_id;
int req_msg_id;
int ack_msg_id;
socket_t sock;
char* data;
const char* data;
size_t length;
};

Expand Down Expand Up @@ -94,15 +95,16 @@ class INetClientModule : public IModule {

////////////////////////////////////////////////////////////////////////////////
virtual bool IsConnected(const int node_id) = 0;
virtual bool SendByID(const int serverID, const uint16_t msg_id, const std::string& strData, const string id = "") = 0;
virtual bool SendPBByID(const int serverID, const uint16_t msg_id, const google::protobuf::Message& xData, const string id = "") = 0;
virtual void SendToAllNode(const uint16_t msg_id, const std::string& strData, const string id = "") = 0;
virtual void SendToAllNodeByType(const ServerType eType, const uint16_t msg_id, const std::string& strData, const string id = "") = 0;
virtual bool SendByID(const int serverID, const uint16_t msg_id, const std::string& strData, const string guid = "", reqid_t req_id = 0) = 0;
virtual bool SendPBByID(const int serverID, const uint16_t msg_id, const google::protobuf::Message& xData, const string guid = "", reqid_t req_id = 0) = 0;
virtual void SendToAllNode(const uint16_t msg_id, const std::string& strData, const string guid = "") = 0;
virtual void SendToAllNodeByType(const ServerType eType, const uint16_t msg_id, const std::string& strData, const string guid = "") = 0;
virtual void SendPBToAllNode(const uint16_t msg_id, const google::protobuf::Message& xData, const string id = "") = 0;
virtual void SendPBToAllNodeByType(const ServerType eType, const uint16_t msg_id, const google::protobuf::Message& xData, const string id = "") = 0;
virtual void SendPBToAllNodeByType(const ServerType eType, const uint16_t msg_id, const google::protobuf::Message& xData, const string guid = "") = 0;
////////////////////////////////////////////////////////////////////////////////
// coroutine
virtual Awaitable<NetClientResponseData> RequestByID(const int serverID, const uint16_t msg_id, const std::string& strData, int ack_msg_id) = 0;
virtual Awaitable<NetClientResponseData> Request(const int serverID, const uint16_t msg_id, const std::string& data, int ack_msg_id) = 0;
virtual Awaitable<NetClientResponseData> RequestPB(const int node_id, const uint16_t msg_id, const google::protobuf::Message& pb, int ack_msg_id) = 0;

virtual MapEx<int, ConnectData>& GetServerList() = 0;
virtual std::shared_ptr<ConnectData> GetServerNetInfo(const ServerType eType) = 0;
Expand Down
7 changes: 2 additions & 5 deletions src/squick/plugin/net/i_net_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,9 @@ class INetModule : public IModule {
virtual bool Update() = 0;

virtual bool SendMsgWithOutHead(const int msg_id, const std::string &msg, const socket_t sock) = 0;

virtual bool SendMsgToAllClientWithOutHead(const int msg_id, const std::string &msg) = 0;

virtual bool SendMsgPB(const uint16_t msg_id, const google::protobuf::Message &xData, const socket_t sock) = 0;
virtual bool SendMsgPB(const uint16_t msg_id, const google::protobuf::Message &xData, const socket_t sock, const string nPlayer) = 0;
virtual bool SendMsg(const uint16_t msg_id, const std::string &xData, const socket_t sock, const string guid) = 0;
virtual bool SendMsgPB(const uint16_t msg_id, const google::protobuf::Message &xData, const socket_t sock, const string guid = "", reqid_t req_id = 0) = 0;
virtual bool SendMsg(const uint16_t msg_id, const std::string &xData, const socket_t sock, const string guid = "", reqid_t req_id = 0) = 0;

virtual bool SendMsgPBToAllClient(const uint16_t msg_id, const google::protobuf::Message &xData) = 0;

Expand Down
3 changes: 1 addition & 2 deletions src/squick/plugin/net/net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,11 @@ bool Net::Log(int severity, const char *msg) {
return true;
}

// 发送带有头部的消息
// Rpc send
bool Net::SendMsgWithOutHead(const int16_t msg_id, const char *msg, const size_t len, const socket_t sock /*= 0*/) {
std::string strOutData;
int nAllLen = EnCode(msg_id, msg, len, strOutData);
if (nAllLen == len + IMsgHead::SQUICK_Head::SQUICK_HEAD_LENGTH) {

return SendMsg(strOutData.c_str(), strOutData.length(), sock);
}

Expand Down
Loading

0 comments on commit d47374b

Please sign in to comment.