Skip to content

Commit

Permalink
*: Refine the error message when schema mismatch in ExchangeReceiver (#…
Browse files Browse the repository at this point in the history
…9744)

ref #9673

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Dec 27, 2024
1 parent 8369058 commit f0faa0c
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 107 deletions.
36 changes: 18 additions & 18 deletions dbms/src/Flash/Coprocessor/CodecUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <Flash/Coprocessor/DAGUtils.h>

namespace DB
{
namespace ErrorCodes
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes
} // namespace DB::ErrorCodes

namespace CodecUtils
namespace DB::CodecUtils
{
/// Checks that the actual number of columns matches the expected number.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR when `expected != actual`; the message
/// contains `identifier` and both sizes. Does nothing when the two sizes are equal.
// NOTE(review): this span looks like a rendered diff with removed and added lines interleaved
// (two `if` lines, two forms of the Exception arguments) -- confirm against the real file
// before treating it as compilable code.
void checkColumnSize(const String & identifier, size_t expected, size_t actual)
{
if unlikely (expected != actual)
if (unlikely(expected != actual))
throw Exception(
fmt::format("{} schema size mismatch, expected {}, actual {}.", identifier, expected, actual),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"{} schema size mismatch, expected {}, actual {}.",
identifier,
expected,
actual);
}

/// Checks that the data type name of one column matches the expected name.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR when `expected != actual`; the message
/// contains `identifier`, `column_index` and both type names. Does nothing when they are equal.
// NOTE(review): this span looks like a rendered diff with removed and added lines interleaved
// (two `if` lines, a `fmt::format(...)` form and a format-string-argument form of the
// Exception) -- confirm against the real file before treating it as compilable code.
void checkDataTypeName(const String & identifier, size_t column_index, const String & expected, const String & actual)
{
if unlikely (expected != actual)
if (unlikely(expected != actual))
throw Exception(
fmt::format(
"{} schema mismatch at column {}, expected {}, actual {}",
identifier,
column_index,
expected,
actual),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"{} schema mismatch at column {}, expected {}, actual {}",
identifier,
column_index,
expected,
actual);
}

} // namespace CodecUtils
} // namespace DB
} // namespace DB::CodecUtils
69 changes: 36 additions & 33 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <type_traits>

namespace DB
{
Expand Down Expand Up @@ -816,7 +815,17 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toExchangeReceiveResult
recv_msg->getReqInfo(),
recv_msg->getErrorPtr()->msg());

return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr);
try
{
return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr);
}
catch (DB::Exception & e)
{
// Add the MPPTask identifier and executor_id to the error message, making it easier to
// identify the specific stage within a complex query where the error occurs
e.addMessage(fmt::format("{}", exc_log->identifier()));
e.rethrow();
}
}
case ReceiveStatus::eof:
return handleUnnormalChannel(block_queue, decoder_ptr);
Expand Down Expand Up @@ -882,42 +891,36 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::toDecodeResult(
{
assert(recv_msg != nullptr);
const auto * resp_ptr = recv_msg->getRespPtr(stream_id);
if (resp_ptr
!= nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries.
{
auto select_resp = std::make_shared<tipb::SelectResponse>();
if (unlikely(!select_resp->ParseFromString(*resp_ptr)))
{
return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error");
}
else
{
auto result
= ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
/// If mocking TiFlash as TiDB, we should decode chunks from select_resp.
if (unlikely(!result.resp->chunks().empty()))
{
assert(recv_msg->getChunks(stream_id).empty());
// Fine grained shuffle should only be enabled when sending data to TiFlash node.
// So all data should be encoded into MPPDataPacket.chunks.
RUNTIME_CHECK_MSG(
!enable_fine_grained_shuffle_flag,
"Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema);
}
else if (!recv_msg->getChunks(stream_id).empty())
{
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
}
return result;
}
}
else /// the non-last packets
if (resp_ptr == nullptr)
{
/// the non-last packets
auto result = ExchangeReceiverResult::newOk(nullptr, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
return result;
}

/// the data of the last packet is serialized from tipb::SelectResponse including execution summaries.
auto select_resp = std::make_shared<tipb::SelectResponse>();
if (unlikely(!select_resp->ParseFromString(*resp_ptr)))
return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error");

auto result = ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo());
/// If mocking TiFlash as TiDB, we should decode chunks from select_resp.
if (unlikely(!result.resp->chunks().empty()))
{
assert(recv_msg->getChunks(stream_id).empty());
// Fine grained shuffle should only be enabled when sending data to TiFlash node.
// So all data should be encoded into MPPDataPacket.chunks.
RUNTIME_CHECK_MSG(
!enable_fine_grained_shuffle_flag,
"Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled");
result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema);
}
else if (!recv_msg->getChunks(stream_id).empty())
{
result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr);
}
return result;
}

template <typename RPCContext>
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <Flash/Mpp/AsyncRequestHandler.h>
#include <Flash/Mpp/GRPCReceiverContext.h>

#include <future>
#include <memory>
#include <mutex>
#include <thread>

namespace DB
{
Expand All @@ -36,8 +34,8 @@ struct ExchangeReceiverResult
size_t call_index;
String req_info;
bool meet_error;
String error_msg;
bool eof;
String error_msg;
DecodeDetail decode_detail;

ExchangeReceiverResult()
Expand Down Expand Up @@ -74,8 +72,8 @@ struct ExchangeReceiverResult
, call_index(call_index_)
, req_info(req_info_)
, meet_error(meet_error_)
, error_msg(error_msg_)
, eof(eof_)
, error_msg(error_msg_)
{}
};

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}

LOG_INFO(log, "Using api_version={}", storage_config.api_version);

// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, const Logge

lazily_init_store = get_bool_config_or_default("lazily_init_store", lazily_init_store);

LOG_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store);
LOG_INFO(
log,
"format_version={} lazily_init_store={} api_version={}",
format_version,
lazily_init_store,
api_version);
}

Strings TiFlashStorageConfig::getAllNormalPaths() const
Expand Down
132 changes: 85 additions & 47 deletions dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RestoreDMFile.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>
Expand All @@ -29,38 +31,61 @@ DMFilePtr restoreDMFileFromRemoteDataSource(
UInt64 file_page_id,
UInt64 meta_version)
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto wn_ps = dm_context.global_context.getWriteNodePageStorage();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id),
file_page_id);
auto full_external_id = wn_ps->getNormalPageId(full_page_id);
auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id);
auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id);
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
DMFilePtr dmfile;
try
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto wn_ps = dm_context.global_context.getWriteNodePageStorage();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(
dm_context.keyspace_id,
StorageType::Data,
dm_context.physical_table_id),
file_page_id);
auto full_external_id = wn_ps->getNormalPageId(full_page_id);
auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id);
auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id);
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
assert(dmfile != nullptr);
return dmfile;
}

/// Restores a DMFile through DMFile::restore, using the id returned by
/// `dataReader()->getNormalPageId(file_page_id)` and the path returned by the stable disk
/// delegator's `getDTFilePath`, then reports the file's bytes-on-disk back to the delegator
/// via `updateDTFileSize`.
/// Returns the restored DMFilePtr. In the try/catch form below, a DB::Exception raised along
/// the way is rethrown with `file_page_id` and `meta_version` appended to its message.
// NOTE(review): this span looks like a rendered diff: the first body (no try/catch) appears
// to be the removed version and the second body (wrapped in try/catch) the added one --
// confirm against the real file before treating it as compilable code.
DMFilePtr restoreDMFileFromLocal(const DMContext & dm_context, UInt64 file_page_id, UInt64 meta_version)
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id);
auto file_parent_path = path_delegate.getDTFilePath(file_id);
auto dmfile = DMFile::restore(
dm_context.global_context.getFileProvider(),
file_id,
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
meta_version,
dm_context.keyspace_id);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
// Declared outside the try block so it can be returned after the block ends.
DMFilePtr dmfile;
try
{
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id);
auto file_parent_path = path_delegate.getDTFilePath(file_id);
dmfile = DMFile::restore(
dm_context.global_context.getFileProvider(),
file_id,
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
meta_version,
dm_context.keyspace_id);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
catch (DB::Exception & e)
{
// Attach the ids being restored to the exception message before rethrowing it.
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
assert(dmfile != nullptr);
return dmfile;
}

Expand All @@ -72,26 +97,39 @@ DMFilePtr restoreDMFileFromCheckpoint(
UInt64 file_page_id,
UInt64 meta_version)
{
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id),
file_page_id);
auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id);
auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile();
auto file_oid = data_key_view.getDMFileOID();
auto data_key = data_key_view.toFullKey();
auto delegator = dm_context.path_pool->getStableDiskDelegator();
auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
PS::V3::CheckpointLocation loc{
.data_file_id = std::make_shared<String>(data_key),
.offset_in_file = 0,
.size_in_file = 0,
};
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
DMFilePtr dmfile;
try
{
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(
dm_context.keyspace_id,
StorageType::Data,
dm_context.physical_table_id),
file_page_id);
auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id);
auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile();
auto file_oid = data_key_view.getDMFileOID();
auto data_key = data_key_view.toFullKey();
auto delegator = dm_context.path_pool->getStableDiskDelegator();
auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
PS::V3::CheckpointLocation loc{
.data_file_id = std::make_shared<String>(data_key),
.offset_in_file = 0,
.size_in_file = 0,
};
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version));
e.rethrow();
}
assert(dmfile != nullptr);
return dmfile;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ SegmentPtr Segment::restoreSegment( //
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.addMessage(fmt::format("while restoreSegment, segment_id={} ident={}", segment_id, parent_log->identifier()));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/FormatVersion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const StorageFormatVersion & toStorageFormat(UInt64 setting)
case 103:
return STORAGE_FORMAT_V103;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal setting value: {}", setting);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal format_version value: {}", setting);
}
}
} // namespace
Expand Down

0 comments on commit f0faa0c

Please sign in to comment.