Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for deserializing and decoding v0.1.0 IR streams, but without log-level parsing and filtering. #30

Merged
merged 43 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fc08027
Add StructuredIrStreamReader.
junhaoliao Nov 7, 2024
8d20583
Reformat; Add missing files to VCS.
junhaoliao Nov 7, 2024
4757bd4
Fix clp submodule commit id.
junhaoliao Nov 7, 2024
12eeca6
Change log level return type int64_t -> uint8_t.
junhaoliao Nov 7, 2024
d870b19
update clp commit id.
junhaoliao Nov 7, 2024
edda4b3
Use version checking utility from CLP FFI.
junhaoliao Nov 7, 2024
8a1e9a6
Add docstrings for class IrUnitHandler.
junhaoliao Nov 7, 2024
5f005e7
Correct ReaderOptions TS type.
junhaoliao Nov 7, 2024
9581578
Swap timestamp & log_level return order.
junhaoliao Nov 7, 2024
dc652ad
Return IR version via `get_ir_protocol_error_code`.
junhaoliao Nov 7, 2024
7d1fa0e
Fix lint.
junhaoliao Nov 7, 2024
c675c61
Do not mark schema_tree_node_locator [[maybe_unused]] - Apply suggest…
junhaoliao Nov 7, 2024
5c23182
Remove unused using string_literals - Apply suggestions from code review
junhaoliao Nov 7, 2024
a4c2c07
Use braces for declarations - Apply suggestions from code review
junhaoliao Nov 7, 2024
b940296
Use braces for declarations - Apply suggestions from code review
junhaoliao Nov 7, 2024
1087f94
Error messages - Apply suggestions from code review
junhaoliao Nov 7, 2024
5e8d42a
Remove dead code - Apply suggestions from code review
junhaoliao Nov 7, 2024
570b8eb
Add clp/ prefix in includes - Apply suggestions from code review
junhaoliao Nov 7, 2024
59b163b
Docs - Apply suggestions from code review
junhaoliao Nov 7, 2024
92e44b3
Add docs about potential shared_ptr replacement with gsl::not_null - …
junhaoliao Nov 7, 2024
d170434
Rename get_ir_protocol_error_code -> get_ir_stream_type; return IrStr…
junhaoliao Nov 7, 2024
b110479
Move version_validation_result into try {}; update docs.
junhaoliao Nov 7, 2024
181206e
Remove redundant docs for get_deserializer and get_reader.
junhaoliao Nov 7, 2024
cac21ee
Rename cLogLevelFilteringNotSupportedPrompt -> cLogLevelFilteringNotS…
junhaoliao Nov 7, 2024
b6d837c
Add const for cReaderOptionsLogLevelKey and cReaderOptionsTimestampKey.
junhaoliao Nov 7, 2024
11b347b
Use const instead of magic strings.
junhaoliao Nov 7, 2024
6588d56
Reformat code.
junhaoliao Nov 7, 2024
2bca1cc
Rename `json` -> `json_result`.
junhaoliao Nov 7, 2024
94f31af
Reorder member variable initializations to match their declaration or…
junhaoliao Nov 7, 2024
072c7bc
Rename parsed_tree_node_id_t -> schema_tree_node_id_t.
junhaoliao Nov 7, 2024
08e0bdd
Rename level_node_id -> log_level_node_id.
junhaoliao Nov 7, 2024
95b8102
Move StructuredIrStreamReader::create declaration to the top of the c…
junhaoliao Nov 7, 2024
37e6f30
Remove all log level handling in StructuredIrStreamReader.
junhaoliao Nov 7, 2024
058ec78
Type check timestamp value before calling get_immutable_view.
junhaoliao Nov 7, 2024
4d2c076
Put empty json string when unable to decode.
junhaoliao Nov 8, 2024
49a72c3
Lint.
junhaoliao Nov 8, 2024
12dfb9e
Rename `IrStreamType` -> `StreamType` in the `ir` namespace.
junhaoliao Nov 8, 2024
2df4df8
Use anonymous namespace for internal linkages for const strings.
junhaoliao Nov 8, 2024
7edb19e
Improve error message when serialize_to_json fails - Apply suggestion…
junhaoliao Nov 8, 2024
77e2b2e
Fix sentence case in docs - Apply suggestions from code review
junhaoliao Nov 8, 2024
320f1ea
Update clp commit id which has https://github.com/y-scope/clp/pull/57…
junhaoliao Nov 8, 2024
81135fa
lint.
junhaoliao Nov 8, 2024
cca920a
Nest anonymous namespace into clp_ffi_js::ir to reduce scope of const…
junhaoliao Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ endif()

set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp
src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp
src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp
src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp
src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp
src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp
Expand Down
51 changes: 35 additions & 16 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "StreamReader.hpp"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <format>
Expand All @@ -23,6 +22,7 @@
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamReader.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

