Skip to content

Commit

Permalink
sample(DBLogger): Extracted log scanning and inserting functionality …
Browse files Browse the repository at this point in the history
…to class SQLLogInserter.
  • Loading branch information
matejk committed Nov 9, 2024
1 parent 208280e commit 60ac4fc
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 236 deletions.
12 changes: 11 additions & 1 deletion Data/samples/DBLogger/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
add_executable(DBLogger src/DBLogger.cpp)

# Sources
file(GLOB SRCS_G "src/*.cpp")
POCO_SOURCES_AUTO(DBLOGGER_SRCS ${SRCS_G})

# Headers
file(GLOB_RECURSE HDRS_G "src/*.h")
POCO_HEADERS_AUTO(DBLOGGER_SRCS ${HDRS_G})

add_executable(DBLogger ${DBLOGGER_SRCS})

target_link_libraries(DBLogger PUBLIC Poco::Util Poco::DataSQLite)
2 changes: 1 addition & 1 deletion Data/samples/DBLogger/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ifndef POCO_DATA_NO_SQL_PARSER
target_includes = $(POCO_BASE)/Data/src
endif

target = DBLogger
target = DBLogger SQLLogInserter
target_version = 1
target_libs = PocoDataSQLite PocoData PocoFoundation PocoUtil

Expand Down
258 changes: 25 additions & 233 deletions Data/samples/DBLogger/src/DBLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@
#include "Poco/Util/HelpFormatter.h"
#include "Poco/Util/ServerApplication.h"

#include <unordered_set>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <filesystem>
#include <fstream>
#include "SQLLogInserter.h"

using namespace Poco::Data::Keywords;
using Poco::Util::ServerApplication;
Expand Down Expand Up @@ -64,42 +58,36 @@ class DBLogger: public ServerApplication

Poco::Data::SQLite::Connector::registerConnector();

logger().information("Database connector: %s, cs: %s", _connector, _connectionString);
logger().information("Directory: %s", _directory);
logger().information("Number of workers: %z", _numWorkers);
// TODO: Only delete and create table when creating demo messages?
logger().information("Database connector: %s, cs: %s",
_inserter.connector(), _inserter.connectionString());
logger().information("Directory: %s", _inserter.directory());
logger().information("Number of workers: %z", _inserter.numWorkers());

_dataSession = std::make_shared<Poco::Data::Session>(_connector, _connectionString);
Poco::Data::Session session (_inserter.connector(), _inserter.connectionString());

(*_dataSession) << ("DROP TABLE IF EXISTS "s + _tableName), now;
session << ("DROP TABLE IF EXISTS "s + _tableName), now;
const auto create {
"CREATE TABLE "s + _tableName +
" (Source VARCHAR, Name VARCHAR, ProcessId INTEGER, Thread VARCHAR,"s +
" ThreadId INTEGER, Priority INTEGER, Text VARCHAR, DateTime DATE)"s
};

(*_dataSession) << create, now;
session << create, now;


_active = true;
_startTime.update();

_workSet.reserve(MAX_WORKSET_SIZE * 2);

// TODO: Create worker threads
for (std::size_t i = 0; i < _numWorkers; i++)
{
_workers.emplace_back(&DBLogger::processFiles, this);
}

// Thread to scan the directory
_dirScanThread = std::move(std::thread(&DBLogger::runDirectoryScan, this));
_inserter.start();

logger().information("Started directory scanning.");

if (_demoMessagesRequested)
{
// SQL channel to generate SQL files
_sqlChannel = new Poco::Data::SQLChannel();
_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _directory);
_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _inserter.directory());
_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_TABLE, _tableName);

_sqlSourceThread = std::move(std::thread(&DBLogger::createMessages, this));
Expand All @@ -117,25 +105,11 @@ class DBLogger: public ServerApplication
{
_sqlSourceThread.join();
}
_dirScanThread.join();

while (!_workSet.empty())
{
logger().information("Waiting for workers to stop. Work %z", _workSet.size());
Poco::Thread::sleep(200);
}

// stop all worker threads
for (auto& w: _workers)
{
if (w.joinable()) {
w.join();
}
}
_inserter.stop();

logger().information(
"Created %z messages, processed %z messages in %Ld ms.",
_created, _processed, (_startTime.elapsed() / 1000)
_created, _inserter.totalProcessed(), (_startTime.elapsed() / 1000)
);

Application::uninitialize();
Expand Down Expand Up @@ -214,27 +188,30 @@ class DBLogger: public ServerApplication
{
if (value.empty())
throw Poco::Util::IncompatibleOptionsException("Expected directory name");
_directory = value;

_inserter.setDirectory(value);
}

void handleDbConnector(const std::string& name, const std::string& value)
{
if (value.empty())
throw Poco::Util::IncompatibleOptionsException("Expected database connector name");
_connector = value;

_inserter.setConnector(value);
}

void handleDbConnectionString(const std::string& name, const std::string& value)
{
if (value.empty())
throw Poco::Util::IncompatibleOptionsException("Expected database connection string");
_connectionString = value;

_inserter.setConnectionString(value);
}

