diff --git a/include/Helper.hpp b/include/Helper.hpp index 89f233c..948fcdf 100644 --- a/include/Helper.hpp +++ b/include/Helper.hpp @@ -2,11 +2,45 @@ #define __REDIS_SERVER_HELPER_HPP__ #include #include +#include +#include #include + +/** + * @brief Convert a string to lowercase. + * + * This function takes a string and converts all its characters to lowercase. + * It uses the std::transform algorithm with a lambda function that applies + * std::tolower to each character. + * + * @param s The input string to be converted. + * @return std::string The lowercase version of the input string. + */ inline std::string strTolower(std::string s) { std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return std::tolower(c); } // correct ); return s; } + +/** + * @brief Generate a random string of a given length + * + * @param length The length of the string to generate + * @return std::string A randomly generated string of the specified length + * containing alphanumeric characters (0-9, A-Z, a-z) + */ +inline std::string randomString(std::size_t length) { + std::srand(std::time(0)); // use current time as seed for random generator + auto randChar = []() -> char { + const char charset[] = "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + const size_t max_index = (sizeof(charset) - 1); + return charset[rand() % max_index]; + }; + std::string str(length, 0); + std::generate_n(str.begin(), length, randChar); + return str; +} #endif \ No newline at end of file diff --git a/include/RedisServer.hpp b/include/RedisServer.hpp index 1c29def..e4a5b2b 100644 --- a/include/RedisServer.hpp +++ b/include/RedisServer.hpp @@ -1,6 +1,7 @@ #ifndef REDIS_SERVER_HPP #define REDIS_SERVER_HPP #include "Config.hpp" +#include "TCPClient.hpp" #include "Types.hpp" #include #include @@ -14,12 +15,17 @@ namespace Redis { class Server { public: using SharedPtr = std::shared_ptr; + /** * @brief Construct a new Redis Server. - * + * @param port The server port. + * @param masterIp Optional master server ip to create a replica. + * @param masterPort Optional master server port to create a replica. */ - Server(); + Server(int port = 6379, std::optional masterIp = std::nullopt, + std::optional masterPort = std::nullopt); + virtual ~Server(); /** * @brief Giving a message from redis client, parse it and return the expected * response. @@ -31,6 +37,15 @@ class Server { */ std::optional handleRequest(const std::string &message); + /** + * @brief Is this server a replica of another master redis server. + * + * @return Bool True if it's a replica. + */ + bool isReplica() const { + return masterIp.has_value() && masterPort.has_value(); + } + private: /** * @brief Initialize the cmdsLUT which holds redis command as a key @@ -124,6 +139,8 @@ class Server { */ std::string infoCommand(const std::vector &commands); + bool handShakeMaster(); + /** * @brief The main server database. Reading from the database should be thread * safe. Inserting in the database should only be done through the function @@ -149,6 +166,55 @@ class Server { std::unordered_map< std::string, std::function &)>> cmdsLUT; + + /** + * @brief The port number on which this Redis server is listening. + */ + int port; + + /** + * @brief The IP address of the master Redis server, if this server is a + * replica. This is std::nullopt if this server is not a replica. + */ + std::optional masterIp; + + /** + * @brief The port number of the master Redis server, if this server is a + * replica. This is std::nullopt if this server is not a replica. + */ + std::optional masterPort; + + /** + * @brief The replication ID of the master server. + * This is used to identify the replication stream. + */ + std::string masterReplId; + + /** + * @brief The current replication offset of this server. + * This represents how much of the master's replication stream has been + * processed. + */ + int masterReplOffset = 0; + + /** + * @brief A TCP client used to connect to the master server when this server + * is a replica. This is std::nullopt if this server is not a replica. + */ + std::optional replicaClient = std::nullopt; + + /** + * @brief The I/O context for asynchronous operations. + * This is used by Asio for managing asynchronous operations. + */ + asio::io_context ioContext; + + /** + * @brief A thread dedicated to running the I/O context. + * This allows asynchronous operations to be processed independently of the + * main thread. + */ + std::thread ioThread; }; } // namespace Redis diff --git a/include/TCPClient.hpp b/include/TCPClient.hpp new file mode 100644 index 0000000..e3aab8d --- /dev/null +++ b/include/TCPClient.hpp @@ -0,0 +1,78 @@ +#ifndef __REDIS_SERVER_TCP_CLIENT_HPP__ +#define __REDIS_SERVER_TCP_CLIENT_HPP__ +#include "Logging.hpp" +#include +#include + +using asio::ip::tcp; + +/** + * @brief A class representing a TCP client using the Asio library. + * + * This class provides functionality to connect to a TCP server, + * send messages, and receive responses. + */ +class TCPClient { +public: + /** + * @brief Constructs a TCPClient and establishes a connection to the server. + * + * @param io_context The Asio io_context to use for asynchronous operations. + * @param ip The IP address of the server to connect to. + * @param port The port number of the server to connect to. + */ + TCPClient(asio::io_context &io_context, std::string ip, int port) + : ioContext_(io_context), socket_(ioContext_) { + tcp::resolver resolver(ioContext_); + tcp::resolver::results_type endpoints = + resolver.resolve(ip, std::to_string(port)); + asio::connect(socket_, endpoints); + } + + /** + * @brief Sends a message to the connected server. + * + * @param msg The message to send. + * @return An error code indicating the success or failure of the operation. + */ + std::error_code send(const std::string &msg) { + asio::error_code error; + asio::write(socket_, asio::buffer(msg), error); + return error; + } + + /** + * @brief Reads a message from the connected server. + * + * This method reads until it encounters a "\r\n" sequence or reaches EOF. + * + * @param msg A reference to a string where the received message will be + * stored. + * @return An error code indicating the success or failure of the operation. + */ + std::error_code read(std::string &msg) { + asio::streambuf receive_buffer; + asio::error_code error; + asio::read_until(socket_, receive_buffer, "\r\n", error); + if (error && error != asio::error::eof) { + LOG_ERROR("Receiving failed {}", error.message()); + return error; + } + msg = std::string((std::istreambuf_iterator(&receive_buffer)), + std::istreambuf_iterator()); + return error; + } + +private: + /** + * @brief Reference to the Asio io_context used for asynchronous operations. + */ + asio::io_context &ioContext_; + + /** + * @brief The TCP socket used for communication with the server. + */ + tcp::socket socket_; +}; + +#endif \ No newline at end of file diff --git a/src/RedisServer.cpp b/src/RedisServer.cpp index 3c7434e..fff6531 100644 --- a/src/RedisServer.cpp +++ b/src/RedisServer.cpp @@ -3,14 +3,28 @@ #include "Logging.hpp" #include "RDBFile.hpp" #include "RESP/RESP.hpp" +#include #include #include +#include + namespace fs = std::filesystem; namespace Redis { -Server::Server() { +Server::Server(int port, std::optional masterIp, + std::optional masterPort) + : port(port), masterIp(masterIp), masterPort(masterPort), + masterReplId(randomString(40)) { + initCmdsLUT(); + + if (isReplica() && !handShakeMaster()) { + ioContext.stop(); + ioThread.join(); + throw std::runtime_error("Couldn't connect to the master server"); + } + fs::path rdbFilePath = fs::path(config_.dir) / fs::path(config_.dbfilename); auto rdbDatabase = parseRDBFile(rdbFilePath); if (rdbDatabase) { @@ -20,6 +34,52 @@ Server::Server() { } } +Server::~Server() { + if (isReplica()) { + ioContext.stop(); + ioThread.join(); + } +} + +bool Server::handShakeMaster() { + replicaClient.emplace(ioContext, *masterIp, *masterPort); + ioThread = std::thread([&] { ioContext.run(); }); + LOG_INFO("Pinging the master server on {} {}", *masterIp, *masterPort); + auto readWrite = [&](const std::string_view &cmd, + const std::string_view &expectedReply) -> bool { + auto error = replicaClient->send(std::string(cmd)); + if (error) { + LOG_ERROR("Error Sending ping to the master {}", error.message()); + return false; + } + std::string reply; + error = replicaClient->read(reply); + if (error) { + LOG_ERROR("Error getting reply from the master {}", error.message()); + return false; + } + LOG_INFO("Received reply from master {}", reply); + return expectedReply == reply; + }; + + if (!readWrite("*1\r\n$4\r\nPING\r\n", "+PONG\r\n")) { + return false; + } + if (!readWrite(RESP::toStringArray( + {"REPLCONF", "listening-port", std::to_string(port)}), + "+OK\r\n")) { + return false; + } + if (!readWrite(RESP::toStringArray({"REPLCONF", "capa", "psync2"}), + "+OK\r\n")) { + return false; + } + + readWrite(RESP::toStringArray({"PSYNC", "?", "-1"}), ""); + + return true; +} + void Server::initCmdsLUT() { cmdsLUT["ping"] = std::bind(&Server::pingCommand, this, std::placeholders::_1); @@ -139,14 +199,23 @@ std::string Server::keysCommand(const std::vector &commands) { } std::string Server::infoCommand(const std::vector &commands) { - constexpr char replication[] = "$11\r\nrole:master\r\n"; + std::vector info; + if (isReplica()) { + info.push_back("role:slave"); + } else { + info.push_back("role:master"); + } + info.push_back("master_replid:" + masterReplId); + info.push_back("master_repl_offset:" + std::to_string(masterReplOffset)); + info.push_back(""); + std::string infoBString = RESP::toStringArray(info); if (commands.size() == 2 && commands[1] == "replication") { // Return replication info only - return replication; + return infoBString; } - return replication; // return all available options, more stuff in the future. + return infoBString; // return all available options, more stuff in the future. } std::optional Server::handleRequest(const std::string &message) { diff --git a/src/Server.cpp b/src/Server.cpp index 392550b..be36fb8 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -39,9 +39,18 @@ int main(int argc, char **argv) { } bool debug = result["debug"].as(); int port = result["port"].as(); - std::optional master; + std::optional masterIp; + std::optional masterPort; if (result.count("replicaof")) { - master = result["replicaof"].as(); + std::string replica = result["replicaof"].as(); + std::size_t index = replica.find(" "); + if (index == std::string::npos) { + LOG_ERROR("Replica should have the format ServerIP ServerPort"); + exit(EXIT_FAILURE); + } + masterIp = replica.substr(0, index); + masterPort = std::stoi(replica.substr(index, replica.size() - index)); + LOG_INFO("Starting a replica server on {}:{}", *masterIp, *masterPort); } // Change the LogLevel to print everything @@ -50,7 +59,8 @@ int main(int argc, char **argv) { // Creating the server try { - Redis::Server::SharedPtr redisServer = std::make_shared(); + Redis::Server::SharedPtr redisServer = + std::make_shared(port, masterIp, masterPort); LOG_INFO("Starting the server on port {}", port); asio::io_context io_context; TCPServer server(io_context, port, redisServer); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f2a28de..0189c65 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -33,6 +33,12 @@ target_link_libraries( redis_server quill_wrapper_recommended ) +add_executable(tcp_client_test tcp_client_test.cpp test_main.cpp) +target_link_libraries( + tcp_client_test + gtest gmock + redis_server quill_wrapper_recommended +) include(GoogleTest) gtest_discover_tests(parsing_test) diff --git a/tests/redis_server_tests.cpp b/tests/redis_server_tests.cpp index 975c88d..db5360a 100644 --- a/tests/redis_server_tests.cpp +++ b/tests/redis_server_tests.cpp @@ -44,11 +44,18 @@ TEST(REDIS_SERVER, INFO) { Redis::Server server; auto res = server.handleRequest("*2\r\n$4\r\nINFO\r\n$11\r\replication\r\n"); ASSERT_TRUE(res.has_value()); - EXPECT_EQ(*res, std::string("$11\r\nrole:master\r\n")); + EXPECT_NE(res->find("$11\r\nrole:master\r\n"), std::string::npos); // This should return only replication info for now // In the future we should check on the other info res = server.handleRequest("*1\r\n$4\r\nINFO\r\n"); ASSERT_TRUE(res.has_value()); - EXPECT_EQ(*res, std::string("$11\r\nrole:master\r\n")); -} \ No newline at end of file + EXPECT_NE(res->find("$11\r\nrole:master\r\n"), std::string::npos); + + // Create a replica server + Redis::Server replica(6379, "192.168.1.1", 7534); + ASSERT_TRUE(replica.isReplica()); + res = replica.handleRequest("*2\r\n$4\r\nINFO\r\n$11\r\replication\r\n"); + ASSERT_TRUE(res.has_value()); + EXPECT_NE(res->find("$10\r\nrole:slave\r\n"), std::string::npos); +} diff --git a/tests/tcp_client_test.cpp b/tests/tcp_client_test.cpp new file mode 100644 index 0000000..c1c0fff --- /dev/null +++ b/tests/tcp_client_test.cpp @@ -0,0 +1,24 @@ +#include "RESP/Constants.hpp" +#include "RedisServer.hpp" +#include +#include +#include +#include + +TEST(TCP_CLIENT, PING) { + Redis::Server::SharedPtr redisServer = std::make_shared(); + asio::io_context io_context; + TCPServer server(io_context, 12345, redisServer); + server.start(); + std::thread t([&] { io_context.run(); }); + + TCPClient client(io_context, "localhost", 12345); + auto error = client.send("*1\r\n$4\r\nPING\r\n"); + ASSERT_FALSE(error); + std::string message; + error = client.read(message); + ASSERT_FALSE(error); + ASSERT_EQ(message, "+PONG\r\n"); + io_context.stop(); + t.join(); +}