Skip to content

Commit

Permalink
Merge pull request ceph#61079 from idryomov/wip-69178
Browse files Browse the repository at this point in the history
librbd/migration/HttpClient: avoid reusing ssl_stream after shut down

Reviewed-by: Ramana Raja <[email protected]>
  • Loading branch information
idryomov authored Dec 15, 2024
2 parents c0dae46 + 88557df commit 2a58a3d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 34 deletions.
8 changes: 8 additions & 0 deletions qa/suites/rbd/migration/6-prepare/qcow2-https.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
tasks:
- exec:
client.0:
- mkdir /home/ubuntu/cephtest/migration
- qemu-img create -f qcow2 /home/ubuntu/cephtest/migration/empty.qcow2 1G
- echo '{"type":"qcow","stream":{"type":"http","url":"https://download.ceph.com/qa/ubuntu-12.04.qcow2"}}' | rbd migration prepare --import-only --source-spec-path - client.0.0
- rbd migration prepare --import-only --source-spec '{"type":"qcow","stream":{"type":"file","file_path":"/home/ubuntu/cephtest/migration/empty.qcow2"}}' client.0.1
- rbd migration prepare --import-only --source-spec '{"type":"qcow","stream":{"type":"file","file_path":"/home/ubuntu/cephtest/migration/empty.qcow2"}}' client.0.2
90 changes: 56 additions & 34 deletions src/librbd/migration/HttpClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
m_on_shutdown = on_finish;

auto current_state = m_state;
m_state = STATE_SHUTTING_DOWN;

if (current_state == STATE_UNINITIALIZED) {
// never initialized or resolve/connect failed
on_finish->complete(0);
return;
}

m_state = STATE_SHUTTING_DOWN;
if (current_state != STATE_READY) {
} else if (current_state != STATE_READY) {
// delay shutdown until current state transition completes
return;
}
Expand Down Expand Up @@ -118,7 +117,7 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
ceph_assert(m_http_client->m_strand.running_in_this_thread());

auto cct = m_http_client->m_cct;
ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
ldout(cct, 20) << "work=" << work.get() << ", ec=" << ec.what() << dendl;

ceph_assert(m_in_flight_requests > 0);
--m_in_flight_requests;
Expand Down Expand Up @@ -187,6 +186,7 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
virtual void connect(boost::asio::ip::tcp::resolver::results_type results,
Context* on_finish) = 0;
virtual void disconnect(Context* on_finish) = 0;
virtual void reset_stream() = 0;

