Skip to content

Commit

Permalink
fix:Daemon,clang-tidy warring (#64)
Browse files Browse the repository at this point in the history
* fix daemonize,fix clang-tidy warring,fix log not do flush on process exit

* format file
  • Loading branch information
lqxhub authored Dec 4, 2023
1 parent ed5d6ba commit 3dcfc45
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 64 deletions.
40 changes: 18 additions & 22 deletions src/io_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "io_thread_pool.h"

#include <csignal>

#include <cassert>
#include <cstring>

#include "io_thread_pool.h"
#include "pstd/log.h"
#include "util.h"

Expand Down Expand Up @@ -41,11 +37,12 @@ bool IOThreadPool::SetWorkerNum(size_t num) {
return true;
}

bool IOThreadPool::Init(const char* ip, int port, NewTcpConnectionCallback cb) {
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);
bool IOThreadPool::Init(const char* ip, int port, const NewTcpConnectionCallback& cb) {
auto f = [this] { return ChooseNextWorkerEventLoop(); };

base_.Init();
printf("base loop %s %p, g_baseLoop %p\n", base_.GetName().c_str(), &base_, base_.Self());
INFO("base loop {} {}, g_baseLoop {}", base_.GetName(), static_cast<void*>(&base_),
static_cast<void*>(pikiwidb::EventLoop::Self()));
if (!base_.Listen(ip, port, cb, f)) {
ERROR("can not bind socket on addr {}:{}", ip, port);
return false;
Expand All @@ -56,7 +53,7 @@ bool IOThreadPool::Init(const char* ip, int port, NewTcpConnectionCallback cb) {

void IOThreadPool::Run(int ac, char* av[]) {
assert(state_ == State::kNone);
INFO("Process starting...");
INFO("Process {} starting...", name_);

// start loops in thread pool
StartWorkers();
Expand All @@ -68,15 +65,15 @@ void IOThreadPool::Run(int ac, char* av[]) {

worker_threads_.clear();

INFO("Process stopped, goodbye...");
INFO("Process {} stopped, goodbye...", name_);
}

void IOThreadPool::Exit() {
state_ = State::kStopped;

BaseLoop()->Stop();
for (size_t index = 0; index < worker_loops_.size(); ++index) {
EventLoop* loop = worker_loops_[index].get();
for (const auto& worker_loop : worker_loops_) {
EventLoop* loop = worker_loop.get();
loop->Stop();
}
}
Expand All @@ -100,7 +97,7 @@ void IOThreadPool::StartWorkers() {

size_t index = 1;
while (worker_loops_.size() < worker_num_) {
std::unique_ptr<EventLoop> loop(new EventLoop);
std::unique_ptr<EventLoop> loop = std::make_unique<EventLoop>();
if (!name_.empty()) {
loop->SetName(name_ + "_" + std::to_string(index++));
INFO("loop {}, name {}", static_cast<void*>(loop.get()), loop->GetName().c_str());
Expand All @@ -114,7 +111,7 @@ void IOThreadPool::StartWorkers() {
loop->Init();
loop->Run();
});
printf("thread %lu, thread loop %p, loop name %s \n", index, loop, loop->GetName().c_str());
INFO("thread {}, thread loop {}, loop name {}", index, static_cast<void*>(loop), loop->GetName().c_str());
worker_threads_.push_back(std::move(t));
}

Expand All @@ -123,21 +120,20 @@ void IOThreadPool::StartWorkers() {

void IOThreadPool::SetName(const std::string& name) { name_ = name; }

bool IOThreadPool::Listen(const char* ip, int port, NewTcpConnectionCallback ccb) {
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);
bool IOThreadPool::Listen(const char* ip, int port, const NewTcpConnectionCallback& ccb) {
auto f = [this] { return ChooseNextWorkerEventLoop(); };
auto loop = BaseLoop();
return loop->Execute([loop, ip, port, ccb, f]() { return loop->Listen(ip, port, std::move(ccb), f); }).get();
return loop->Execute([loop, ip, port, ccb, f]() { return loop->Listen(ip, port, ccb, f); }).get();
}

void IOThreadPool::Connect(const char* ip, int port, NewTcpConnectionCallback ccb, TcpConnectionFailCallback fcb,
EventLoop* loop) {
void IOThreadPool::Connect(const char* ip, int port, const NewTcpConnectionCallback& ccb,
const TcpConnectionFailCallback& fcb, EventLoop* loop) {
if (!loop) {
loop = ChooseNextWorkerEventLoop();
}

std::string ipstr(ip);
loop->Execute(
[loop, ipstr, port, ccb, fcb]() { loop->Connect(ipstr.c_str(), port, std::move(ccb), std::move(fcb)); });
std::string ipStr(ip);
loop->Execute([loop, ipStr, port, ccb, fcb]() { loop->Connect(ipStr.c_str(), port, ccb, fcb); });
}

std::shared_ptr<HttpServer> IOThreadPool::ListenHTTP(const char* ip, int port, HttpServer::OnNewClient cb) {
Expand Down
10 changes: 5 additions & 5 deletions src/io_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class IOThreadPool {
IOThreadPool() = default;
~IOThreadPool() = default;

static const size_t GetMaxWorkerNum() { return kMaxWorkers; }
static size_t GetMaxWorkerNum() { return kMaxWorkers; }

bool Init(const char* ip, int port, NewTcpConnectionCallback ccb);
bool Init(const char* ip, int port, const NewTcpConnectionCallback& ccb);
void Run(int argc, char* argv[]);
void Exit();
bool IsExit() const;
Expand All @@ -41,11 +41,11 @@ class IOThreadPool {
void SetName(const std::string& name);

// TCP server
bool Listen(const char* ip, int port, NewTcpConnectionCallback ccb);
bool Listen(const char* ip, int port, const NewTcpConnectionCallback& ccb);

// TCP client
void Connect(const char* ip, int port, NewTcpConnectionCallback ccb,
TcpConnectionFailCallback fcb = TcpConnectionFailCallback(), EventLoop* loop = nullptr);
void Connect(const char* ip, int port, const NewTcpConnectionCallback& ccb,
const TcpConnectionFailCallback& fcb = TcpConnectionFailCallback(), EventLoop* loop = nullptr);

// HTTP server
std::shared_ptr<HttpServer> ListenHTTP(const char* ip, int port,
Expand Down
87 changes: 54 additions & 33 deletions src/pikiwidb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
//
// PikiwiDB.cc

#include <spawn.h>
#include <sys/fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#include <iostream>
#include <thread>

Expand All @@ -33,25 +32,20 @@

std::unique_ptr<PikiwiDB> g_pikiwidb;

static void SignalHandler(int) {
if (g_pikiwidb) {
g_pikiwidb->Stop();
}
static void IntSigHandle(const int sig) {
INFO("Catch Signal {}, cleanup...", sig);
g_pikiwidb->Stop();
}

static void InitSignal() {
struct sigaction sig;
::memset(&sig, 0, sizeof(sig));

sig.sa_handler = SignalHandler;
sigaction(SIGINT, &sig, NULL);

// ignore sigpipe
sig.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sig, NULL);
static void SignalSetup() {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGINT, &IntSigHandle);
signal(SIGQUIT, &IntSigHandle);
signal(SIGTERM, &IntSigHandle);
}

const unsigned PikiwiDB::kRunidSize = 40;
const uint32_t PikiwiDB::kRunidSize = 40;

static void Usage() {
std::cerr << "Usage: ./pikiwidb-server [/path/to/redis.conf] [options]\n\
Expand Down Expand Up @@ -83,7 +77,7 @@ bool PikiwiDB::ParseArgs(int ac, char* av[]) {
if (++i == ac) {
return false;
}
port_ = static_cast<unsigned short>(std::atoi(av[i]));
port_ = static_cast<int16_t>(std::atoi(av[i]));
} else if (strncasecmp(av[i], "--loglevel", 10) == 0) {
if (++i == ac) {
return false;
Expand All @@ -95,7 +89,7 @@ bool PikiwiDB::ParseArgs(int ac, char* av[]) {
}

master_ = std::string(av[++i]);
master_port_ = static_cast<unsigned short>(std::atoi(av[++i]));
master_port_ = static_cast<int16_t>(std::atoi(av[++i]));
} else {
std::cerr << "Unknow option " << av[i] << std::endl;
return false;
Expand All @@ -112,7 +106,8 @@ static void PdbCron() {
return;
}

if (Now() > (g_lastPDBSave + unsigned(g_config.saveseconds)) * 1000UL && PStore::dirty_ >= g_config.savechanges) {
if (Now() > (g_lastPDBSave + static_cast<unsigned>(g_config.saveseconds)) * 1000UL &&
PStore::dirty_ >= g_config.savechanges) {
int ret = fork();
if (ret == 0) {
{
Expand Down Expand Up @@ -218,8 +213,8 @@ bool PikiwiDB::Init() {
ERROR("number of threads can't exceeds {}, now is {}", kMaxWorkerNum, num);
return false;
}
worker_threads_.SetWorkerNum((size_t)(g_config.worker_threads_num));
slave_threads_.SetWorkerNum((size_t)(g_config.slave_threads_num));
worker_threads_.SetWorkerNum(static_cast<size_t>(g_config.worker_threads_num));
slave_threads_.SetWorkerNum(static_cast<size_t>(g_config.slave_threads_num));

PCommandTable::Init();
PCommandTable::AliasCommand(g_config.aliases);
Expand Down Expand Up @@ -249,12 +244,6 @@ bool PikiwiDB::Init() {
PREPL.SetMasterAddr(g_config.masterIp.c_str(), g_config.masterPort);
}

// output logo to console
char logo[512] = "";
snprintf(logo, sizeof logo - 1, pikiwidbLogo, PIKIWIDB_VERSION, static_cast<int>(sizeof(void*)) * 8,
static_cast<int>(g_config.port));
std::cout << logo;

cmd_table_manager_.InitCmdTable();

return true;
Expand All @@ -272,6 +261,7 @@ void PikiwiDB::Run() {

worker_threads_.Run(0, nullptr);

t.join(); // wait for slave thread exit
INFO("server exit running");
}

Expand All @@ -292,12 +282,28 @@ static void InitLogs() {
#endif
}

static void daemonize() {
if (fork()) {
exit(0); /* parent exits */
}
setsid(); /* create a new session */
}

static void closeStd() {
int fd;
fd = open("/dev/null", O_RDWR, 0);
if (fd != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
close(fd);
}
}

int main(int ac, char* av[]) {
[[maybe_unused]] rocksdb::DB* db;
g_pikiwidb = std::make_unique<PikiwiDB>();
pstd::InitRandom();
InitSignal();
InitLogs();

if (!g_pikiwidb->ParseArgs(ac - 1, av + 1)) {
Usage();
return -1;
Expand All @@ -310,14 +316,29 @@ int main(int ac, char* av[]) {
}
}

// output logo to console
char logo[512] = "";
snprintf(logo, sizeof logo - 1, pikiwidbLogo, PIKIWIDB_VERSION, static_cast<int>(sizeof(void*)) * 8,
static_cast<int>(pikiwidb::g_config.port));
std::cout << logo;

if (pikiwidb::g_config.daemonize) {
daemonize();
}

pstd::InitRandom();
SignalSetup();
InitLogs();

if (pikiwidb::g_config.daemonize) {
pid_t pid;
::posix_spawn(&pid, av[0], nullptr, nullptr, av, nullptr);
closeStd();
}

if (g_pikiwidb->Init()) {
g_pikiwidb->Run();
}

// when process exit, flush log
spdlog::get(logger::Logger::Instance().Name())->flush();
return 0;
}
8 changes: 4 additions & 4 deletions src/pikiwidb.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PikiwiDB final {

bool Init();
void Run();
void Recycle();
// void Recycle();
void Stop();

void OnNewConnection(pikiwidb::TcpConnection* obj);
Expand All @@ -32,13 +32,13 @@ class PikiwiDB final {

public:
pikiwidb::PString cfg_file_;
unsigned short port_{0};
uint16_t port_{0};
pikiwidb::PString log_level_;

pikiwidb::PString master_;
unsigned short master_port_{0};
uint16_t master_port_{0};

static const unsigned kRunidSize;
static const uint32_t kRunidSize;

private:
pikiwidb::IOThreadPool worker_threads_;
Expand Down

0 comments on commit 3dcfc45

Please sign in to comment.