Skip to content

Commit

Permalink
使用新的 splice 工具发送文件,尽量填满 IO。
Browse files Browse the repository at this point in the history
  • Loading branch information
microcai committed Nov 18, 2024
1 parent 74e49b6 commit dbc095e
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 81 deletions.
137 changes: 90 additions & 47 deletions proxy/include/proxy/proxy_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ inline bool operator == (const std::pmr::string& a, const std::string& b)
return std::string_view(a) == std::string_view(b);
}

template<typename S, typename proxy_session>
concept supports_stream_rate_limit = requires(S s, proxy_session p)
{
p.stream_rate_limit(s, 1);
};

template<typename S, typename proxy_session>
concept supports_stream_expires_after = requires(S s, proxy_session p)
{
p.stream_expires_after(s, std::chrono::seconds(0));
};

template<typename S>
concept supports_shutdown = requires(S s)
{
s.shutdown(boost::asio::socket_base::shutdown_receive);
};

namespace proxy {

namespace net = boost::asio;
Expand Down Expand Up @@ -318,7 +336,7 @@ R"x*x*x(<html>
0x16, // ssl
};

inline const std::map<std::string, std::string> global_mimes =
inline const std::map<std::string_view, std::string_view> global_mimes =
{
{ ".html", "text/html; charset=utf-8" },
{ ".htm", "text/html; charset=utf-8" },
Expand Down Expand Up @@ -880,14 +898,11 @@ R"x*x*x(<html>
if (!ret)
co_return;

size_t l2r_transferred = 0;
size_t r2l_transferred = 0;

co_await(
transfer(m_local_socket, m_remote_socket, l2r_transferred)
auto [l2r_transferred, r2l_transferred] = co_await(
transfer(m_local_socket, m_remote_socket)
&&
transfer(m_remote_socket, m_local_socket, r2l_transferred)
);
transfer(m_remote_socket, m_local_socket)
);

XLOG_DBG << "connection id: "
<< m_connection_id
Expand Down Expand Up @@ -1819,13 +1834,10 @@ R"x*x*x(<html>
// 发起数据传输协程.
if (command == SOCKS_CMD_CONNECT)
{
size_t l2r_transferred = 0;
size_t r2l_transferred = 0;

co_await(
transfer(m_local_socket, m_remote_socket, l2r_transferred)
auto [l2r_transferred, r2l_transferred ] = co_await(
transfer(m_local_socket, m_remote_socket)
&&
transfer(m_remote_socket, m_local_socket, r2l_transferred)
transfer(m_remote_socket, m_local_socket)
);

XLOG_DBG << "connection id: "
Expand Down Expand Up @@ -2265,14 +2277,11 @@ R"x*x*x(<html>
if (error_code != SOCKS4_REQUEST_GRANTED)
co_return;

size_t l2r_transferred = 0;
size_t r2l_transferred = 0;

co_await(
transfer(m_local_socket, m_remote_socket, l2r_transferred)
auto [l2r_transferred , r2l_transferred ]= co_await(
transfer(m_local_socket, m_remote_socket)
&&
transfer(m_remote_socket, m_local_socket, r2l_transferred)
);
transfer(m_remote_socket, m_local_socket)
);

XLOG_DBG << "connection id: "
<< m_connection_id
Expand All @@ -2293,14 +2302,24 @@ R"x*x*x(<html>
net::awaitable<bool> socks_auth();

template<typename S1, typename S2>
net::awaitable<void> transfer(S1& from, S2& to, size_t& bytes_transferred)
net::awaitable<std::streamsize> transfer(S1& from, S2& to, std::streamsize bytes_to_be_sent = -1)
{
bytes_transferred = 0;

stream_rate_limit(from, m_option.tcp_rate_limit_);
stream_rate_limit(to, m_option.tcp_rate_limit_);
std::size_t bytes_transferred = 0;

stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));
if constexpr (supports_stream_rate_limit<S1, proxy_session>)
{
stream_rate_limit(from, m_option.tcp_rate_limit_);
}
if constexpr (supports_stream_rate_limit<S2, proxy_session>)
{
stream_rate_limit(to, m_option.tcp_rate_limit_);
}

if constexpr (supports_stream_expires_after<S1, proxy_session>)
{
stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));
}

constexpr auto buf_size = 512 * 1024;

Expand All @@ -2312,49 +2331,73 @@ R"x*x*x(<html>
auto secondary_buf = buf1.get();

// 首先邓读取第一个数据作为预备, 以用于后面的交替读写逻辑.
auto read_size = (bytes_to_be_sent == -1) ? buf_size : std::min<std::streamsize>(bytes_to_be_sent, buf_size);
boost::system::error_code ec;
auto bytes = co_await from.async_read_some(net::buffer(primary_buf, buf_size), net_awaitable[ec]);
auto bytes = co_await from.async_read_some(net::buffer(primary_buf, read_size), net_awaitable[ec]);
if (bytes_to_be_sent != -1) bytes_to_be_sent -= bytes;
if (ec || m_abort)
{
if (bytes > 0)
co_await net::async_write(to,
bytes_transferred += co_await net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec]);

to.shutdown(net::socket_base::shutdown_send, ec);
co_return;
co_return bytes_transferred;
}

for (; !m_abort;)
{
stream_expires_after(to, std::chrono::seconds(m_option.tcp_timeout_));
stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));
if constexpr (supports_stream_expires_after<S2, proxy_session>)
{
stream_expires_after(to, std::chrono::seconds(m_option.tcp_timeout_));
}

// 并发读写.
auto [write_bytes, read_bytes] =
co_await(
net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec])
&&
from.async_read_some(
net::buffer(secondary_buf, buf_size), net_awaitable[ec])
);
if constexpr (supports_stream_expires_after<S1, proxy_session>)
{
stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));
}

// 交换主从缓冲区.
std::swap(primary_buf, secondary_buf);
read_size = (bytes_to_be_sent == -1) ? buf_size : std::min<std::streamsize>(bytes_to_be_sent, buf_size);

bytes = read_bytes;
bytes_transferred += bytes;
if (read_size > 0)
{
// 并发读写.
auto [write_bytes, read_bytes] =
co_await(
net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec])
&&
from.async_read_some(
net::buffer(secondary_buf, read_size), net_awaitable[ec])
);

// 交换主从缓冲区.
std::swap(primary_buf, secondary_buf);

bytes = read_bytes;
if (bytes_to_be_sent != -1) bytes_to_be_sent -= bytes;
bytes_transferred += write_bytes;
}
else
{
bytes_transferred += co_await net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec]);
co_return bytes_transferred;
}

// 如果 async_write 失败, 则也无需要再读取数据, 如果
// async_read_some 失败, 则也无数据可用于写, 所以无论哪一种情况
// 都可以直接退出.
if (ec)
{
to.shutdown(net::socket_base::shutdown_send, ec);
from.shutdown(net::socket_base::shutdown_receive, ec);
co_return;
if constexpr (supports_shutdown<S2>)
to.shutdown(net::socket_base::shutdown_send, ec);
if constexpr (supports_shutdown<S1>)
from.shutdown(net::socket_base::shutdown_receive, ec);
co_return bytes_transferred;
}
}
co_return bytes_transferred;
}

template <typename Stream, typename Endpoint>
Expand Down
Loading

0 comments on commit dbc095e

Please sign in to comment.