void close_socket() {
auto cct = m_http_client->m_cct;
Expand Down Expand Up @@ -229,7 +229,6 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
auto cct = m_http_client->m_cct;
ldout(cct, 15) << dendl;

shutdown_socket();
m_resolver.async_resolve(
m_http_client->m_url_spec.host, m_http_client->m_url_spec.port,
[this, on_finish](boost::system::error_code ec, auto results) {
Expand Down Expand Up @@ -414,7 +413,7 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
void handle_receive(boost::system::error_code ec,
std::shared_ptr<Work>&& work) {
auto cct = m_http_client->m_cct;
ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
ldout(cct, 15) << "work=" << work.get() << ", ec=" << ec.what() << dendl;

ceph_assert(m_in_flight_requests > 0);
--m_in_flight_requests;
Expand Down Expand Up @@ -445,10 +444,10 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
m_receive_queue.push_front(work);
} else if (ec == boost::beast::error::timeout) {
lderr(cct) << "timed-out while issuing request" << dendl;
lderr(cct) << "timed-out while receiving response" << dendl;
work->complete(-ETIMEDOUT, {});
} else {
lderr(cct) << "failed to issue request: " << ec.message() << dendl;
lderr(cct) << "failed to receive response: " << ec.message() << dendl;
work->complete(-ec.value(), {});
}

Expand All @@ -473,7 +472,7 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
r = -EACCES;
} else if (boost::beast::http::to_status_class(result) !=
boost::beast::http::status_class::successful) {
lderr(cct) << "failed to retrieve size: HTTP " << result << dendl;
lderr(cct) << "failed to retrieve resource: HTTP " << result << dendl;
r = -EIO;
}

Expand Down Expand Up @@ -501,7 +500,10 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
<< "next_state=" << next_state << ", "
<< "r=" << r << dendl;

m_state = next_state;
if (current_state != STATE_SHUTTING_DOWN) {
m_state = next_state;
}

if (current_state == STATE_CONNECTING) {
if (next_state == STATE_UNINITIALIZED) {
shutdown_socket();
Expand All @@ -512,14 +514,17 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
return;
}
} else if (current_state == STATE_SHUTTING_DOWN) {
ceph_assert(m_on_shutdown != nullptr);
if (next_state == STATE_READY) {
// shut down requested while connecting/resetting
disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
return;
} else if (next_state == STATE_UNINITIALIZED ||
next_state == STATE_SHUTDOWN ||
next_state == STATE_RESET_CONNECTING) {
ceph_assert(m_on_shutdown != nullptr);
shutdown_socket();
m_on_shutdown->complete(r);
return;
} else if (next_state == STATE_SHUTDOWN) {
m_on_shutdown->complete(r);
return;
}
Expand All @@ -528,6 +533,7 @@ class HttpClient<I>::HttpSession : public HttpSessionInterface {
ceph_assert(next_state == STATE_RESET_CONNECTING);
ceph_assert(on_finish == nullptr);
shutdown_socket();
reset_stream();
resolve_host(nullptr);
return;
} else if (current_state == STATE_RESET_CONNECTING) {
Expand Down Expand Up @@ -601,6 +607,7 @@ class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> {
auto cct = http_client->m_cct;
ldout(cct, 15) << dendl;

ceph_assert(!m_stream.socket().is_open());
m_stream.async_connect(
results,
[on_finish](boost::system::error_code ec, const auto& endpoint) {
Expand All @@ -612,9 +619,12 @@ class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> {
on_finish->complete(0);
}

void reset_stream() override {
// no-op -- tcp_stream object can be reused after shut down
}

private:
boost::beast::tcp_stream m_stream;

};

#undef dout_prefix
Expand Down Expand Up @@ -645,6 +655,7 @@ class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> {
auto cct = http_client->m_cct;
ldout(cct, 15) << dendl;

ceph_assert(!boost::beast::get_lowest_layer(m_stream).socket().is_open());
boost::beast::get_lowest_layer(m_stream).async_connect(
results,
[this, on_finish](boost::system::error_code ec, const auto& endpoint) {
Expand All @@ -657,19 +668,25 @@ class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> {
auto cct = http_client->m_cct;
ldout(cct, 15) << dendl;

if (!m_ssl_enabled) {
on_finish->complete(0);
return;
}

m_stream.async_shutdown(
asio::util::get_callback_adapter([this, on_finish](int r) {
shutdown(r, on_finish); }));
[this, on_finish](boost::system::error_code ec) {
handle_disconnect(ec, on_finish);
});
}

void reset_stream() override {
auto http_client = this->m_http_client;
auto cct = http_client->m_cct;
ldout(cct, 15) << dendl;

// ssl_stream object can't be reused after shut down -- move-in
// a freshly constructed instance
m_stream = boost::beast::ssl_stream<boost::beast::tcp_stream>(
http_client->m_strand, http_client->m_ssl_context);
}

private:
boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream;
bool m_ssl_enabled = false;

void handle_connect(int r, Context* on_finish) {
auto http_client = this->m_http_client;
Expand Down Expand Up @@ -728,33 +745,38 @@ class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> {
// Perform the SSL/TLS handshake
m_stream.async_handshake(
boost::asio::ssl::stream_base::client,
asio::util::get_callback_adapter(
[this, on_finish](int r) { handle_handshake(r, on_finish); }));
[this, on_finish](boost::system::error_code ec) {
handle_handshake(ec, on_finish);
});
}

void handle_handshake(int r, Context* on_finish) {
void handle_handshake(boost::system::error_code ec, Context* on_finish) {
auto http_client = this->m_http_client;
auto cct = http_client->m_cct;
ldout(cct, 15) << "r=" << r << dendl;
ldout(cct, 15) << "ec=" << ec.what() << dendl;

if (r < 0) {
lderr(cct) << "failed to complete handshake: " << cpp_strerror(r)
if (ec) {
lderr(cct) << "failed to complete SSL handshake: " << ec.message()
<< dendl;
disconnect(new LambdaContext([r, on_finish](int) {
on_finish->complete(r); }));
on_finish->complete(-ec.value());
return;
}

m_ssl_enabled = true;
on_finish->complete(0);
}

void shutdown(int r, Context* on_finish) {
void handle_disconnect(boost::system::error_code ec, Context* on_finish) {
auto http_client = this->m_http_client;
auto cct = http_client->m_cct;
ldout(cct, 15) << "r=" << r << dendl;
ldout(cct, 15) << "ec=" << ec.what() << dendl;

on_finish->complete(r);
if (ec && ec != boost::asio::ssl::error::stream_truncated) {
lderr(cct) << "failed to shut down SSL: " << ec.message() << dendl;
on_finish->complete(-ec.value());
return;
}

on_finish->complete(0);
}
};

Expand Down

0 comments on commit 2a58a3d

Please sign in to comment.