[5.0 -> main] SHiP: Fixes: Stack overflow, invalid index, split file access #1802

Merged: 14 commits, Oct 20, 2023
53 changes: 14 additions & 39 deletions libraries/state_history/include/eosio/state_history/log.hpp
@@ -73,9 +73,10 @@ struct state_history_log_header {
chain::block_id_type block_id = {};
uint64_t payload_size = 0;
};
static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
sizeof(state_history_log_header::block_id) +
sizeof(state_history_log_header::payload_size);
static constexpr int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
sizeof(state_history_log_header::block_id) +
sizeof(state_history_log_header::payload_size);
static_assert(sizeof(state_history_log_header) == state_history_log_header_serial_size);

namespace state_history {
struct prune_config {
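
Making the serialized-size constant constexpr lets the new static_assert confirm at compile time that the header struct packs with no padding, so it can be written and read as a raw byte copy. A minimal standalone sketch of the same check; header_sketch and its field types are illustrative stand-ins, not the real eosio types:

#include <cstdint>

// Stand-in for state_history_log_header: three fields that must pack with no
// padding so the struct can be serialized and deserialized as a raw byte copy.
struct header_sketch {
   uint64_t magic        = 0;
   uint8_t  block_id[32] = {};   // stand-in for chain::block_id_type
   uint64_t payload_size = 0;
};

static constexpr int header_serial_size =
    sizeof(header_sketch::magic) + sizeof(header_sketch::block_id) + sizeof(header_sketch::payload_size);

// Fails to compile if the compiler ever inserts padding, i.e. if the raw-copy
// assumption behind the on-disk format is broken.
static_assert(sizeof(header_sketch) == header_serial_size);

int main() {}
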
@@ -323,7 +324,7 @@ class state_history_log {
catalog.open(log_dir, conf.retained_dir, conf.archive_dir, name);
catalog.max_retained_files = conf.max_retained_files;
if (_end_block == 0) {
_begin_block = _end_block = catalog.last_block_num() +1;
_index_begin_block = _begin_block = _end_block = catalog.last_block_num() +1;
}
}
}, _config);
@@ -539,6 +540,7 @@ class state_history_log {
"wrote payload with incorrect size to ${name}.log", ("name", name));
fc::raw::pack(log, pos);

index.seek_end(0);
fc::raw::pack(index, pos);
if (_begin_block == _end_block)
_index_begin_block = _begin_block = block_num;
@@ -576,10 +578,14 @@
if (block_num >= _begin_block && block_num < _end_block) {
state_history_log_header header;
get_entry(block_num, header);
EOS_ASSERT(chain::block_header::num_from_id(header.block_id) == block_num, chain::plugin_exception,
"header id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num));
return header.block_id;
}
return {};
}
EOS_ASSERT(chain::block_header::num_from_id(*result) == block_num, chain::plugin_exception,
"catalog id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num));
return result;
}
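
The added EOS_ASSERTs catch an index entry that points at the wrong log record. They rely on the fact that, as I understand the codebase, an EOSIO block id embeds its block number in the first four bytes (big-endian), which chain::block_header::num_from_id() recovers. A standalone illustrative sketch; num_from_id_sketch is hypothetical, not the real implementation:

#include <cassert>
#include <cstdint>

// Illustrative: a block id whose first four bytes, read big-endian, do not
// decode back to the requested block number indicates a corrupted or mismatched
// index entry, which is what the new assertions above guard against.
uint32_t num_from_id_sketch(const uint8_t (&id)[32]) {
   return (uint32_t(id[0]) << 24) | (uint32_t(id[1]) << 16) |
          (uint32_t(id[2]) << 8)  |  uint32_t(id[3]);
}

int main() {
   uint8_t id[32] = {};
   id[2] = 0x01; id[3] = 0x2a;            // block number 0x0000012a == 298
   assert(num_from_id_sketch(id) == 298u);
}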

@@ -894,47 +900,16 @@ class state_history_log {
}

void split_log() {

std::filesystem::path log_file_path = log.get_file_path();
std::filesystem::path index_file_path = index.get_file_path();

fc::datastream<fc::cfile> new_log_file;
fc::datastream<fc::cfile> new_index_file;

std::filesystem::path tmp_log_file_path = log_file_path;
tmp_log_file_path.replace_extension("log.tmp");
std::filesystem::path tmp_index_file_path = index_file_path;
tmp_index_file_path.replace_extension("index.tmp");

new_log_file.set_file_path(tmp_log_file_path);
new_index_file.set_file_path(tmp_index_file_path);

try {
new_log_file.open(fc::cfile::truncate_rw_mode);
new_index_file.open(fc::cfile::truncate_rw_mode);

} catch (...) {
wlog("Unable to open new state history log or index file for writing during log spliting, "
"continue writing to existing block log file\n");
return;
}

index.close();
log.close();

catalog.add(_begin_block, _end_block - 1, log.get_file_path().parent_path(), name);

_begin_block = _end_block;
_index_begin_block = _begin_block = _end_block;

using std::swap;
swap(new_log_file, log);
swap(new_index_file, index);

std::filesystem::rename(tmp_log_file_path, log_file_path);
std::filesystem::rename(tmp_index_file_path, index_file_path);

log.set_file_path(log_file_path);
index.set_file_path(index_file_path);
log.open(fc::cfile::truncate_rw_mode);
log.seek_end(0);
index.open(fc::cfile::truncate_rw_mode);
}
}; // state_history_log
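
A note on the invalid-index and split-file fixes above: the .index file holds one 64-bit payload position per block (the fc::raw::pack(index, pos) calls), so lookups are computed relative to the first block the index covers. That is why _index_begin_block must be reset together with _begin_block both when resuming from the catalog and in split_log(). A minimal sketch of the offset arithmetic under that assumption; index_offset is an illustrative helper, not the plugin's actual accessor:

#include <cassert>
#include <cstdint>

// Illustrative only: one fixed-size index slot per block, addressed relative to
// the first block the index file covers. If _index_begin_block were left at its
// pre-split value, the computed offset would land on the wrong slot, or past the
// end of a freshly truncated index file.
constexpr uint64_t slot_size = sizeof(uint64_t);

uint64_t index_offset(uint32_t block_num, uint32_t index_begin_block) {
   assert(block_num >= index_begin_block);
   return uint64_t(block_num - index_begin_block) * slot_size;
}

int main() {
   assert(index_offset(101, 101) == 0);               // first block after a split at 100
   assert(index_offset(150, 101) == 49 * slot_size);  // 50th entry of the new index
}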

@@ -57,11 +57,15 @@ class session_manager {
private:
using entry_ptr = std::unique_ptr<send_queue_entry_base>;

boost::asio::io_context& ship_io_context;
std::set<std::shared_ptr<session_base>> session_set;
bool sending = false;
std::deque<std::pair<std::shared_ptr<session_base>, entry_ptr>> send_queue;

public:
explicit session_manager(boost::asio::io_context& ship_io_context)
: ship_io_context(ship_io_context) {}

void insert(std::shared_ptr<session_base> s) {
session_set.insert(std::move(s));
}
@@ -103,8 +107,12 @@
void pop_entry(bool call_send = true) {
send_queue.erase(send_queue.begin());
sending = false;
if (call_send || !send_queue.empty())
send();
if (call_send || !send_queue.empty()) {
// avoid blowing the stack
boost::asio::post(ship_io_context, [this]() {
send();
});
}
}

void send_updates() {
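
The stack-overflow fix: rather than having one entry's completion call send() for the next entry directly, pop_entry() now posts the next send() to the SHiP io_context, so the stack unwinds between entries. A self-contained sketch of the pattern, with illustrative names (queue_drainer is not part of the plugin):

#include <boost/asio.hpp>
#include <deque>
#include <iostream>

// Illustrative sketch of the pattern used in pop_entry() above: instead of
// nesting a stack frame per queued entry, the next send() is posted back to the
// io_context, so each entry is dispatched from the event loop.
struct queue_drainer {
   boost::asio::io_context& ioc;
   std::deque<int>          items;

   void send() {
      if (items.empty())
         return;
      items.pop_front();
      boost::asio::post(ioc, [this]() { send(); }); // re-enter via the event loop, not recursion
   }
};

int main() {
   boost::asio::io_context ioc;
   queue_drainer q{ioc, std::deque<int>(1'000'000, 0)}; // deep enough to overflow if recursive
   q.send();
   ioc.run();
   std::cout << "drained without growing the stack\n";
}
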
3 changes: 2 additions & 1 deletion plugins/state_history_plugin/state_history_plugin.cpp
@@ -62,7 +62,6 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
string endpoint_address;
string unix_path;
state_history::trace_converter trace_converter;
session_manager session_mgr;

mutable std::mutex mtx;
block_id_type head_id;
@@ -71,6 +70,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

named_thread_pool<struct ship> thread_pool;

session_manager session_mgr{thread_pool.get_executor()};

bool plugin_started = false;

public:
96 changes: 89 additions & 7 deletions plugins/state_history_plugin/tests/session_test.cpp
@@ -101,20 +101,22 @@ struct mock_state_history_plugin {

eosio::state_history::block_position block_head;
fc::temp_directory log_dir;
std::optional<eosio::state_history_log> log;
std::optional<eosio::state_history_log> trace_log;
std::optional<eosio::state_history_log> state_log;
std::atomic<bool> stopping = false;
eosio::session_manager session_mgr;
eosio::session_manager session_mgr{ship_ioc};

constexpr static uint32_t default_frame_size = 1024;

std::optional<eosio::state_history_log>& get_trace_log() { return log; }
std::optional<eosio::state_history_log>& get_chain_state_log() { return log; }
std::optional<eosio::state_history_log>& get_trace_log() { return trace_log; }
std::optional<eosio::state_history_log>& get_chain_state_log() { return state_log; }
fc::sha256 get_chain_id() const { return {}; }

boost::asio::io_context& get_ship_executor() { return ship_ioc; }

void setup_state_history_log(eosio::state_history_log_config conf = {}) {
log.emplace("ship", log_dir.path(), conf);
trace_log.emplace("ship_trace", log_dir.path(), conf);
state_log.emplace("ship_state", log_dir.path(), conf);
}

fc::logger logger = fc::logger::get(DEFAULT_LOGGER);
@@ -130,7 +132,20 @@ struct mock_state_history_plugin {
return fc::time_point{};
}

std::optional<eosio::chain::block_id_type> get_block_id(uint32_t block_num) { return block_id_for(block_num); }
std::optional<eosio::chain::block_id_type> get_block_id(uint32_t block_num) {
std::optional<eosio::chain::block_id_type> id;
if( trace_log ) {
id = trace_log->get_block_id( block_num );
if( id )
return id;
}
if( state_log ) {
id = state_log->get_block_id( block_num );
if( id )
return id;
}
return block_id_for(block_num);
}

eosio::state_history::block_position get_block_head() { return block_head; }
eosio::state_history::block_position get_last_irreversible() { return block_head; }
@@ -284,13 +299,24 @@ struct state_history_test_fixture {
header.payload_size += sizeof(uint64_t);
}

server.log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
std::unique_lock gt(server.trace_log->_mx);
server.trace_log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
f.write((const char*)&type, sizeof(type));
if (type == 1) {
f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count));
}
f.write(compressed.data(), compressed.size());
});
gt.unlock();
std::unique_lock gs(server.state_log->_mx);
server.state_log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
f.write((const char*)&type, sizeof(type));
if (type == 1) {
f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count));
}
f.write(compressed.data(), compressed.size());
});
gs.unlock();

if (written_data.size() < index)
written_data.resize(index);
@@ -428,6 +454,62 @@ BOOST_FIXTURE_TEST_CASE(test_session_no_prune, state_history_test_fixture) {
FC_LOG_AND_RETHROW()
}

BOOST_FIXTURE_TEST_CASE(test_split_log, state_history_test_fixture) {
try {
// setup block head for the server
constexpr uint32_t head = 1023;
eosio::state_history::partition_config conf;
conf.stride = 25;
server.setup_state_history_log(conf);
uint32_t head_block_num = head;
server.block_head = {head_block_num, block_id_for(head_block_num)};

// generate the log data used for traces and deltas
uint32_t n = mock_state_history_plugin::default_frame_size;
add_to_log(1, n * sizeof(uint32_t), generate_data(n)); // original data format
add_to_log(2, 0, generate_data(n)); // format to accommodate the compressed size greater than 4GB
add_to_log(3, 1, generate_data(n)); // format to encode decompressed size to avoid decompress entire data upfront.
for (size_t i = 4; i <= head; ++i) {
add_to_log(i, 1, generate_data(n));
}

send_request(eosio::state_history::get_blocks_request_v0{.start_block_num = 1,
.end_block_num = UINT32_MAX,
.max_messages_in_flight = UINT32_MAX,
.have_positions = {},
.irreversible_only = false,
.fetch_block = true,
.fetch_traces = true,
.fetch_deltas = true});

eosio::state_history::state_result result;
// we should get 1023 consecutive block results
eosio::chain::block_id_type prev_id;
for (int i = 0; i < head; ++i) {
receive_result(result);
BOOST_REQUIRE(std::holds_alternative<eosio::state_history::get_blocks_result_v0>(result));
auto r = std::get<eosio::state_history::get_blocks_result_v0>(result);
BOOST_REQUIRE_EQUAL(r.head.block_num, server.block_head.block_num);
if (i > 0) {
BOOST_TEST(prev_id.str() == r.prev_block->block_id.str());
}
prev_id = r.this_block->block_id;
BOOST_REQUIRE(r.traces.has_value());
BOOST_REQUIRE(r.deltas.has_value());
auto traces = r.traces.value();
auto deltas = r.deltas.value();
auto& data = written_data[i];
auto data_size = data.size() * sizeof(int32_t);
BOOST_REQUIRE_EQUAL(traces.size(), data_size);
BOOST_REQUIRE_EQUAL(deltas.size(), data_size);

BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data()));
BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data()));
}
}
FC_LOG_AND_RETHROW()
}

BOOST_FIXTURE_TEST_CASE(test_session_with_prune, state_history_test_fixture) {
try {
// setup block head for the server
6 changes: 3 additions & 3 deletions tests/ship_streamer_test.py
@@ -70,7 +70,7 @@ def getLatestSnapshot(nodeId):

shipNodeNum = 1
specificExtraNodeosArgs={}
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
# producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node
specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin "

@@ -206,7 +206,7 @@ def getLatestSnapshot(nodeId):
prodNode0.waitForProducer(forkAtProducer)
prodNode1.waitForProducer(prodNode1Prod)
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge did not shutdown");
Utils.errorExit("Bridge did not shutdown")
Print("Fork started")

forkProgress="defproducer" + chr(ord(forkAtProducer[-1])+3)
@@ -215,7 +215,7 @@ def getLatestSnapshot(nodeId):
Print("Restore fork")
Print("Relaunching the non-producing bridge node to connect the producing nodes again")
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge is already running");
Utils.errorExit("Bridge is already running")
if not nonProdNode.relaunch():
Utils.errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted")