namespace {
Expand Down Expand Up @@ -117,8 +117,12 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");
emscripten::register_type<clp_ffi_js::ir::ReaderOptions>("{timestampKey: string} | null");

// JS types used as outputs
emscripten::enum_<clp_ffi_js::ir::StreamType>("IrStreamType")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking, the TS type should remain IrStreamType, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I keep the Ir prefix intentionally.

To be precise, types registered via this method are not TS types. For every enum, emscripten creates an object, which is comparable with any transpiled enum value. For example, Module.IrStreamType.STRUCTURED / Module.IrStreamType.UNSTRUCTURED can be used to check against the return values of reader.getIrStreamType() in our case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.value("STRUCTURED", clp_ffi_js::ir::StreamType::Structured)
.value("UNSTRUCTURED", clp_ffi_js::ir::StreamType::Unstructured);
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
Expand All @@ -128,6 +132,7 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
&clp_ffi_js::ir::StreamReader::create,
emscripten::return_value_policy::take_ownership()
)
.function("getIrStreamType", &clp_ffi_js::ir::StreamReader::get_ir_stream_type)
.function(
"getNumEventsBuffered",
&clp_ffi_js::ir::StreamReader::get_num_events_buffered
Expand All @@ -143,7 +148,8 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
} // namespace

namespace clp_ffi_js::ir {
auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
-> std::unique_ptr<StreamReader> {
Comment on lines +151 to +152
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Potential API Breaking Change in StreamReader::create Signature

The create method of StreamReader now requires an additional parameter, ReaderOptions. This change may break existing code that calls StreamReader::create without the new parameter. Consider providing a default value or an overloaded method to maintain backward compatibility.

Apply this diff to add a default parameter value:

-auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
+auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options = {})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
-> std::unique_ptr<StreamReader> {
auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options = {})
-> std::unique_ptr<StreamReader> {

auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);

Expand All @@ -159,20 +165,30 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<

rewind_reader_and_validate_encoding_type(*zstd_decompressor);

// Validate the stream's version
// Validate the stream's version and decide which type of IR stream reader to create.
auto pos = zstd_decompressor->get_pos();
auto const version{get_version(*zstd_decompressor)};
if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
}
try {
zstd_decompressor->seek_from_begin(pos);
} catch (ZstdDecompressor::OperationFailed& e) {
auto const version_validation_result{clp::ffi::ir_stream::validate_protocol_version(version)
};
if (clp::ffi::ir_stream::IRProtocolErrorCode::Supported == version_validation_result) {
zstd_decompressor->seek_from_begin(0);
return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer),
reader_options
));
}
if (clp::ffi::ir_stream::IRProtocolErrorCode::BackwardCompatible
== version_validation_result)
{
zstd_decompressor->seek_from_begin(pos);
return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer)
));
}
} catch (ZstdDecompressor::OperationFailed const& e) {
Comment on lines +174 to +191
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure consistent use of ReaderOptions

While StructuredIrStreamReader::create is called with reader_options, the UnstructuredIrStreamReader::create is called without it. If ReaderOptions are applicable to both reader types, consider passing reader_options to UnstructuredIrStreamReader::create as well for consistency.

throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
Expand All @@ -181,8 +197,11 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
};
}

return std::make_unique<UnstructuredIrStreamReader>(
UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer))
);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
}
} // namespace clp_ffi_js::ir
16 changes: 11 additions & 5 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#ifndef CLP_FFI_JS_IR_STREAMREADER_HPP
#define CLP_FFI_JS_IR_STREAMREADER_HPP

#include <array>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string_view>

#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <emscripten/val.h>
Expand All @@ -13,13 +12,16 @@ namespace clp_ffi_js::ir {
// JS types used as inputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions);

// JS types used as outputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);

constexpr std::array<std::string_view, 6> cUnstructuredIrVersions
= {"v0.0.2", "v0.0.1", "v0.0.0", "0.0.2", "0.0.1", "0.0.0"};
enum class StreamType : uint8_t {
Structured,
Unstructured,
};

/**
* Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded
Expand All @@ -36,7 +38,9 @@ class StreamReader {
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array
[[nodiscard]] static auto create(
DataArrayTsType const& data_array,
ReaderOptions const& reader_options
) -> std::unique_ptr<StreamReader>;

// Destructor
Expand All @@ -52,6 +56,8 @@ class StreamReader {
auto operator=(StreamReader&&) -> StreamReader& = delete;

// Methods
[[nodiscard]] virtual auto get_ir_stream_type() const -> StreamType = 0;

/**
* @return The number of events buffered.
*/
Expand Down
5 changes: 2 additions & 3 deletions src/clp_ffi_js/ir/StreamReaderDataContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ class StreamReaderDataContext {
~StreamReaderDataContext() = default;

// Methods
/**
* @return A reference to the deserializer.
*/
[[nodiscard]] auto get_deserializer() -> Deserializer& { return m_deserializer; }

[[nodiscard]] auto get_reader() -> clp::ReaderInterface& { return *m_reader; }

private:
clp::Array<char> m_data_buffer;
std::unique_ptr<clp::ReaderInterface> m_reader;
Expand Down
195 changes: 195 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#include "StructuredIrStreamReader.hpp"

#include <cstddef>
#include <format>
#include <memory>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/Value.hpp>
#include <clp/ir/types.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

namespace {
constexpr std::string_view cEmptyJsonStr{"{}"};
constexpr std::string_view cLogLevelFilteringNotSupportedErrorMsg{
"Log level filtering is not yet supported in this reader."
};
constexpr std::string_view cReaderOptionsTimestampKey{"timestampKey"};
} // namespace

namespace clp_ffi_js::ir {
using clp::ir::four_byte_encoded_variable_t;
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved

auto StructuredIrStreamReader::create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char> data_array,
ReaderOptions const& reader_options
) -> StructuredIrStreamReader {
auto deserialized_log_events{std::make_shared<std::vector<clp::ffi::KeyValuePairLogEvent>>()};
auto result{StructuredIrDeserializer::create(
*zstd_decompressor,
IrUnitHandler{
deserialized_log_events,
reader_options[cReaderOptionsTimestampKey.data()].as<std::string>()
}
)};
if (result.has_error()) {
auto const error_code{result.error()};
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format(
"Failed to create deserializer: {} {}",
error_code.category().name(),
error_code.message()
)
};
}
StreamReaderDataContext<StructuredIrDeserializer> data_context{
std::move(data_array),
std::move(zstd_decompressor),
std::move(result.value())
};
return StructuredIrStreamReader{std::move(data_context), std::move(deserialized_log_events)};
}

auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t {
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg);
return FilteredLogEventMapTsType{emscripten::val::null()};
}

