Skip to content

Commit

Permalink
feat(functions): Support for canonicalization of JSON (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#11284)

Summary:
This is preliminary PR that adds support for canonicalization of JSON strings. This initial PR only tackles canonicalization of json_parse. Another diff will handle CAST( _ as JSON) . Canonicalization is required since currently Velox just treats JSON as varchars thus equivalent JSON but having different backing varchar's are treated as separate JSON's which is incorrect and contrary to behavior shown by Presto.


Reviewed By: Yuhta, gggrace14

Differential Revision: D65084925

Pulled By: kgpai
  • Loading branch information
kgpai authored and facebook-github-bot committed Nov 20, 2024
1 parent c13b8ed commit 2ce62a9
Show file tree
Hide file tree
Showing 6 changed files with 587 additions and 28 deletions.
212 changes: 197 additions & 15 deletions velox/functions/prestosql/JsonFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,95 @@
* limitations under the License.
*/
#include "velox/expression/VectorFunction.h"
#include "velox/functions/prestosql/json/JsonStringUtil.h"
#include "velox/functions/prestosql/json/SIMDJsonUtil.h"
#include "velox/functions/prestosql/types/JsonType.h"

namespace facebook::velox::functions {

namespace {
const std::string_view kArrayStart = "[";
const std::string_view kArrayEnd = "]";
const std::string_view kSeparator = ",";
const std::string_view kObjectStart = "{";
const std::string_view kObjectEnd = "}";
const std::string_view kObjectKeySeparator = ":";

using JsonViews = std::vector<std::string_view>;

inline void addOrMergeViews(JsonViews& jsonViews, std::string_view view) {
if (jsonViews.empty()) {
jsonViews.push_back(view);
return;
}

auto& lastView = jsonViews.back();

if (lastView.data() + lastView.size() == view.data()) {
lastView = std::string_view(lastView.data(), lastView.size() + view.size());
} else {
jsonViews.push_back(view);
}
}

/// Class to keep track of json strings being written
/// in to a buffer. The size of the backing buffer must be known during
/// construction time.
class BufferTracker {
public:
explicit BufferTracker(BufferPtr buffer) : curPos_(0), currentViewStart_(0) {
bufPtr_ = buffer->asMutable<char>();
capacity = buffer->capacity();
}

/// Write out all the views to the buffer.
auto getCanonicalString(JsonViews& jsonViews) {
for (auto view : jsonViews) {
trimEscapeWriteToBuffer(view);
}
return getStringView();
}

/// Sets current view to the end of the previous string.
/// Should be called only after getCanonicalString ,
/// as after this call the previous view is lost.
void startNewString() {
currentViewStart_ += curPos_;
curPos_ = 0;
}

private:
/// Trims whitespace and escapes utf characters before writing to buffer.
void trimEscapeWriteToBuffer(std::string_view input) {
auto trimmed = velox::util::trimWhiteSpace(input.data(), input.size());
auto curBufPtr = getCurrentBufferPtr();
auto bytesWritten =
prestoJavaEscapeString(trimmed.data(), trimmed.size(), curBufPtr);
incrementCounter(bytesWritten);
}

/// Returns current string view against the buffer.
std::string_view getStringView() {
return std::string_view(bufPtr_ + currentViewStart_, curPos_);
}

inline char* getCurrentBufferPtr() {
return bufPtr_ + currentViewStart_ + curPos_;
}

void incrementCounter(size_t increment) {
VELOX_DCHECK_LE(curPos_ + currentViewStart_ + increment, capacity);
curPos_ += increment;
}

size_t capacity;
size_t curPos_;
size_t currentViewStart_;
char* bufPtr_;
};

} // namespace

namespace {
class JsonFormatFunction : public exec::VectorFunction {
public:
Expand Down Expand Up @@ -84,38 +168,75 @@ class JsonParseFunction : public exec::VectorFunction {
auto value = arg->as<ConstantVector<StringView>>()->valueAt(0);
paddedInput_.resize(value.size() + simdjson::SIMDJSON_PADDING);
memcpy(paddedInput_.data(), value.data(), value.size());
if (auto error = parse(value.size())) {
auto escapeSize = escapedStringSize(value.data(), value.size());
auto buffer = AlignedBuffer::allocate<char>(escapeSize, context.pool());
BufferTracker bufferTracker{buffer};

JsonViews jsonViews;

if (auto error = parse(value.size(), jsonViews)) {
context.setErrors(rows, errors_[error]);
return;
}
localResult = std::make_shared<ConstantVector<StringView>>(
context.pool(), rows.end(), false, JSON(), std::move(value));

BufferPtr stringViews =
AlignedBuffer::allocate<StringView>(1, context.pool(), StringView());
auto rawStringViews = stringViews->asMutable<StringView>();
rawStringViews[0] =
StringView(bufferTracker.getCanonicalString(jsonViews));

auto constantBase = std::make_shared<FlatVector<StringView>>(
context.pool(),
JSON(),
nullptr,
1,
stringViews,
std::vector<BufferPtr>{buffer});

localResult = BaseVector::wrapInConstant(rows.end(), 0, constantBase);

} else {
auto flatInput = arg->asFlatVector<StringView>();
BufferPtr stringViews = AlignedBuffer::allocate<StringView>(
rows.end(), context.pool(), StringView());
auto rawStringViews = stringViews->asMutable<StringView>();

auto stringBuffers = flatInput->stringBuffers();
VELOX_CHECK_LE(rows.end(), flatInput->size());

size_t maxSize = 0;
size_t totalOutputSize = 0;
rows.applyToSelected([&](auto row) {
auto value = flatInput->valueAt(row);
maxSize = std::max(maxSize, value.size());
totalOutputSize += escapedStringSize(value.data(), value.size());
});

paddedInput_.resize(maxSize + simdjson::SIMDJSON_PADDING);
BufferPtr buffer =
AlignedBuffer::allocate<char>(totalOutputSize, context.pool());
BufferTracker bufferTracker{buffer};

rows.applyToSelected([&](auto row) {
JsonViews jsonViews;
auto value = flatInput->valueAt(row);
memcpy(paddedInput_.data(), value.data(), value.size());
if (auto error = parse(value.size())) {
if (auto error = parse(value.size(), jsonViews)) {
context.setVeloxExceptionError(row, errors_[error]);
} else {
auto canonicalString = bufferTracker.getCanonicalString(jsonViews);

rawStringViews[row] = StringView(canonicalString);
bufferTracker.startNewString();
}
});

localResult = std::make_shared<FlatVector<StringView>>(
context.pool(),
JSON(),
nullptr,
rows.end(),
flatInput->values(),
std::move(stringBuffers));
stringViews,
std::vector<BufferPtr>{buffer});
}

context.moveOrCopyResult(localResult, rows, result);
Expand All @@ -130,45 +251,106 @@ class JsonParseFunction : public exec::VectorFunction {
}

private:
simdjson::error_code parse(size_t size) const {
simdjson::error_code parse(size_t size, JsonViews& jsonViews) const {
simdjson::padded_string_view paddedInput(
paddedInput_.data(), size, paddedInput_.size());
SIMDJSON_ASSIGN_OR_RAISE(auto doc, simdjsonParse(paddedInput));
SIMDJSON_TRY(validate<simdjson::ondemand::document&>(doc));
SIMDJSON_TRY(validate<simdjson::ondemand::document&>(doc, jsonViews));
if (!doc.at_end()) {
return simdjson::TRAILING_CONTENT;
}
return simdjson::SUCCESS;
}

template <typename T>
static simdjson::error_code validate(T value) {
static simdjson::error_code validate(T value, JsonViews& jsonViews) {
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
switch (type) {
case simdjson::ondemand::json_type::array: {
SIMDJSON_ASSIGN_OR_RAISE(auto array, value.get_array());

jsonViews.push_back(kArrayStart);
auto jsonViewsSize = jsonViews.size();
for (auto elementOrError : array) {
SIMDJSON_ASSIGN_OR_RAISE(auto element, elementOrError);
SIMDJSON_TRY(validate(element));
SIMDJSON_TRY(validate(element, jsonViews));
jsonViews.push_back(kSeparator);
}

// If the array is not empty, remove the last separator.
if (jsonViews.size() > jsonViewsSize) {
jsonViews.pop_back();
}

jsonViews.push_back(kArrayEnd);

return simdjson::SUCCESS;
}

case simdjson::ondemand::json_type::object: {
SIMDJSON_ASSIGN_OR_RAISE(auto object, value.get_object());

std::vector<std::pair<std::string_view, JsonViews>> objFields;
for (auto fieldOrError : object) {
SIMDJSON_ASSIGN_OR_RAISE(auto field, fieldOrError);
SIMDJSON_TRY(validate(field.value()));
auto key = field.key_raw_json_token();
JsonViews elementArray;
SIMDJSON_TRY(validate(field.value(), elementArray));
objFields.push_back({key, elementArray});
}

std::sort(objFields.begin(), objFields.end(), [](auto& a, auto& b) {
// Remove the quotes from the keys before we sort them.
auto af = std::string_view{a.first.data() + 1, a.first.size() - 2};
auto bf = std::string_view{b.first.data() + 1, b.first.size() - 2};
return lessThan(a.first, b.first);
});

jsonViews.push_back(kObjectStart);

for (auto i = 0; i < objFields.size(); i++) {
auto field = objFields[i];
addOrMergeViews(jsonViews, field.first);
jsonViews.push_back(kObjectKeySeparator);

for (auto& element : field.second) {
addOrMergeViews(jsonViews, element);
}

if (i < objFields.size() - 1) {
jsonViews.push_back(kSeparator);
}
}

jsonViews.push_back(kObjectEnd);
return simdjson::SUCCESS;
}
case simdjson::ondemand::json_type::number:

case simdjson::ondemand::json_type::number: {
SIMDJSON_ASSIGN_OR_RAISE(auto rawJson, value.raw_json());
addOrMergeViews(jsonViews, rawJson);

return value.get_double().error();
case simdjson::ondemand::json_type::string:
}
case simdjson::ondemand::json_type::string: {
SIMDJSON_ASSIGN_OR_RAISE(auto rawJson, value.raw_json());
addOrMergeViews(jsonViews, rawJson);

return value.get_string().error();
case simdjson::ondemand::json_type::boolean:
}

case simdjson::ondemand::json_type::boolean: {
SIMDJSON_ASSIGN_OR_RAISE(auto rawJson, value.raw_json());
addOrMergeViews(jsonViews, rawJson);

return value.get_bool().error();
}

case simdjson::ondemand::json_type::null: {
SIMDJSON_ASSIGN_OR_RAISE(auto isNull, value.is_null());
SIMDJSON_ASSIGN_OR_RAISE(auto rawJson, value.raw_json());
addOrMergeViews(jsonViews, rawJson);

return isNull ? simdjson::SUCCESS : simdjson::N_ATOM_ERROR;
}
}
Expand Down
Loading

0 comments on commit 2ce62a9

Please sign in to comment.