Skip to content

Commit

Permalink
GH-1690 Socket should only be accessed from thread-pool thread. Use a…
Browse files Browse the repository at this point in the history
…sync_write instead of send. Wait for all write callbacks to finish before exit.
  • Loading branch information
heifner committed Oct 24, 2023
1 parent def1432 commit db98dbc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
31 changes: 26 additions & 5 deletions tests/trx_generator/trx_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,36 @@ namespace eosio::testing {
}

void p2p_connection::disconnect() {
ilog("Closing socket.");
_p2p_socket.close();
ilog("Socket closed.");
int max = 30;
int waited = 0;
while (_sent.load() != _sent_callback_num.load() && waited < max) {
ilog("disconnect waiting on ack - sent ${s} | acked ${a} | waited ${w}",
("s", _sent.load())("a", _sent_callback_num.load())("w", waited));
sleep(1);
++waited;
}
if (waited == max) {
elog("disconnect failed to receive all acks in time - sent ${s} | acked ${a} | waited ${w}",
("s", _sent.load())("a", _sent_callback_num.load())("w", waited));
}
}

void p2p_connection::send_transaction(const chain::packed_transaction& trx) {
send_buffer_type msg = create_send_buffer(trx);
_p2p_socket.send(boost::asio::buffer(*msg));
trx_acknowledged(trx.id(), fc::time_point::min()); //using min to identify ack time as not applicable for p2p

++_sent;
_strand.post( [this, msg{std::move(msg)}, id{trx.id()}]() {
boost::asio::async_write( _p2p_socket, boost::asio::buffer(*msg),
boost::asio::bind_executor( _strand, [this, msg, id]( boost::system::error_code ec, std::size_t w ) {
if (ec) {
elog("async write failure: ${e}", ("e", ec.message()));
} else {
trx_acknowledged(id, fc::time_point::min()); //using min to identify ack time as not applicable for p2p
}
++_sent_callback_num;
})
);
} );
}

acked_trx_trace_info p2p_connection::get_acked_trx_trace_info(const eosio::chain::transaction_id_type& trx_id) {
Expand Down
15 changes: 12 additions & 3 deletions tests/trx_generator/trx_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

#include<eosio/chain/transaction.hpp>
#include<eosio/chain/block.hpp>
#include<boost/asio/ip/tcp.hpp>
#include<fc/network/message_buffer.hpp>
#include<eosio/chain/thread_utils.hpp>

#include<fc/network/message_buffer.hpp>

#include<boost/asio/ip/tcp.hpp>
#include<boost/asio/strand.hpp>

#include<chrono>
#include<thread>
#include<variant>
Expand Down Expand Up @@ -104,10 +108,15 @@ namespace eosio::testing {

struct p2p_connection : public provider_connection {
boost::asio::ip::tcp::socket _p2p_socket;
boost::asio::io_context::strand _strand;
std::atomic<uint64_t> _sent_callback_num{0};
std::atomic<uint64_t> _sent{0};


explicit p2p_connection(const provider_base_config& provider_config)
: provider_connection(provider_config)
, _p2p_socket(_connection_thread_pool.get_executor()) {}
, _p2p_socket(_connection_thread_pool.get_executor())
, _strand(_connection_thread_pool.get_executor()){}

void send_transaction(const chain::packed_transaction& trx) final;

Expand Down

0 comments on commit db98dbc

Please sign in to comment.