void handleNumberOfWorkers(const std::string& name, const std::string& value)
{
const auto num = Poco::NumberParser::parseUnsigned(value);
this->_numWorkers = std::min(1U, num);
_inserter.setNumWorkers( std::min(1U, num) );
}

void displayHelp()
Expand All @@ -245,149 +222,6 @@ class DBLogger: public ServerApplication
helpFormatter.format(std::cout);
}

std::size_t insertEntries(std::vector<std::string>& entries)
{
std::unique_lock<std::mutex> l(_workMutex);

while (_workSet.size() > MAX_WORKSET_SIZE && _active)
{
// Prevent creating too large work set
_underflowCondition.wait_for(l, std::chrono::milliseconds(200));
}

const auto wss = _workSet.size();
// Do not re-insert entries that are being processed.
entries.erase(
std::remove_if(
entries.begin(),
entries.end(),
[this](const std::string& e) {
return this->_processingSet.find(e) != this->_processingSet.end();
}
),
entries.end()
);

logger().information("Enqueued new entries: %z", entries.size());
_workSet.insert(entries.begin(), entries.end());
_workCondition.notify_all();
return _workSet.size() - wss;
}

std::string popEntry()
{
std::unique_lock<std::mutex> l(_workMutex);
while (_workSet.empty() && _active)
{
_workCondition.wait_for(l, std::chrono::milliseconds(200));
}
if (_workSet.empty())
{
// Exited loop because of !_active
return {};
}
auto entry = (*_workSet.begin());
_processingSet.insert(entry);
_workSet.erase(_workSet.begin());
if (_workSet.size() < MAX_WORKSET_SIZE)
{
_underflowCondition.notify_all();
}
return entry;
}

void removeEntry(std::string entry)
{
std::unique_lock<std::mutex> l(_workMutex);
auto i = _processingSet.find(entry);
if (i != _processingSet.end())
{
_processingSet.erase(i);
}
}

void processFile(std::string& entry)
{
if (entry.empty())
{
return;
}
if (!std::filesystem::exists(entry))
{
// Directory iterator can still pick up files that were already processed
removeEntry(entry);
return;
}
bool success {false};
try
{
std::ifstream is(entry);
std::stringstream buffer;
buffer << is.rdbuf();
const auto& sql { buffer.str() };

if (!sql.empty())
{
auto& s = (*_dataSession);

s << sql, now;

++_processed;
success = true;
}
}
catch (const Poco::Exception& e)
{
logger().warning("Failed to insert to database %s: %s", entry, e.displayText());
}
if (success)
{
std::filesystem::remove(entry);
}
else
{
std::filesystem::path newPath {entry};
newPath.replace_extension("err"s);
std::filesystem::rename(entry, newPath);
}
removeEntry(entry);
}

std::size_t scanDirectory()
{
std::vector<std::string> newEntries;
newEntries.reserve(1000);
std::filesystem::directory_iterator diriter(_directory, std::filesystem::directory_options::skip_permission_denied);
for (auto& entry: diriter)
{
if (!_active)
{
return 0;
}
if (_dataSession == nullptr || !_dataSession->isGood())
{
// Do not process files if database session is not healthy.
// Files will be processed later.
return 0;
}

if (!std::filesystem::exists(entry))
{
continue;
}
if (!entry.is_regular_file())
{
continue;
}
if (entry.path().extension() != ".sql"s)
{
continue;
}
newEntries.push_back(entry.path());
}
return insertEntries(newEntries);
}

void createMessages()
{
int i {0};
Expand All @@ -404,72 +238,30 @@ class DBLogger: public ServerApplication
}
}

void runDirectoryScan()
{
while (_active)
{
const auto scheduled = scanDirectory();
if (scheduled == 0)
{
// No new files to be scheduled. Wait a bit.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
}

void processFiles()
{
while (!_workSet.empty() || _active)
{
auto entry = popEntry();
processFile(entry);
}
}

int main(const std::vector<std::string>& args) override
{
if (!_helpRequested)
{
logger().information("Press any key to stop scanning.");
std::cin.get();
waitForTerminationRequest();

_active = false;
}
return Application::EXIT_OK;
}

private:
static constexpr std::size_t MAX_WORKSET_SIZE {1000};
bool _helpRequested {false};
bool _demoMessagesRequested {false};

std::string _directory;
std::string _connector;
std::string _connectionString;
std::size_t _numWorkers {2};

const std::string _tableName{"T_POCO_LOG"};
SQLLogInserter _inserter;

Poco::Timestamp _startTime;
bool _active {false};

std::size_t _created{0};
std::size_t _processed{0};

Poco::Timestamp _startTime;

std::thread _sqlSourceThread;
std::thread _dirScanThread;

WorkSet _workSet;
WorkSet _processingSet;
std::mutex _workMutex;
std::condition_variable _workCondition;
std::condition_variable _underflowCondition;

std::vector<std::thread> _workers;

const std::string _tableName{"T_POCO_LOG"};
Poco::AutoPtr<Poco::Data::SQLChannel> _sqlChannel;
std::shared_ptr<Poco::Data::Session> _dataSession;
};


Expand Down
Loading

0 comments on commit 60ac4fc

Please sign in to comment.