Skip to content

Commit

Permalink
initial replication support
Browse files Browse the repository at this point in the history
  • Loading branch information
HemaZ committed Sep 25, 2024
1 parent acc835a commit 6779965
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 12 deletions.
34 changes: 34 additions & 0 deletions include/Helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,45 @@
#define __REDIS_SERVER_HELPER_HPP__
#include <algorithm>
#include <cctype>
#include <ctime>
#include <random>
#include <string>

/**
* @brief Convert a string to lowercase.
*
* This function takes a string and converts all its characters to lowercase.
* It uses the std::transform algorithm with a lambda function that applies
* std::tolower to each character.
*
* @param s The input string to be converted.
* @return std::string The lowercase version of the input string.
*/
inline std::string strTolower(std::string s) {
std::transform(s.begin(), s.end(), s.begin(),
[](unsigned char c) { return std::tolower(c); } // correct
);
return s;
}

/**
* @brief Generate a random string of a given length
*
* @param length The length of the string to generate
* @return std::string A randomly generated string of the specified length
* containing alphanumeric characters (0-9, A-Z, a-z)
*/
inline std::string randomString(std::size_t length) {
std::srand(std::time(0)); // use current time as seed for random generator
auto randChar = []() -> char {
const char charset[] = "0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randChar);
return str;
}
#endif
70 changes: 68 additions & 2 deletions include/RedisServer.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef REDIS_SERVER_HPP
#define REDIS_SERVER_HPP
#include "Config.hpp"
#include "TCPClient.hpp"
#include "Types.hpp"
#include <chrono>
#include <memory>
Expand All @@ -14,12 +15,17 @@ namespace Redis {
class Server {
public:
using SharedPtr = std::shared_ptr<Redis::Server>;

/**
* @brief Construct a new Redis Server.
*
* @param port The server port.
* @param masterIp Optional master server ip to create a replica.
* @param masterPort Optional master server port to create a replica.
*/
Server();
Server(int port = 6379, std::optional<std::string> masterIp = std::nullopt,
std::optional<int> masterPort = std::nullopt);

virtual ~Server();
/**
* @brief Giving a message from redis client, parse it and return the expected
* response.
Expand All @@ -31,6 +37,15 @@ class Server {
*/
std::optional<std::string> handleRequest(const std::string &message);

/**
* @brief Is this server a replica of another master redis server.
*
* @return Bool True if it's a replica.
*/
bool isReplica() const {
return masterIp.has_value() && masterPort.has_value();
}

private:
/**
* @brief Initialize the cmdsLUT which holds redis command as a key
Expand Down Expand Up @@ -124,6 +139,8 @@ class Server {
*/
std::string infoCommand(const std::vector<std::string> &commands);

bool handShakeMaster();

/**
* @brief The main server database. Reading from the database should be thread
* safe. Inserting in the database should only be done through the function
Expand All @@ -149,6 +166,55 @@ class Server {
std::unordered_map<
std::string, std::function<std::string(const std::vector<std::string> &)>>
cmdsLUT;

/**
* @brief The port number on which this Redis server is listening.
*/
int port;

/**
* @brief The IP address of the master Redis server, if this server is a
* replica. This is std::nullopt if this server is not a replica.
*/
std::optional<std::string> masterIp;

/**
* @brief The port number of the master Redis server, if this server is a
* replica. This is std::nullopt if this server is not a replica.
*/
std::optional<int> masterPort;

/**
* @brief The replication ID of the master server.
* This is used to identify the replication stream.
*/
std::string masterReplId;

/**
* @brief The current replication offset of this server.
* This represents how much of the master's replication stream has been
* processed.
*/
int masterReplOffset = 0;

/**
* @brief A TCP client used to connect to the master server when this server
* is a replica. This is std::nullopt if this server is not a replica.
*/
std::optional<TCPClient> replicaClient = std::nullopt;

/**
* @brief The I/O context for asynchronous operations.
* This is used by Asio for managing asynchronous operations.
*/
asio::io_context ioContext;

/**
* @brief A thread dedicated to running the I/O context.
* This allows asynchronous operations to be processed independently of the
* main thread.
*/
std::thread ioThread;
};
} // namespace Redis

Expand Down
78 changes: 78 additions & 0 deletions include/TCPClient.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef __REDIS_SERVER_TCP_CLIENT_HPP__
#define __REDIS_SERVER_TCP_CLIENT_HPP__
#include "Logging.hpp"
#include <asio.hpp>
#include <asio/post.hpp>

using asio::ip::tcp;

/**
* @brief A class representing a TCP client using the Asio library.
*
* This class provides functionality to connect to a TCP server,
* send messages, and receive responses.
*/
class TCPClient {
public:
/**
* @brief Constructs a TCPClient and establishes a connection to the server.
*
* @param io_context The Asio io_context to use for asynchronous operations.
* @param ip The IP address of the server to connect to.
* @param port The port number of the server to connect to.
*/
TCPClient(asio::io_context &io_context, std::string ip, int port)
: ioContext_(io_context), socket_(ioContext_) {
tcp::resolver resolver(ioContext_);
tcp::resolver::results_type endpoints =
resolver.resolve(ip, std::to_string(port));
asio::connect(socket_, endpoints);
}

/**
* @brief Sends a message to the connected server.
*
* @param msg The message to send.
* @return An error code indicating the success or failure of the operation.
*/
std::error_code send(const std::string &msg) {
asio::error_code error;
asio::write(socket_, asio::buffer(msg), error);
return error;
}

/**
* @brief Reads a message from the connected server.
*
* This method reads until it encounters a "\r\n" sequence or reaches EOF.
*
* @param msg A reference to a string where the received message will be
* stored.
* @return An error code indicating the success or failure of the operation.
*/
std::error_code read(std::string &msg) {
asio::streambuf receive_buffer;
asio::error_code error;
asio::read_until(socket_, receive_buffer, "\r\n", error);
if (error && error != asio::error::eof) {
LOG_ERROR("Receiving failed {}", error.message());
return error;
}
msg = std::string((std::istreambuf_iterator<char>(&receive_buffer)),
std::istreambuf_iterator<char>());
return error;
}

private:
/**
* @brief Reference to the Asio io_context used for asynchronous operations.
*/
asio::io_context &ioContext_;

/**
* @brief The TCP socket used for communication with the server.
*/
tcp::socket socket_;
};

#endif
77 changes: 73 additions & 4 deletions src/RedisServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,28 @@
#include "Logging.hpp"
#include "RDBFile.hpp"
#include "RESP/RESP.hpp"
#include <asio.hpp>
#include <filesystem>
#include <regex>
#include <thread>

namespace fs = std::filesystem;

namespace Redis {

Server::Server() {
Server::Server(int port, std::optional<std::string> masterIp,
std::optional<int> masterPort)
: port(port), masterIp(masterIp), masterPort(masterPort),
masterReplId(randomString(40)) {

initCmdsLUT();

if (isReplica() && !handShakeMaster()) {
ioContext.stop();
ioThread.join();
throw std::runtime_error("Couldn't connect to the master server");
}

fs::path rdbFilePath = fs::path(config_.dir) / fs::path(config_.dbfilename);
auto rdbDatabase = parseRDBFile(rdbFilePath);
if (rdbDatabase) {
Expand All @@ -20,6 +34,52 @@ Server::Server() {
}
}

Server::~Server() {
if (isReplica()) {
ioContext.stop();
ioThread.join();
}
}

bool Server::handShakeMaster() {
replicaClient.emplace(ioContext, *masterIp, *masterPort);
ioThread = std::thread([&] { ioContext.run(); });
LOG_INFO("Pinging the master server on {} {}", *masterIp, *masterPort);
auto readWrite = [&](const std::string_view &cmd,
const std::string_view &expectedReply) -> bool {
auto error = replicaClient->send(std::string(cmd));
if (error) {
LOG_ERROR("Error Sending ping to the master {}", error.message());
return false;
}
std::string reply;
error = replicaClient->read(reply);
if (error) {
LOG_ERROR("Error getting reply from the master {}", error.message());
return false;
}
LOG_INFO("Received reply from master {}", reply);
return expectedReply == reply;
};

if (!readWrite("*1\r\n$4\r\nPING\r\n", "+PONG\r\n")) {
return false;
}
if (!readWrite(RESP::toStringArray(
{"REPLCONF", "listening-port", std::to_string(port)}),
"+OK\r\n")) {
return false;
}
if (!readWrite(RESP::toStringArray({"REPLCONF", "capa", "psync2"}),
"+OK\r\n")) {
return false;
}

readWrite(RESP::toStringArray({"PSYNC", "?", "-1"}), "");

return true;
}

void Server::initCmdsLUT() {
cmdsLUT["ping"] =
std::bind(&Server::pingCommand, this, std::placeholders::_1);
Expand Down Expand Up @@ -139,14 +199,23 @@ std::string Server::keysCommand(const std::vector<std::string> &commands) {
}

std::string Server::infoCommand(const std::vector<std::string> &commands) {
constexpr char replication[] = "$11\r\nrole:master\r\n";
std::vector<std::string> info;
if (isReplica()) {
info.push_back("role:slave");
} else {
info.push_back("role:master");
}
info.push_back("master_replid:" + masterReplId);
info.push_back("master_repl_offset:" + std::to_string(masterReplOffset));
info.push_back("");
std::string infoBString = RESP::toStringArray(info);

if (commands.size() == 2 && commands[1] == "replication") {
// Return replication info only
return replication;
return infoBString;
}

return replication; // return all available options, more stuff in the future.
return infoBString; // return all available options, more stuff in the future.
}

std::optional<std::string> Server::handleRequest(const std::string &message) {
Expand Down
16 changes: 13 additions & 3 deletions src/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,18 @@ int main(int argc, char **argv) {
}
bool debug = result["debug"].as<bool>();
int port = result["port"].as<int>();
std::optional<std::string> master;
std::optional<std::string> masterIp;
std::optional<int> masterPort;
if (result.count("replicaof")) {
master = result["replicaof"].as<std::string>();
std::string replica = result["replicaof"].as<std::string>();
std::size_t index = replica.find(" ");
if (index == std::string::npos) {
LOG_ERROR("Replica should have the format ServerIP ServerPort");
exit(EXIT_FAILURE);
}
masterIp = replica.substr(0, index);
masterPort = std::stoi(replica.substr(index, replica.size() - index));
LOG_INFO("Starting a replica server on {}:{}", *masterIp, *masterPort);
}

// Change the LogLevel to print everything
Expand All @@ -50,7 +59,8 @@ int main(int argc, char **argv) {

// Creating the server
try {
Redis::Server::SharedPtr redisServer = std::make_shared<Redis::Server>();
Redis::Server::SharedPtr redisServer =
std::make_shared<Redis::Server>(port, masterIp, masterPort);
LOG_INFO("Starting the server on port {}", port);
asio::io_context io_context;
TCPServer server(io_context, port, redisServer);
Expand Down
6 changes: 6 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ target_link_libraries(
redis_server quill_wrapper_recommended
)

add_executable(tcp_client_test tcp_client_test.cpp test_main.cpp)
target_link_libraries(
tcp_client_test
gtest gmock
redis_server quill_wrapper_recommended
)

include(GoogleTest)
gtest_discover_tests(parsing_test)
Expand Down
Loading

0 comments on commit 6779965

Please sign in to comment.