Skip to content

Commit

Permalink
Improve concat_json (#2557)
Browse files Browse the repository at this point in the history
* Add `nullify_invalid_rows` parameter to `concat_json`

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup and add more docs

Signed-off-by: Nghia Truong <[email protected]>

* Rename variable

Signed-off-by: Nghia Truong <[email protected]>

---------

Signed-off-by: Nghia Truong <[email protected]>
  • Loading branch information
ttnghia authored Nov 5, 2024
1 parent 4a3661c commit ee4a6c4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 44 deletions.
9 changes: 6 additions & 3 deletions src/main/cpp/src/JSONUtilsJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concaten
try {
cudf::jni::auto_set_device(env);
auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
auto [is_valid, joined_strings, delimiter] =
spark_rapids_jni::concat_json(cudf::strings_column_view{*input_cv});

// Currently, set `nullify_invalid_rows = false` as `concatenateJsonStrings` is used only for
// `from_json` with struct schema.
auto [joined_strings, delimiter, should_be_nullify] = spark_rapids_jni::concat_json(
cudf::strings_column_view{*input_cv}, /*nullify_invalid_rows*/ false);

// The output array contains 5 elements:
// [0]: address of the cudf::column object `should_be_nullify` in host memory
Expand All @@ -173,7 +176,7 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concaten
// [3]: address of the rmm::device_buffer object (of the concatenated strings) in host memory
// [4]: delimiter char
auto out_handles = cudf::jni::native_jlongArray(env, 5);
out_handles[0] = reinterpret_cast<jlong>(is_valid.release());
out_handles[0] = reinterpret_cast<jlong>(should_be_nullify.release());
out_handles[1] = reinterpret_cast<jlong>(joined_strings->data());
out_handles[2] = static_cast<jlong>(joined_strings->size());
out_handles[3] = reinterpret_cast<jlong>(joined_strings.release());
Expand Down
62 changes: 26 additions & 36 deletions src/main/cpp/src/json_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,40 @@ constexpr bool can_be_delimiter(char c)

} // namespace

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::column>> concat_json(
cudf::strings_column_view const& input,
bool nullify_invalid_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.is_empty()) {
return {std::make_unique<rmm::device_buffer>(0, stream, mr),
'\n',
std::make_unique<cudf::column>(
rmm::device_uvector<bool>{0, stream, mr}, rmm::device_buffer{}, 0)};
}

auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const default_mr = rmm::mr::get_current_device_resource();

// Check if the input rows are either null, equal to `null` string literal, or empty.
// This will be used for masking out the input when doing string concatenation.
// Check whether each input row is null, empty (containing only whitespace), or invalid JSON.
// This will be used for masking out the null/empty/invalid input rows when doing string
// concatenation.
rmm::device_uvector<bool> is_valid_input(input.size(), stream, default_mr);

// Check if the input rows are either null or empty.
// This will be returned to the caller.
rmm::device_uvector<bool> is_null_or_empty(input.size(), stream, mr);
// Flags the input rows that are null or empty (containing only whitespace) and, when
// `nullify_invalid_rows` is true, also the rows that are invalid JSON strings.
// This will be returned to the caller to create the null mask for the final output.
rmm::device_uvector<bool> should_be_nullified(input.size(), stream, mr);

thrust::for_each(
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0L),
thrust::make_counting_iterator(input.size() * static_cast<int64_t>(cudf::detail::warp_size)),
[input = *d_input_ptr,
[nullify_invalid_rows,
input = *d_input_ptr,
output = thrust::make_zip_iterator(thrust::make_tuple(
is_valid_input.begin(), is_null_or_empty.begin()))] __device__(int64_t tidx) {
is_valid_input.begin(), should_be_nullified.begin()))] __device__(int64_t tidx) {
// Execute one warp per row to minimize thread divergence.
if ((tidx % cudf::detail::warp_size) != 0) { return; }
auto const idx = tidx / cudf::detail::warp_size;
Expand All @@ -110,36 +121,14 @@ std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, c
if (not_whitespace(ch)) { break; }
}

if (i + 3 < size &&
(d_str[i] == 'n' && d_str[i + 1] == 'u' && d_str[i + 2] == 'l' && d_str[i + 3] == 'l')) {
i += 4;

// Skip the very last whitespace characters.
bool is_null_literal{true};
for (; i < size; ++i) {
ch = d_str[i];
if (not_whitespace(ch)) {
is_null_literal = false;
break;
}
}

// The current row contains only `null` string literal and not any other non-whitespace
// characters. Such rows need to be masked out as null when doing concatenation.
if (is_null_literal) {
output[idx] = thrust::make_tuple(false, false);
return;
}
}

auto const not_eol = i < size;

// If the current row is not null or empty, it should start with `{`. Otherwise, we need to
// replace it by a null. This is necessary for libcudf's JSON reader to work.
// Note that if we want to support ARRAY schema, we need to check for `[` instead.
auto constexpr start_character = '{';
if (not_eol && ch != start_character) {
output[idx] = thrust::make_tuple(false, false);
output[idx] = thrust::make_tuple(false, nullify_invalid_rows);
return;
}

Expand Down Expand Up @@ -221,9 +210,9 @@ std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, c
stream,
mr);

return {std::make_unique<cudf::column>(std::move(is_null_or_empty), rmm::device_buffer{}, 0),
std::move(concat_strings->release().data),
delimiter};
return {std::move(concat_strings->release().data),
delimiter,
std::make_unique<cudf::column>(std::move(should_be_nullified), rmm::device_buffer{}, 0)};
}

std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
Expand Down Expand Up @@ -254,13 +243,14 @@ std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const&

} // namespace detail

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::column>> concat_json(
cudf::strings_column_view const& input,
bool nullify_invalid_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::concat_json(input, stream, mr);
return detail::concat_json(input, nullify_invalid_rows, stream, mr);
}

std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
Expand Down
31 changes: 26 additions & 5 deletions src/main/cpp/src/json_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,36 @@ std::unique_ptr<cudf::column> from_json_to_raw_map(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
cudf::strings_column_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

std::unique_ptr<cudf::column> make_structs(
std::vector<cudf::column_view> const& input,
cudf::column_view const& is_null,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
* @brief Concatenate the JSON objects given by a strings column into one single character buffer,
* in which each JSON object is delimited by a special character that does not exist in the input.
*
* Beyond returning the concatenated buffer with delimiter, the function also returns a BOOL8
* column indicating which rows should be nullified after parsing the concatenated buffer. A row
* of this column is `true` if the corresponding input row is empty or contains only whitespace,
* or, when `nullify_invalid_rows` is `true`, if it is an invalid JSON object.
*
* Note that an invalid JSON object in this context is a string that does not start with the `{`
* character after whitespaces.
*
* @param input The strings column containing input JSON objects
* @param nullify_invalid_rows Whether to nullify rows containing invalid JSON objects
* @param stream The CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the device memory of the returned buffer
*        and column
* @return A tuple containing the concatenated JSON objects as a single buffer, the delimiter
*         character, and a BOOL8 column indicating which rows should be nullified after parsing
*         the concatenated buffer
*/
std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::column>> concat_json(
cudf::strings_column_view const& input,
bool nullify_invalid_rows = false,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni

0 comments on commit ee4a6c4

Please sign in to comment.