Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[humble] Bugfix for bag_split event callbacks called to early with file compression (backport #1643) #1733

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

// Prepares the metadata by setting initial values.
void init_metadata() override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include "logging.hpp"

namespace fs = rcpputils::fs;

namespace rosbag2_compression
{

Expand Down Expand Up @@ -76,7 +78,7 @@ void SequentialCompressionWriter::compression_thread_fn()

while (true) {
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
std::string closed_file_relative_to_bag;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
compressor_condition_.wait(
Expand All @@ -92,7 +94,7 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
compressor_condition_.notify_all();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
closed_file_relative_to_bag = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
Expand All @@ -109,8 +111,35 @@ void SequentialCompressionWriter::compression_thread_fn()
std::lock_guard<std::recursive_mutex> storage_lock(storage_mutex_);
SequentialWriter::write(message);
}
} else if (!file.empty()) {
compress_file(*compressor, file);
} else if (!closed_file_relative_to_bag.empty()) {
compress_file(*compressor, closed_file_relative_to_bag);

// Execute callbacks from the base class
static const std::string compressor_ext = "." + compressor->get_compression_identifier();
auto closed_file =
(fs::path(base_folder_) / (closed_file_relative_to_bag + compressor_ext)).string();
std::string new_file;
// To determine, a new_file we can't rely on the metadata_.relative_file_paths.back(),
// because other compressor threads may have already pushed a new item above.
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(),
closed_file_relative_to_bag + compressor_ext);
if (iter != metadata_.relative_file_paths.end()) {
++iter;
if (iter != metadata_.relative_file_paths.end()) {
new_file = (fs::path(base_folder_) / *iter).string();
}
}
}
if (!new_file.empty()) {
// The new_file is empty when we compressed the last file after calling close().
// Note: We shall not call 'execute_bag_split_callbacks(closed_file, new_file)' for the
// last compressed file because it will be called inside base class
// SequentialWriter::close().
SequentialWriter::execute_bag_split_callbacks(closed_file, new_file);
}
}
}
}
Expand Down Expand Up @@ -292,14 +321,17 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();
const auto last_file_relative_to_bag = metadata_.relative_file_paths.back();
const auto new_file = SequentialWriter::split_bagfile_local(false);

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compressor_file_queue_.push(last_file);
compressor_file_queue_.push(last_file_relative_to_bag);
compressor_condition_.notify_one();
} else {
auto last_file = (fs::path(base_folder_) / last_file_relative_to_bag).string();
SequentialWriter::execute_bag_split_callbacks(last_file, new_file);
}

if (!storage_) {
Expand All @@ -324,6 +356,7 @@ void SequentialCompressionWriter::write(
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
SequentialWriter::write(message);
} else {
// since the compression operation will manipulate memory inplace, thus
Expand Down Expand Up @@ -389,17 +422,4 @@ void SequentialCompressionWriter::write(
}
}

bool SequentialCompressionWriter::should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time)
{
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
return SequentialWriter::should_split_bagfile(current_time);
}
}

} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

using namespace testing; // NOLINT

namespace fs = rcpputils::fs;

static constexpr const char * DefaultTestCompressor = "fake_comp";

class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
Expand All @@ -54,7 +56,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
tmp_dir_storage_options_{},
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
tmp_dir_storage_options_.uri = (tmp_dir_ / bag_base_dir_).string();
rcpputils::fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
Expand All @@ -77,8 +79,8 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);
return message;
}

Expand All @@ -90,7 +92,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_size_.store(0);
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
Expand All @@ -104,11 +106,11 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_.fetch_add(1);
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
Expand All @@ -133,7 +135,6 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -143,8 +144,10 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
rosbag2_storage::BagMetadata intercepted_metadata_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
std::string serialization_format_;
uint64_t fake_storage_size_{};
std::atomic<uint64_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
std::string fake_storage_uri_;
const std::string bag_base_dir_ = "test_bag";

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand Down Expand Up @@ -297,11 +300,10 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
ss << bag_base_dir_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
Expand Down Expand Up @@ -339,6 +341,147 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
1
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = make_test_msg();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", {}});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i));
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") :
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.string()) << "i = " << i;
}
}

TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
const size_t num_splits = 2;
const int message_count = max_bagfile_size * num_splits + max_bagfile_size - 1; // 8

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1
};

initializeFakeFileStorage();
initializeWriter(compression_options);

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", {}});

for (auto i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_->close();

EXPECT_EQ(intercepted_metadata_.relative_file_paths.size(), num_splits + 1);

EXPECT_THAT(closed_files.size(), num_splits + 1);
EXPECT_THAT(opened_files.size(), num_splits + 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == num_splits + 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
for (size_t i = 0; i < intercepted_metadata_.relative_file_paths.size(); i++) {
std::cout << "metadata file path [" << i << "] = '" <<
intercepted_metadata_.relative_file_paths[i] << "'\n";
}
}

ASSERT_GE(opened_files.size(), num_splits + 1);
ASSERT_GE(closed_files.size(), num_splits + 1);
for (size_t i = 0; i < num_splits + 1; i++) {
auto expected_closed =
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i) +
"." + DefaultTestCompressor);
auto expected_opened = (i == num_splits) ?
// The last opened file shall be empty string when we do "writer->close();"
fs::path("") :
fs::path(tmp_dir_storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1));
EXPECT_EQ(closed_files[i], expected_closed.string()) << "i = " << i;
EXPECT_EQ(opened_files[i], expected_opened.string()) << "i = " << i;
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

std::string split_bagfile_local(bool execute_callbacks = true);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

void switch_to_next_storage();

std::string format_storage_uri(
Expand Down
Loading
Loading