void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) {
if (log_level_filter.isNull()) {
return;
}
SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg);
}
Comment on lines +77 to +87
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider throwing an exception for unsupported operations.

Instead of logging errors and returning null values, consider throwing a ClpFfiJsException to explicitly indicate that these operations are not supported.

 auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
-    SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg);
-    return FilteredLogEventMapTsType{emscripten::val::null()};
+    throw ClpFfiJsException{
+        clp::ErrorCode::ErrorCode_NotSupported,
+        __FILENAME__,
+        __LINE__,
+        std::string{cLogLevelFilteringNotSupportedErrorMsg}
+    };
 }

Committable suggestion skipped: line range outside the PR's diff.


auto StructuredIrStreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_deserialized_log_events->size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents);
auto& reader{m_stream_reader_data_context->get_reader()};
while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader
)};
if (false == result.has_error()) {
continue;
}
auto const error{result.error()};
if (std::errc::operation_not_permitted == error) {
break;
}
if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
break;
}
Comment on lines +107 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Throw exception for incomplete IR streams

When encountering an incomplete IR stream, the code logs an error and exits the loop. Consider throwing a ClpFfiJsException to inform the caller about the incomplete stream explicitly. This change would improve error propagation and allow for more robust error handling upstream.

throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Corrupt,
__FILENAME__,
__LINE__,
std::format(
"Failed to deserialize IR unit: {}:{}",
error.category().name(),
error.message()
)
};
}
m_timestamp_node_id = m_stream_reader_data_context->get_deserializer()
.get_ir_unit_handler()
.get_timestamp_node_id();
m_stream_reader_data_context.reset(nullptr);
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType {
if (use_filter) {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg);
return DecodedResultsTsType{emscripten::val::null()};
}

if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) {
return DecodedResultsTsType{emscripten::val::null()};
}

auto const results{emscripten::val::array()};

for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) {
auto const& log_event{m_deserialized_log_events->at(log_event_idx)};

auto const json_result{log_event.serialize_to_json()};
std::string json_str{cEmptyJsonStr};
if (false == json_result.has_value()) {
auto error_code{json_result.error()};
SPDLOG_ERROR(
"Failed to deserialize log event to JSON: {}:{}",
error_code.category().name(),
error_code.message()
);
} else {
json_str = json_result.value().dump();
}

auto const& id_value_pairs{log_event.get_node_id_value_pairs()};
clp::ffi::value_int_t timestamp{0};
if (m_timestamp_node_id.has_value()) {
auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())};
if (timestamp_pair.has_value()) {
if (timestamp_pair->is<clp::ffi::value_int_t>()) {
timestamp = timestamp_pair.value().get_immutable_view<clp::ffi::value_int_t>();
} else {
// TODO: Add support for parsing timestamp values of string type.
SPDLOG_ERROR("Unable to parse timestamp for log_event_idx={}", log_event_idx);
}
}
}

EM_ASM(
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
results.as_handle(),
json_str.c_str(),
timestamp,
LogLevel::NONE,
log_event_idx + 1
);
}

return DecodedResultsTsType(results);
}

StructuredIrStreamReader::StructuredIrStreamReader(
StreamReaderDataContext<StructuredIrDeserializer>&& stream_reader_data_context,
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> deserialized_log_events
)
: m_deserialized_log_events{std::move(deserialized_log_events)},
m_stream_reader_data_context{
std::make_unique<StreamReaderDataContext<StructuredIrDeserializer>>(
std::move(stream_reader_data_context)
)
} {}
} // namespace clp_ffi_js::ir
Loading
Loading