diff --git a/.flake8 b/.flake8 index 8de5f32f05..7430ea37b8 100644 --- a/.flake8 +++ b/.flake8 @@ -110,7 +110,7 @@ per-file-ignores = cheroot/test/test_dispatch.py: DAR101, DAR201, S101, WPS111, WPS121, WPS302, WPS422, WPS430 cheroot/test/test_ssl.py: C818, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, S101, S309, S404, S603, WPS100, WPS110, WPS111, WPS114, WPS121, WPS130, WPS201, WPS202, WPS204, WPS210, WPS211, WPS218, WPS219, WPS222, WPS226, WPS231, WPS300, WPS301, WPS317, WPS318, WPS324, WPS326, WPS335, WPS336, WPS337, WPS352, WPS408, WPS420, WPS421, WPS422, WPS432, WPS436, WPS440, WPS441, WPS442, WPS450, WPS509, WPS510, WPS608 cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS226, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608 - cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526 + cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS402, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526 cheroot/test/webtest.py: B007, DAR101, DAR201, DAR401, I001, I003, I004, N802, RST303, RST304, S101, S104, WPS100, WPS110, WPS111, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS220, WPS221, WPS223, WPS229, WPS230, WPS231, WPS236, WPS301, WPS306, WPS317, WPS323, WPS326, WPS338, WPS361, WPS414, WPS420, WPS421, WPS422, WPS430, WPS432, WPS433, WPS437, WPS440, WPS501, WPS503, WPS505, WPS601 cheroot/testing.py: B014, C815, DAR101, DAR201, DAR301, I001, I003, S104, WPS100, WPS202, WPS211, WPS229, WPS301, WPS306, WPS317, WPS414, WPS420, WPS422, WPS430, WPS503, WPS526 cheroot/workers/threadpool.py: B007, DAR101, DAR201, E800, I001, I003, I004, RST201, RST203, RST301, WPS100, WPS110, WPS111, WPS121, WPS125, WPS211, WPS214, WPS220, WPS229, WPS230, WPS231, WPS304, WPS306, WPS317, WPS318, WPS322, WPS326, WPS335, WPS338, WPS362, WPS410, WPS414, WPS420, WPS422, WPS428, WPS432, WPS440, WPS462, WPS501, WPS505, WPS601, WPS602, WPS609 diff --git a/cheroot/test/test_conn.py b/cheroot/test/test_conn.py index 38a6245715..078d4d4d58 100644 --- a/cheroot/test/test_conn.py +++ b/cheroot/test/test_conn.py @@ -1,6 +1,7 @@ """Tests for TCP connection handling, including proper and timely close.""" import errno +from re import match as _matches_pattern import socket import time import logging @@ -700,6 +701,263 @@ def _close_kernel_socket(self): assert _close_kernel_socket.exception_leaked is exception_leaks +def test_broken_connection_during_http_communication_fallback( # noqa: WPS118 + monkeypatch, + test_client, + testing_server, + wsgi_server_thread, +): + """Test that unhandled internal error cascades into shutdown.""" + def _raise_connection_reset(*_args, **_kwargs): + raise ConnectionResetError(666) + + def _read_request_line(self): + monkeypatch.setattr(self.conn.rfile, 'close', _raise_connection_reset) + monkeypatch.setattr(self.conn.wfile, 'write', _raise_connection_reset) + _raise_connection_reset() + + monkeypatch.setattr( + test_client.server_instance.ConnectionClass.RequestHandlerClass, + 'read_request_line', + _read_request_line, + ) + + test_client.get_connection().send(b'GET / HTTP/1.1') + wsgi_server_thread.join() # no extra logs upon server termination + + actual_log_entries = testing_server.error_log.calls[:] + testing_server.error_log.calls.clear() # prevent post-test assertions + + expected_log_entries = ( + (logging.WARNING, r'^socket\.error 666$'), + ( + logging.INFO, + '^Got a connection error while handling a connection ' + r'from .*:\d{1,5} \(666\)', + ), + ( + logging.CRITICAL, + r'A fatal exception happened\. Setting the server interrupt flag ' + r'to ConnectionResetError\(666\) and giving up\.\n\nPlease, ' + 'report this on the Cheroot tracker at ' + r', ' + 'providing a full reproducer with as much context and details ' + r'as possible\.$', + ), + ) + + assert len(actual_log_entries) == len(expected_log_entries) + + for ( # noqa: WPS352 + (expected_log_level, expected_msg_regex), + (actual_msg, actual_log_level, _tb), + ) in zip(expected_log_entries, actual_log_entries): + assert expected_log_level == actual_log_level + assert _matches_pattern(expected_msg_regex, actual_msg) is not None, ( + f'{actual_msg !r} does not match {expected_msg_regex !r}' + ) + + +def test_kb_int_from_http_handler( + test_client, + testing_server, + wsgi_server_thread, +): + """Test that a keyboard interrupt from HTTP handler causes shutdown.""" + def _trigger_kb_intr(_req, _resp): + raise KeyboardInterrupt('simulated test handler keyboard interrupt') + testing_server.wsgi_app.handlers['/kb_intr'] = _trigger_kb_intr + + http_conn = test_client.get_connection() + http_conn.putrequest('GET', '/kb_intr', skip_host=True) + http_conn.putheader('Host', http_conn.host) + http_conn.endheaders() + wsgi_server_thread.join() # no extra logs upon server termination + + actual_log_entries = testing_server.error_log.calls[:] + testing_server.error_log.calls.clear() # prevent post-test assertions + + expected_log_entries = ( + ( + logging.DEBUG, + '^Got a server shutdown request while handling a connection ' + r'from .*:\d{1,5} \(simulated test handler keyboard interrupt\)$', + ), + ( + logging.DEBUG, + '^Setting the server interrupt flag to KeyboardInterrupt' + r"\('simulated test handler keyboard interrupt'\)$", + ), + ( + logging.INFO, + '^Keyboard Interrupt: shutting down$', + ), + ) + + assert len(actual_log_entries) == len(expected_log_entries) + + for ( # noqa: WPS352 + (expected_log_level, expected_msg_regex), + (actual_msg, actual_log_level, _tb), + ) in zip(expected_log_entries, actual_log_entries): + assert expected_log_level == actual_log_level + assert _matches_pattern(expected_msg_regex, actual_msg) is not None, ( + f'{actual_msg !r} does not match {expected_msg_regex !r}' + ) + + +def test_unhandled_exception_in_request_handler( + mocker, + monkeypatch, + test_client, + testing_server, + wsgi_server_thread, +): + """Ensure worker threads are resilient to in-handler exceptions.""" + + class SillyMistake(BaseException): # noqa: WPS418, WPS431 + """A simulated crash within an HTTP handler.""" + + def _trigger_scary_exc(_req, _resp): + raise SillyMistake('simulated unhandled exception 💣 in test handler') + + testing_server.wsgi_app.handlers['/scary_exc'] = _trigger_scary_exc + + server_connection_close_spy = mocker.spy( + test_client.server_instance.ConnectionClass, + 'close', + ) + + http_conn = test_client.get_connection() + http_conn.putrequest('GET', '/scary_exc', skip_host=True) + http_conn.putheader('Host', http_conn.host) + http_conn.endheaders() + + # NOTE: This spy ensure the log entry gets recorded before we're testing + # NOTE: them and before server shutdown, preserving their order and making + # NOTE: the log entry presence non-flaky. + while not server_connection_close_spy.called: # noqa: WPS328 + pass + + assert len(testing_server.requests._threads) == 10 + while testing_server.requests.idle < 10: # noqa: WPS328 + pass + assert len(testing_server.requests._threads) == 10 + testing_server.interrupt = SystemExit('test requesting shutdown') + assert not testing_server.requests._threads + wsgi_server_thread.join() # no extra logs upon server termination + + actual_log_entries = testing_server.error_log.calls[:] + testing_server.error_log.calls.clear() # prevent post-test assertions + + expected_log_entries = ( + ( + logging.ERROR, + '^Unhandled error while processing an incoming connection ' + 'SillyMistake' + r"\('simulated unhandled exception 💣 in test handler'\)$", + ), + ( + logging.INFO, + '^SystemExit raised: shutting down$', + ), + ) + + assert len(actual_log_entries) == len(expected_log_entries) + + for ( # noqa: WPS352 + (expected_log_level, expected_msg_regex), + (actual_msg, actual_log_level, _tb), + ) in zip(expected_log_entries, actual_log_entries): + assert expected_log_level == actual_log_level + assert _matches_pattern(expected_msg_regex, actual_msg) is not None, ( + f'{actual_msg !r} does not match {expected_msg_regex !r}' + ) + + +def test_remains_alive_post_unhandled_exception( + mocker, + monkeypatch, + test_client, + testing_server, + wsgi_server_thread, +): + """Ensure worker threads are resilient to unhandled exceptions.""" + + class ScaryCrash(BaseException): # noqa: WPS418, WPS431 + """A simulated crash during HTTP parsing.""" + + _orig_read_request_line = ( + test_client.server_instance. + ConnectionClass.RequestHandlerClass. + read_request_line + ) + + def _read_request_line(self): + _orig_read_request_line(self) + raise ScaryCrash(666) + + monkeypatch.setattr( + test_client.server_instance.ConnectionClass.RequestHandlerClass, + 'read_request_line', + _read_request_line, + ) + + server_connection_close_spy = mocker.spy( + test_client.server_instance.ConnectionClass, + 'close', + ) + + # NOTE: The initial worker thread count is 10. + assert len(testing_server.requests._threads) == 10 + + test_client.get_connection().send(b'GET / HTTP/1.1') + + # NOTE: This spy ensure the log entry gets recorded before we're testing + # NOTE: them and before server shutdown, preserving their order and making + # NOTE: the log entry presence non-flaky. + while not server_connection_close_spy.called: # noqa: WPS328 + pass + + # NOTE: This checks for whether there's any crashed threads + while testing_server.requests.idle < 10: # noqa: WPS328 + pass + assert len(testing_server.requests._threads) == 10 + assert all( + worker_thread.is_alive() + for worker_thread in testing_server.requests._threads + ) + testing_server.interrupt = SystemExit('test requesting shutdown') + assert not testing_server.requests._threads + wsgi_server_thread.join() # no extra logs upon server termination + + actual_log_entries = testing_server.error_log.calls[:] + testing_server.error_log.calls.clear() # prevent post-test assertions + + expected_log_entries = ( + ( + logging.ERROR, + '^Unhandled error while processing an incoming connection ' + r'ScaryCrash\(666\)$', + ), + ( + logging.INFO, + '^SystemExit raised: shutting down$', + ), + ) + + assert len(actual_log_entries) == len(expected_log_entries) + + for ( # noqa: WPS352 + (expected_log_level, expected_msg_regex), + (actual_msg, actual_log_level, _tb), + ) in zip(expected_log_entries, actual_log_entries): + assert expected_log_level == actual_log_level + assert _matches_pattern(expected_msg_regex, actual_msg) is not None, ( + f'{actual_msg !r} does not match {expected_msg_regex !r}' + ) + + @pytest.mark.parametrize( 'timeout_before_headers', ( diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index 87fe2557a7..cd28450a3a 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -6,6 +6,7 @@ """ import collections +import logging import threading import time import socket @@ -107,14 +108,38 @@ def run(self): from the inner-layer code constitute a global server interrupt request. When they happen, the worker thread exits. + :raises BaseException: when an unexpected non-interrupt + exception leaks from the inner layers + # noqa: DAR401 KeyboardInterrupt SystemExit """ self.server.stats['Worker Threads'][self.name] = self.stats self.ready = True try: self._process_connections_until_interrupted() - except (KeyboardInterrupt, SystemExit) as ex: - self.server.interrupt = ex + except (KeyboardInterrupt, SystemExit) as interrupt_exc: + interrupt_cause = interrupt_exc.__cause__ or interrupt_exc + self.server.error_log( + f'Setting the server interrupt flag to {interrupt_cause !r}', + level=logging.DEBUG, + ) + self.server.interrupt = interrupt_cause + except BaseException as underlying_exc: # noqa: WPS424 + # NOTE: This is the last resort logging with the last dying breath + # NOTE: of the worker. It is only reachable when exceptions happen + # NOTE: in the `finally` branch of the internal try/except block. + self.server.error_log( + 'A fatal exception happened. Setting the server interrupt flag' + f' to {underlying_exc !r} and giving up.' + '\N{NEW LINE}\N{NEW LINE}' + 'Please, report this on the Cheroot tracker at ' + ', ' + 'providing a full reproducer with as much context and details as possible.', + level=logging.CRITICAL, + traceback=True, + ) + self.server.interrupt = underlying_exc + raise finally: self.ready = False @@ -123,6 +148,9 @@ def _process_connections_until_interrupted(self): Retrieves incoming connections from thread pool, processing them one by one. + + :raises SystemExit: on the internal requests to stop the + server instance """ while True: conn = self.server.requests.get() @@ -136,7 +164,52 @@ def _process_connections_until_interrupted(self): keep_conn_open = False try: keep_conn_open = conn.communicate() + except ConnectionError as connection_error: + keep_conn_open = False # Drop the connection cleanly + self.server.error_log( + 'Got a connection error while handling a ' + f'connection from {conn.remote_addr !s}:' + f'{conn.remote_port !s} ({connection_error !s})', + level=logging.INFO, + ) + continue + except (KeyboardInterrupt, SystemExit) as shutdown_request: + # Shutdown request + keep_conn_open = False # Drop the connection cleanly + self.server.error_log( + 'Got a server shutdown request while handling a ' + f'connection from {conn.remote_addr !s}:' + f'{conn.remote_port !s} ({shutdown_request !s})', + level=logging.DEBUG, + ) + raise SystemExit( + str(shutdown_request), + ) from shutdown_request + except BaseException as unhandled_error: # noqa: WPS424 + # NOTE: Only a shutdown request should bubble up to the + # NOTE: external cleanup code. Otherwise, this thread dies. + # NOTE: If this were to happen, the threadpool would still + # NOTE: list a dead thread without knowing its state. And + # NOTE: the calling code would fail to schedule processing + # NOTE: of new requests. + self.server.error_log( + 'Unhandled error while processing an incoming ' + f'connection {unhandled_error !r}', + level=logging.ERROR, + traceback=True, + ) + continue # Prevent the thread from dying finally: + # NOTE: Any exceptions coming from within `finally` may + # NOTE: kill the thread, causing the threadpool to only + # NOTE: contain references to dead threads rendering the + # NOTE: server defunct, effectively meaning a DoS. + # NOTE: Ideally, things called here should process + # NOTE: everything recoverable internally. Any unhandled + # NOTE: errors will bubble up into the outer try/except + # NOTE: block. They will be treated as fatal and turned + # NOTE: into server shutdown requests and then reraised + # NOTE: unconditionally. if keep_conn_open: self.server.put_conn(conn) else: