diff --git a/bcos-executor/src/vm/VMInstance.cpp b/bcos-executor/src/vm/VMInstance.cpp index fed33c8de4..1f9dc3d262 100644 --- a/bcos-executor/src/vm/VMInstance.cpp +++ b/bcos-executor/src/vm/VMInstance.cpp @@ -40,9 +40,9 @@ VMInstance::VMInstance(evmc_vm* instance, evmc_revision revision, bytes_view cod // Set the options. if (m_instance->set_option != nullptr) { // baseline interpreter could not work with precompiled - // m_instance->set_option(m_instance, "advanced", ""); // default is baseline interpreter - // m_instance->set_option(m_instance, "trace", ""); - // m_instance->set_option(m_instance, "cgoto", "no"); + // m_instance->set_option(m_instance, "advanced", ""); // default is baseline interpreter + // m_instance->set_option(m_instance, "trace", ""); + // m_instance->set_option(m_instance, "cgoto", "no"); } } @@ -62,8 +62,8 @@ Result VMInstance::execute(HostContext& _hostContext, evmc_message* _msg) } auto state = std::make_unique( *_msg, m_revision, *_hostContext.interface, &_hostContext, m_code); - { // baseline - auto* evm = evmc_create_evmone(); // baseline use the vm to get options + { // baseline + static auto* evm = evmc_create_evmone(); // baseline use the vm to get options return Result(evmone::baseline::execute( *static_cast(evm), _msg->gas, *state, *m_analysis)); } diff --git a/bcos-framework/bcos-framework/ledger/Features.h b/bcos-framework/bcos-framework/ledger/Features.h index f127a90aca..1d5a2c9a87 100644 --- a/bcos-framework/bcos-framework/ledger/Features.h +++ b/bcos-framework/bcos-framework/ledger/Features.h @@ -2,11 +2,10 @@ #include "../protocol/Protocol.h" #include "../storage/Entry.h" #include "../storage/LegacyStorageMethods.h" -#include "../storage/StorageInterface.h" #include "../storage2/Storage.h" +#include "bcos-concepts/Exception.h" #include "bcos-framework/ledger/LedgerTypeDef.h" #include "bcos-task/Task.h" -#include #include #include #include diff --git a/bcos-rpc/bcos-rpc/tarsRPC/RPCServer.cpp b/bcos-rpc/bcos-rpc/tarsRPC/RPCServer.cpp index 46b5280c49..9ad837a046 100644 --- a/bcos-rpc/bcos-rpc/tarsRPC/RPCServer.cpp +++ b/bcos-rpc/bcos-rpc/tarsRPC/RPCServer.cpp @@ -67,8 +67,9 @@ bcostars::Error bcos::rpc::RPCServer::call(const bcostars::Transaction& request, return; } - auto receipt = dynamic_cast( - *transactionReceiptPtr); + auto const& receipt = + dynamic_cast( + *transactionReceiptPtr); bcos::rpc::RPCServer::async_response_call(current, tarsError, receipt.inner()); } catch (std::exception& e) @@ -100,7 +101,7 @@ bcostars::Error bcos::rpc::RPCServer::sendTransaction(const bcostars::Transactio auto& txpool = self->m_params.node->txpoolRef(); txpool.broadcastTransaction(*transaction); auto submitResult = co_await txpool.submitTransaction(std::move(transaction)); - auto receipt = dynamic_cast( + const auto& receipt = dynamic_cast( *submitResult->transactionReceipt()); bcos::rpc::RPCServer::async_response_sendTransaction(current, error, receipt.inner()); diff --git a/bcos-sdk/sample/tars/performanceTransfer.cpp b/bcos-sdk/sample/tars/performanceTransfer.cpp index ace523d38d..cc1ab7c1dd 100644 --- a/bcos-sdk/sample/tars/performanceTransfer.cpp +++ b/bcos-sdk/sample/tars/performanceTransfer.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include diff --git a/libinitializer/PBFTInitializer.cpp b/libinitializer/PBFTInitializer.cpp index 4e06f7f81f..41dd04619f 100644 --- a/libinitializer/PBFTInitializer.cpp +++ b/libinitializer/PBFTInitializer.cpp @@ -174,6 +174,9 @@ void PBFTInitializer::initChainNodeInfo( auto nodeProtocolInfo = g_BCOSConfig.protocolInfo(ProtocolModuleID::NodeService); m_nodeInfo->setNodeProtocol(*nodeProtocolInfo); m_nodeInfo->setCompatibilityVersion(m_pbft->compatibilityVersion()); + m_nodeInfo->setFeatureKeys( + ledger::Features::featureKeys() | + RANGES::views::transform([](std::string_view view) { return std::string(view); })); m_groupInfo->appendNodeInfo(m_nodeInfo); INITIALIZER_LOG(INFO) << LOG_DESC("PBFTInitializer::initChainNodeInfo") << LOG_KV("nodeType", m_nodeInfo->nodeType()) diff --git a/transaction-executor/bcos-transaction-executor/vm/VMInstance.h b/transaction-executor/bcos-transaction-executor/vm/VMInstance.h index fe475de954..f9d0178db1 100644 --- a/transaction-executor/bcos-transaction-executor/vm/VMInstance.h +++ b/transaction-executor/bcos-transaction-executor/vm/VMInstance.h @@ -56,8 +56,6 @@ class VMInstance if constexpr (std::is_same_v) { assert(instance->abi_version == EVMC_ABI_VERSION); - if (instance->set_option != nullptr) - {} m_instance.emplace(instance); } else diff --git a/transaction-executor/benchmark/benchmakrExecutor.cpp b/transaction-executor/benchmark/benchmakrExecutor.cpp index e4965cdce1..1b89d940bd 100644 --- a/transaction-executor/benchmark/benchmakrExecutor.cpp +++ b/transaction-executor/benchmark/benchmakrExecutor.cpp @@ -28,9 +28,10 @@ struct Fixture : m_cryptoSuite(std::make_shared( std::make_shared(), nullptr, nullptr)), m_receiptFactory(m_cryptoSuite), - m_executor(m_receiptFactory, bcos::executor::GlobalHashImpl::g_hashImpl), + m_executor(m_receiptFactory, std::make_shared()), blockHeader([inner = std::addressof(tarsBlockHeader)]() mutable { return inner; }) { + boost::log::core::get()->set_logging_enabled(false); bcos::executor::GlobalHashImpl::g_hashImpl = std::make_shared(); boost::algorithm::unhex(helloworldBytecode, std::back_inserter(m_helloworldBytecodeBinary)); blockHeader.setVersion((uint32_t)bcos::protocol::BlockVersion::V3_1_VERSION); diff --git a/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h b/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h index b5df780b7f..41d8466ca0 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h +++ b/transaction-scheduler/bcos-transaction-scheduler/BaselineScheduler.h @@ -39,7 +39,6 @@ #include #include #include -#include #include namespace bcos::transaction_scheduler @@ -216,7 +215,7 @@ class BaselineScheduler : public scheduler::SchedulerInterface std::mutex m_executeMutex; int64_t m_lastcommittedBlockNumber = -1; std::mutex m_commitMutex; - tbb::task_group m_notifyGroup; + tbb::task_group m_asyncGroup; struct ExecuteResult { @@ -224,7 +223,7 @@ class BaselineScheduler : public scheduler::SchedulerInterface std::vector m_receipts; protocol::Block::Ptr m_block; }; - std::list m_results; + std::deque m_results; std::mutex m_resultsMutex; /** @@ -302,6 +301,7 @@ class BaselineScheduler : public scheduler::SchedulerInterface scheduler.m_multiLayerStorage.pushMutableToImmutableFront(); scheduler.m_lastExecutedBlockNumber = blockHeader->number(); + scheduler.m_asyncGroup.run([view = std::move(view)]() {}); auto transactions = RANGES::views::transform(constTransactions, @@ -413,9 +413,9 @@ class BaselineScheduler : public scheduler::SchedulerInterface << " | elapsed: " << (current() - now) << "ms"; commitLock.unlock(); - scheduler.m_notifyGroup.run([&, result = std::move(result), - blockHash = ledgerConfig->hash(), - blockNumber = ledgerConfig->blockNumber()]() { + scheduler.m_asyncGroup.run([&, result = std::move(result), + blockHash = ledgerConfig->hash(), + blockNumber = ledgerConfig->blockNumber()]() { ittapi::Report report(ittapi::ITT_DOMAINS::instance().BASELINE_SCHEDULER, ittapi::ITT_DOMAINS::instance().NOTIFY_RESULTS); @@ -487,7 +487,7 @@ class BaselineScheduler : public scheduler::SchedulerInterface BaselineScheduler(BaselineScheduler&&) noexcept = default; BaselineScheduler& operator=(const BaselineScheduler&) = delete; BaselineScheduler& operator=(BaselineScheduler&&) noexcept = default; - ~BaselineScheduler() noexcept override = default; + ~BaselineScheduler() noexcept override { m_asyncGroup.wait(); } void executeBlock(bcos::protocol::Block::Ptr block, bool verify, std::function diff --git a/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h b/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h index a6599c2641..3b50eb7f0d 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h +++ b/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h @@ -34,10 +34,9 @@ class SchedulerParallelImpl { using ChunkStorage = storage2::memory_storage::MemoryStorage; + storage2::memory_storage::Attribute(storage2::memory_storage::LOGICAL_DELETION)>; std::unique_ptr m_asyncTaskGroup; - constexpr static size_t MIN_CHUNK_SIZE = 32; + constexpr static size_t MIN_CHUNK_SIZE = 16; size_t m_chunkSize = MIN_CHUNK_SIZE; size_t m_maxToken = 0; @@ -53,7 +52,7 @@ class SchedulerParallelImpl } int64_t m_chunkIndex = 0; - std::atomic_int64_t* m_lastChunkIndex = nullptr; + std::atomic_int64_t const& m_lastChunkIndex; Range m_transactionAndReceiptsRange; Executor& m_executor; MultiLayerStorage m_localStorage; @@ -62,10 +61,10 @@ class SchedulerParallelImpl m_localReadWriteSetStorage; public: - ChunkStatus(int64_t chunkIndex, std::atomic_int64_t& lastChunkIndex, + ChunkStatus(int64_t chunkIndex, std::atomic_int64_t const& lastChunkIndex, Range transactionAndReceiptsRange, Executor& executor, auto& storage) : m_chunkIndex(chunkIndex), - m_lastChunkIndex(std::addressof(lastChunkIndex)), + m_lastChunkIndex(lastChunkIndex), m_transactionAndReceiptsRange(transactionAndReceiptsRange), m_executor(executor), m_localStorage(storage), @@ -86,7 +85,7 @@ class SchedulerParallelImpl PARALLEL_SCHEDULER_LOG(DEBUG) << "Chunk " << m_chunkIndex << " executing..."; for (auto&& [contextID, transaction, receipt] : m_transactionAndReceiptsRange) { - if (m_chunkIndex >= *m_lastChunkIndex) + if (m_chunkIndex >= m_lastChunkIndex) { PARALLEL_SCHEDULER_LOG(DEBUG) << "Chunk " << m_chunkIndex << " execute aborted"; co_return; @@ -113,120 +112,156 @@ class SchedulerParallelImpl void setMaxToken(size_t maxToken) { m_maxToken = maxToken; } private: - friend task::Task> tag_invoke( - tag_t /*unused*/, SchedulerParallelImpl& scheduler, auto& storage, - auto& executor, protocol::BlockHeader const& blockHeader, - RANGES::input_range auto const& transactions, ledger::LedgerConfig const& ledgerConfig) + static task::Task mergeLastStorage( + SchedulerParallelImpl& scheduler, auto& storage, ChunkStorage&& lastStorage) + { + ittapi::Report mergeReport(ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, + ittapi::ITT_DOMAINS::instance().MERGE_LAST_CHUNK); + PARALLEL_SCHEDULER_LOG(DEBUG) << "Final merge lastStorage"; + co_await storage2::merge(storage, std::forward(lastStorage)); + } + + static void executeSinglePass(SchedulerParallelImpl& scheduler, auto& storage, auto& executor, + protocol::BlockHeader const& blockHeader, RANGES::input_range auto const& transactions, + ledger::LedgerConfig const& ledgerConfig, size_t offset, + std::vector& receipts) { ittapi::Report report(ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().PARALLEL_EXECUTE); - std::vector receipts(RANGES::size(transactions)); + ittapi::ITT_DOMAINS::instance().SINGLE_PASS); - size_t offset = 0; - size_t retryCount = 0; - while (offset < RANGES::size(transactions)) - { - ittapi::Report report(ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().SINGLE_PASS); - - auto currentTransactionAndReceipts = - RANGES::views::iota(offset, (size_t)RANGES::size(receipts)) | - RANGES::views::transform([&](auto index) { - return std::make_tuple(index, std::addressof(transactions[index]), - std::addressof(receipts[index])); - }); - std::atomic_int64_t lastChunkIndex{std::numeric_limits::max()}; - int64_t chunkIndex = 0; - ReadWriteSetStorage writeSet( - storage); - auto chunks = - currentTransactionAndReceipts | RANGES::views::chunk(scheduler.m_chunkSize); - using Chunk = SchedulerParallelImpl::ChunkStatus, - std::decay_t, RANGES::range_value_t>; - - PARALLEL_SCHEDULER_LOG(DEBUG) << "Start new chunk executing... " << offset << " | " - << RANGES::size(currentTransactionAndReceipts); - ChunkStorage lastStorage; - tbb::parallel_pipeline(scheduler.m_maxToken == 0 ? std::thread::hardware_concurrency() : - scheduler.m_maxToken, - tbb::make_filter>(tbb::filter_mode::serial_in_order, - [&](tbb::flow_control& control) -> std::unique_ptr { - if (chunkIndex >= RANGES::size(chunks)) + auto currentTransactionAndReceipts = + RANGES::views::iota(offset, (size_t)RANGES::size(receipts)) | + RANGES::views::transform([&](auto index) { + return std::make_tuple( + index, std::addressof(transactions[index]), std::addressof(receipts[index])); + }); + + auto count = RANGES::size(transactions); + std::atomic_int64_t lastChunkIndex{std::numeric_limits::max()}; + int64_t chunkIndex = 0; + ReadWriteSetStorage writeSet(storage); + + auto chunkSize = std::max( + MIN_CHUNK_SIZE, RANGES::size(transactions) / (std::thread::hardware_concurrency() * 2)); + auto chunks = currentTransactionAndReceipts | RANGES::views::chunk(chunkSize); + using Chunk = SchedulerParallelImpl::ChunkStatus, + std::decay_t, RANGES::range_value_t>; + + PARALLEL_SCHEDULER_LOG(DEBUG) << "Start new chunk executing... " << offset << " | " + << RANGES::size(currentTransactionAndReceipts); + ChunkStorage lastStorage; + tbb::parallel_pipeline( + scheduler.m_maxToken == 0 ? std::thread::hardware_concurrency() : scheduler.m_maxToken, + tbb::make_filter>(tbb::filter_mode::serial_in_order, + [&](tbb::flow_control& control) -> std::unique_ptr { + if (chunkIndex >= RANGES::size(chunks)) + { + control.stop(); + return {}; + } + PARALLEL_SCHEDULER_LOG(DEBUG) << "Chunk: " << chunkIndex; + auto chunk = std::make_unique( + chunkIndex, lastChunkIndex, chunks[chunkIndex], executor, storage); + ++chunkIndex; + return chunk; + }) & + tbb::make_filter, std::unique_ptr>( + tbb::filter_mode::parallel, + [&](std::unique_ptr chunk) -> std::unique_ptr { + auto index = chunk->chunkIndex(); + if (index >= lastChunkIndex) { - control.stop(); - return {}; + return chunk; } - PARALLEL_SCHEDULER_LOG(DEBUG) << "Chunk: " << chunkIndex; - auto chunk = std::make_unique( - chunkIndex, lastChunkIndex, chunks[chunkIndex], executor, storage); - ++chunkIndex; + task::tbb::syncWait(chunk->execute(blockHeader, ledgerConfig)); + return chunk; }) & - tbb::make_filter, std::unique_ptr>( - tbb::filter_mode::parallel, - [&](std::unique_ptr chunk) -> std::unique_ptr { - auto index = chunk->chunkIndex(); - if (index >= lastChunkIndex) + tbb::make_filter, void>( + tbb::filter_mode::serial_in_order, [&](std::unique_ptr chunk) { + auto index = chunk->chunkIndex(); + if (index >= lastChunkIndex) + { + return; + } + + if (index > 0) + { + bool hasRAW = false; { - return chunk; + ittapi::Report report( + ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, + ittapi::ITT_DOMAINS::instance().DETECT_RAW); + hasRAW = writeSet.hasRAWIntersection(chunk->readWriteSetStorage()); } - task::tbb::syncWait(chunk->execute(blockHeader, ledgerConfig)); - - return chunk; - }) & - tbb::make_filter, void>( - tbb::filter_mode::serial_in_order, [&](std::unique_ptr chunk) { - auto index = chunk->chunkIndex(); - if (index >= lastChunkIndex) + if (hasRAW) { + // 检测到读写集冲突,立即合并已完成的storage并开始新一轮执行,不等待当前pipeline执行结束 + // When a read/write set conflict is detected, the system + // immediately merges the completed storage and starts a new round + // of execution without waiting for the current pipeline execution + // to end + PARALLEL_SCHEDULER_LOG(DEBUG) + << "Detected RAW Intersection:" << index; + lastChunkIndex = index; + + task::tbb::syncWait( + mergeLastStorage(scheduler, storage, std::move(lastStorage))); + executeSinglePass(scheduler, storage, executor, blockHeader, + transactions, ledgerConfig, offset, receipts); + + scheduler.m_asyncTaskGroup->run( + [chunk = std::move(chunk), readWriteSet = + std::move(writeSet)]() {}); return; } + } - if (index > 0) - { + offset += (size_t)chunk->count(); + PARALLEL_SCHEDULER_LOG(DEBUG) + << "Merging... " << index << " | " << chunk->count(); + tbb::parallel_invoke( + [&]() { ittapi::Report report( ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().DETECT_RAW); - if (writeSet.hasRAWIntersection(chunk->readWriteSetStorage())) - { - PARALLEL_SCHEDULER_LOG(DEBUG) - << "Detected RAW Intersection:" << index; - lastChunkIndex = index; - scheduler.m_asyncTaskGroup->run( - [chunk = std::move(chunk)]() {}); - return; - } - } + ittapi::ITT_DOMAINS::instance().MERGE_RWSET); + writeSet.mergeWriteSet(chunk->readWriteSetStorage()); + }, + [&]() { + ittapi::Report report( + ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, + ittapi::ITT_DOMAINS::instance().MERGE_CHUNK); + task::tbb::syncWait(storage2::merge(lastStorage, + std::move(chunk->localStorage().mutableStorage()))); + }); - offset += (size_t)chunk->count(); - PARALLEL_SCHEDULER_LOG(DEBUG) - << "Merging... " << index << " | " << chunk->count(); - tbb::parallel_invoke( - [&]() { - ittapi::Report report( - ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().MERGE_RWSET); - writeSet.mergeWriteSet(chunk->readWriteSetStorage()); - }, - [&]() { - ittapi::Report report( - ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().MERGE_CHUNK); - task::tbb::syncWait(storage2::merge(lastStorage, - std::move(chunk->localStorage().mutableStorage()))); + // 成功执行到最后一个chunk,合并数据并结束 + // Successfully executes to the last chunk, merges the data, and ends + if (offset == count) + { + task::tbb::syncWait( + mergeLastStorage(scheduler, storage, std::move(lastStorage))); + scheduler.m_asyncTaskGroup->run( + [chunk = std::move(chunk), readWriteSet = std::move(writeSet)]() { }); - scheduler.m_asyncTaskGroup->run([chunk = std::move(chunk)]() {}); - })); + } + })); + } - ittapi::Report mergeReport(ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, - ittapi::ITT_DOMAINS::instance().MERGE_LAST_CHUNK); - PARALLEL_SCHEDULER_LOG(DEBUG) << "Final merge lastStorage"; - co_await storage2::merge(storage, lastStorage); + friend task::Task> tag_invoke( + tag_t /*unused*/, SchedulerParallelImpl& scheduler, auto& storage, + auto& executor, protocol::BlockHeader const& blockHeader, + RANGES::input_range auto const& transactions, ledger::LedgerConfig const& ledgerConfig) + { + ittapi::Report report(ittapi::ITT_DOMAINS::instance().PARALLEL_SCHEDULER, + ittapi::ITT_DOMAINS::instance().PARALLEL_EXECUTE); + std::vector receipts(RANGES::size(transactions)); - scheduler.m_asyncTaskGroup->run( - [lastStorage = std::move(lastStorage), readWriteSet = std::move(writeSet)]() {}); - ++retryCount; - } + size_t offset = 0; + size_t retryCount = 0; + + executeSinglePass(scheduler, storage, executor, blockHeader, transactions, ledgerConfig, + offset, receipts); PARALLEL_SCHEDULER_LOG(INFO) << "Parallel scheduler execute finished, retry counts: " << retryCount; diff --git a/transaction-scheduler/benchmark/benchmarkScheduler.cpp b/transaction-scheduler/benchmark/benchmarkScheduler.cpp index 5bad7ee7fc..2d01d58ed3 100644 --- a/transaction-scheduler/benchmark/benchmarkScheduler.cpp +++ b/transaction-scheduler/benchmark/benchmarkScheduler.cpp @@ -30,22 +30,12 @@ using namespace bcos::transaction_executor; constexpr static s256 singleIssue(1000000); constexpr static s256 singleTransfer(1); -using MutableStorage = MemoryStorage; +using MutableStorage = MemoryStorage; using BackendStorage = MemoryStorage>; using MultiLayerStorageType = MultiLayerStorage; using ReceiptFactory = bcostars::protocol::TransactionReceiptFactoryImpl; -namespace bcos::transaction_scheduler -{ -auto tag_invoke(storage2::tag_t /*unused*/, MutableStorage& storage, - RANGES::input_range auto&& keys, storage2::READ_FRONT_TYPE const& /*unused*/) - -> task::Task> -{ - co_return co_await storage2::readSome(storage, std::forward(keys)); -} -} // namespace bcos::transaction_scheduler - template struct Fixture { diff --git a/transaction-scheduler/tests/CMakeLists.txt b/transaction-scheduler/tests/CMakeLists.txt index 592ca90b84..1757fdb6e7 100644 --- a/transaction-scheduler/tests/CMakeLists.txt +++ b/transaction-scheduler/tests/CMakeLists.txt @@ -4,5 +4,5 @@ add_executable(test-transaction-scheduler ${SOURCES}) find_package(Boost REQUIRED serialization unit_test_framework) -target_link_libraries(test-transaction-scheduler transaction-scheduler ${TARS_PROTOCOL_TARGET} bcos-framework Boost::unit_test_framework) +target_link_libraries(test-transaction-scheduler transaction-scheduler ${TARS_PROTOCOL_TARGET} ${TABLE_TARGET} bcos-framework Boost::unit_test_framework) add_test(NAME test-transaction-scheduler WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND test-transaction-scheduler)