
Commit

Optimize get_json_object by calling the main kernel only once (NVIDIA#2129)

* Implement the CPU code

Signed-off-by: Nghia Truong <[email protected]>

* Reimplement kernel

Signed-off-by: Nghia Truong <[email protected]>

* Fix the kernel caller

Signed-off-by: Nghia Truong <[email protected]>

* Optimize validity computation

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Add comment

Signed-off-by: Nghia Truong <[email protected]>

* Tuning kernel occupancy

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Change padding for the scratch buffer

Signed-off-by: Nghia Truong <[email protected]>

* Update docs

Signed-off-by: Nghia Truong <[email protected]>

* Add test for overflow case

Signed-off-by: Nghia Truong <[email protected]>

* Pad the output buffer using max row size

Signed-off-by: Nghia Truong <[email protected]>

* Update test

Signed-off-by: Nghia Truong <[email protected]>

* Change the padding ratio

Signed-off-by: Nghia Truong <[email protected]>

---------

Signed-off-by: Nghia Truong <[email protected]>
ttnghia authored and wjxiz1992 committed Jun 17, 2024
1 parent dcfc320 commit 083c23c
Showing 2 changed files with 151 additions and 124 deletions.
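
In short, the previous implementation always ran the kernel twice: a first pass to compute output sizes and a second pass to write the strings. With this change the kernel writes the strings and records their sizes in a single pass into a scratch buffer padded beyond the input size, and a second, exactly-sized launch happens only in the rare case that some row overflows its slot. Below is a minimal host-side sketch of that control flow, assuming hypothetical helpers (launch_kernel, gather_strings, sizes_to_offsets) and already-defined inputs (input, input_chars_size, max_row_size, padding_rows, out_stringviews, stream, mr); it is not the actual code from the diff that follows.

#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

// Sketch of the single-launch-with-retry strategy; helper names are placeholders.
rmm::device_uvector<char> scratch(input_chars_size + padding_rows * max_row_size, stream);
rmm::device_scalar<bool> has_out_of_bound{false, stream};

// First (and usually only) launch: write each string and its size into the scratch buffer,
// flagging any row whose output does not fit in its slot.
launch_kernel(input, scratch.data(), out_stringviews.data(), has_out_of_bound.data(), stream);

if (!has_out_of_bound.value(stream)) {
  // Common case: nothing overflowed, so gather the strings from the scratch buffer and return.
  return gather_strings(out_stringviews, stream, mr);
}

// Rare case: some row overflowed. The sizes recorded by the first launch are still exact,
// so build offsets from them, allocate a right-sized chars buffer, and launch once more.
auto [offsets, total_size] = sizes_to_offsets(out_stringviews, stream, mr);
rmm::device_uvector<char> chars(total_size, stream, mr);
has_out_of_bound.set_value_to_zero_async(stream);
launch_kernel(input, chars.data(), /*out_stringviews=*/nullptr, has_out_of_bound.data(), stream);
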
250 changes: 130 additions & 120 deletions src/main/cpp/src/get_json_object.cu
@@ -24,6 +24,7 @@
#include <cudf/detail/offsets_iterator_factory.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
@@ -37,7 +38,11 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/pair.h>
#include <thrust/transform_reduce.h>
#include <thrust/tuple.h>

namespace spark_rapids_jni {
@@ -825,33 +830,21 @@ rmm::device_uvector<path_instruction> construct_path_commands(
*
*
* @param input The incoming json string
* @param input_len Size of the incoming json string
* @param path_commands_ptr The command buffer to be applied to the string.
* @param path_commands_size The command buffer size.
* @param out_buf Buffer user to store the results of the query
* (nullptr in the size computation step)
* @param out_buf_size Size of the output buffer
* @returns A pair containing the result code and the output buffer.
* @param path_commands The command buffer to be applied to the string
* @param out_buf Buffer used to store the string resulting from the query
* @returns A pair containing the result code and the length of the output string
*/
__device__ thrust::pair<bool, size_t> get_json_object_single(
char_range input,
cudf::device_span<path_instruction const> path_commands,
char* out_buf,
size_t out_buf_size)
__device__ thrust::pair<bool, cudf::size_type> get_json_object_single(
char_range input, cudf::device_span<path_instruction const> path_commands, char* out_buf)
{
json_parser j_parser(input);
j_parser.next_token();
// JSON validation check
if (json_token::ERROR == j_parser.get_current_token()) { return {false, 0}; }

// First pass: preprocess sizes.
// Second pass: writes output.
// The generator automatically determines which pass based on `out_buf`.
// If `out_buf_size` is zero, pass in `nullptr` to avoid generator writing trash output.
json_generator generator((out_buf_size == 0) ? nullptr : out_buf);
json_generator generator(out_buf);

bool const success = evaluate_path(
j_parser, generator, write_style::RAW, {path_commands.data(), path_commands.size()});
bool const success = evaluate_path(j_parser, generator, write_style::RAW, path_commands);

if (!success) {
// generator may contain trash output, e.g.: generator writes some output,
@@ -860,25 +853,23 @@ __device__ thrust::pair<bool, size_t> get_json_object_single(
generator.set_output_len_zero();
}

return {success, generator.get_output_len()};
return {success, static_cast<cudf::size_type>(generator.get_output_len())};
}

/**
* @brief Kernel for running the JSONPath query.
*
* This kernel operates in a 2-pass way. On the first pass it computes the
* output sizes. On the second pass, it fills in the provided output buffers
* (chars and validity).
* This kernel writes out the output strings and their lengths at the same time. If any output
* length exceeds the buffer size limit, a boolean flag is turned on to inform the caller.
* In such a situation, another (larger) output buffer will be allocated and the kernel will be
* launched again. Otherwise, launching this kernel only once is sufficient to produce the
* desired output.
*
* @param col Device view of the incoming string
* @param input The input JSON strings stored in a strings column
* @param offsets Offsets to the output locations in the output buffer
* @param path_commands JSONPath command buffer
* @param d_sizes a buffer used to write the output sizes in the first pass,
* and is read back in on the second pass to compute offsets.
* @param output_offsets Buffer used to store the string offsets for the results
* of the query
* @param out_buf Buffer used to store the results of the query
* @param out_validity Output validity buffer
* @param out_valid_count Output count of # of valid bits
* @param out_stringviews The output array to store pointers to the output strings and their sizes
* @param out_buf Buffer used to store the strings resulting from the query
* @param has_out_of_bound Flag to indicate if any output string has a length exceeding its buffer size
*/
template <int block_size>
// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from
@@ -889,61 +880,33 @@ template <int block_size>
// the performance is really bad. This essentially tells NVCC to prefer using lots
// of registers over spilling.
__launch_bounds__(block_size, 1) CUDF_KERNEL
void get_json_object_kernel(cudf::column_device_view col,
void get_json_object_kernel(cudf::column_device_view input,
cudf::detail::input_offsetalator offsets,
cudf::device_span<path_instruction const> path_commands,
cudf::size_type* d_sizes,
cudf::detail::input_offsetalator output_offsets,
thrust::pair<char const*, cudf::size_type>* out_stringviews,
char* out_buf,
cudf::bitmask_type* out_validity,
cudf::size_type* out_valid_count)
bool* has_out_of_bound)
{
auto tid = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();
for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < input.size(); tid += stride) {
char* const dst = out_buf + offsets[tid];
bool is_valid = false;
cudf::size_type out_size = 0;

cudf::size_type warp_valid_count{0};

auto active_threads = __ballot_sync(0xffff'ffffu, tid < col.size());
while (tid < col.size()) {
bool is_valid = false;
cudf::string_view const str = col.element<cudf::string_view>(tid);
auto const str = input.element<cudf::string_view>(tid);
if (str.size_bytes() > 0) {
char* dst = out_buf != nullptr ? out_buf + output_offsets[tid] : nullptr;
size_t const dst_size =
out_buf != nullptr ? output_offsets[tid + 1] - output_offsets[tid] : 0;

// process one single row
auto [result, output_size] =
get_json_object_single(str, {path_commands.data(), path_commands.size()}, dst, dst_size);
if (result) { is_valid = true; }

// filled in only during the precompute step. during the compute step, the
// offsets are fed back in so we do -not- want to write them out
if (out_buf == nullptr) { d_sizes[tid] = static_cast<cudf::size_type>(output_size); }
} else {
// valid JSON length is always greater than 0
// if `str` size len is zero, output len is 0 and `is_valid` is false
if (out_buf == nullptr) { d_sizes[tid] = 0; }
}
auto const max_size = offsets[tid + 1] - offsets[tid];

// validity filled in only during the output step
if (out_validity != nullptr) {
uint32_t mask = __ballot_sync(active_threads, is_valid);
// 0th lane of the warp writes the validity
if (!(tid % cudf::detail::warp_size)) {
out_validity[cudf::word_index(tid)] = mask;
warp_valid_count += __popc(mask);
}
// If `max_size == 0`, do not pass in the dst pointer to prevent writing garbage data.
thrust::tie(is_valid, out_size) =
get_json_object_single(str, path_commands, max_size != 0 ? dst : nullptr);
if (out_size > max_size) { *has_out_of_bound = true; }
}

tid += stride;
active_threads = __ballot_sync(active_threads, tid < col.size());
}

// sum the valid counts across the whole block
if (out_valid_count != nullptr) {
cudf::size_type block_valid_count =
cudf::detail::single_lane_block_sum_reduce<block_size, 0>(warp_valid_count);
if (threadIdx.x == 0) { atomicAdd(out_valid_count, block_valid_count); }
// Write out `nullptr` in the output string_view to indicate that the output is a null.
// The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
// second time due to out-of-bound write in the first launch.
if (out_stringviews) { out_stringviews[tid] = {is_valid ? dst : nullptr, out_size}; }
}
}

@@ -953,64 +916,111 @@ std::unique_ptr<cudf::column> get_json_object(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.is_empty()) return cudf::make_empty_column(cudf::type_id::STRING);

if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); }
if (input.is_empty()) { return cudf::make_empty_column(cudf::type_id::STRING); }

// get a string buffer to store all the names and convert to device
std::string all_names;
for (auto const& inst : instructions) {
all_names += std::get<1>(inst);
}
cudf::string_scalar all_names_scalar(all_names, true, stream);
// parse the json_path into a command buffer
auto path_commands = construct_path_commands(
auto const all_names_scalar = cudf::string_scalar(all_names, true, stream);
auto const path_commands = construct_path_commands(
instructions, all_names_scalar, stream, rmm::mr::get_current_device_resource());
auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const in_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

// A buffer to store the output strings without knowing their sizes.
// Since we do not know their sizes, we need to allocate the buffer a bit larger than the input
// size so that we will not write output strings into an out-of-bound position.
// Out-of-bound checking still needs to be performed in the main kernel to make sure we will not
// have data corruption.
auto const scratch_size = [&] {
auto const max_row_size = thrust::transform_reduce(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.size()),
cuda::proclaim_return_type<int64_t>(
[in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }),
int64_t{0},
thrust::maximum{});

// Pad the scratch buffer by an additional size that is a multiple of max row size.
auto constexpr padding_rows = 10;
return input.chars_size(stream) + max_row_size * padding_rows;
}();
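// Illustrative arithmetic with hypothetical numbers: for 1,048,576 bytes of input chars and a
// maximum row size of 2,048 bytes, the scratch buffer computed above is
// 1,048,576 + 10 * 2,048 = 1,069,056 bytes.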
auto output_scratch = rmm::device_uvector<char>(scratch_size, stream);
auto out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{
static_cast<std::size_t>(input.size()), stream};
auto has_out_of_bound = rmm::device_scalar<bool>{false, stream};

constexpr int blocks_per_SM = 1;
constexpr int block_size = 256;
auto const num_blocks = [&] {
int device_id{};
cudaDeviceProp props{};
CUDF_CUDA_TRY(cudaGetDevice(&device_id));
CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
return props.multiProcessorCount * blocks_per_SM;
}();

// compute output sizes
auto sizes = rmm::device_uvector<cudf::size_type>(
input.size(), stream, rmm::mr::get_current_device_resource());
auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

constexpr int block_size = 512;
cudf::detail::grid_1d const grid{input.size(), block_size};
auto d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
// preprocess sizes (returned in the offsets buffer)
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr, path_commands, sizes.data(), d_offsets, nullptr, nullptr, nullptr);

// convert sizes to offsets
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
in_offsets,
path_commands,
out_stringviews.data(),
output_scratch.data(),
has_out_of_bound.data());

// If we didn't see any out-of-bound write, everything is good so far.
// Just gather the output strings and return.
if (!has_out_of_bound.value(stream)) {
return cudf::make_strings_column(out_stringviews, stream, mr);
}
// If we reach this point, there was an out-of-bound write. Although very rare, it can still happen.

// This scratch buffer is no longer needed.
output_scratch = rmm::device_uvector<char>{0, stream};

// The string sizes computed in the previous kernel call will be used to allocate a new char
// buffer to store the output.
auto const size_it = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[string_pairs = out_stringviews.data()] __device__(auto const idx) {
return string_pairs[idx].second;
}));
auto [offsets, output_size] =
cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr);
d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());
cudf::strings::detail::make_offsets_child_column(size_it, size_it + input.size(), stream, mr);

// allocate output string column
rmm::device_uvector<char> chars(output_size, stream, mr);
// Also compute the null mask using the stored char pointers.
auto const validator = [] __device__(thrust::pair<char const*, cudf::size_type> const item) {
return item.first != nullptr;
};
auto [null_mask, null_count] =
cudf::detail::valid_if(out_stringviews.begin(), out_stringviews.end(), validator, stream, mr);

// potential optimization : if we know that all outputs are valid, we could
// skip creating the validity mask altogether
rmm::device_buffer validity =
cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr);
// The string views are no longer needed from here on. Free up their memory now.
out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{0, stream};

// compute results
rmm::device_scalar<cudf::size_type> d_valid_count{0, stream};
auto chars = rmm::device_uvector<char>(output_size, stream, mr);
auto const out_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());

has_out_of_bound.set_value_to_zero_async(stream);
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr,
path_commands,
sizes.data(),
d_offsets,
chars.data(),
static_cast<cudf::bitmask_type*>(validity.data()),
d_valid_count.data());

return make_strings_column(input.size(),
std::move(offsets),
chars.release(),
input.size() - d_valid_count.value(stream),
std::move(validity));
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
out_offsets,
path_commands,
nullptr /*out_stringviews*/,
chars.data(),
has_out_of_bound.data());

// This kernel call should not produce any out-of-bound write. If one is still detected, then
// something must have gone wrong.
CUDF_EXPECTS(!has_out_of_bound.value(stream),
"Unexpected out-of-bound write in get_json_object kernel.");

return cudf::make_strings_column(
input.size(), std::move(offsets), chars.release(), null_count, std::move(null_mask));
}

} // namespace detail
25 changes: 21 additions & 4 deletions src/test/java/com/nvidia/spark/rapids/jni/GetJsonObjectTest.java
@@ -31,9 +31,9 @@ void getJsonObjectTest() {
namedPath("k") };
try (ColumnVector jsonCv = ColumnVector.fromStrings(
"{\"k\": \"v\"}");
ColumnVector expected = ColumnVector.fromStrings(
"v");
ColumnVector actual = JSONUtils.getJsonObject(jsonCv, query)) {
ColumnVector expected = ColumnVector.fromStrings(
"v");
ColumnVector actual = JSONUtils.getJsonObject(jsonCv, query)) {
assertColumnsAreEqual(expected, actual);
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ void getJsonObjectTest_Escape() {
String JSON4 = "['a','b','\"C\"']";
// \\u4e2d\\u56FD is 中国
String JSON5 = "'\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b'";
String JSON6 = "['\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b']";
String JSON6 = "['\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b']";

String expectedStr1 = "{\"a\":\"A\"}";
String expectedStr2 = "{\"a\":\"A\\\"\"}";
Expand Down Expand Up @@ -600,6 +600,23 @@ void getJsonObjectTest_15() {
}
}

/**
* This test covers the case in which the JNI kernel is called twice. This happens when the
* output JSON strings are longer than their corresponding input strings.
*/
@Test
void getJsonObjectTest_JNIKernelCalledTwice() {
// This is equivalent to the path '$'.
JSONUtils.PathInstructionJni[] query = new JSONUtils.PathInstructionJni[] {};
try (
ColumnVector input = ColumnVector.fromStrings("['\n']", "['\n\n\n\n\n\n\n\n\n\n']",
"", "", "", "", "", "", "", "");
ColumnVector expected = ColumnVector.fromStrings("[\"\\n\"]",
"[\"\\n\\n\\n\\n\\n\\n\\n\\n\\n\\n\"]", null, null, null, null, null, null, null, null);
ColumnVector actual = JSONUtils.getJsonObject(input, query)) {
assertColumnsAreEqual(expected, actual);
}
}

private JSONUtils.PathInstructionJni wildcardPath() {
return new JSONUtils.PathInstructionJni(JSONUtils.PathInstructionType.WILDCARD, "", -1);
