From a653f314ff1c14b9f7acc5bfe1eaa78bcc4ad260 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Fri, 4 Nov 2016 23:01:46 +1300 Subject: [PATCH] proxy.protocol.http: flatten for refactoring Flatten all of _process_flow, so we can see what's going on in there. --- mitmproxy/proxy/protocol/http.py | 415 ++++++++++++++----------------- 1 file changed, 188 insertions(+), 227 deletions(-) diff --git a/mitmproxy/proxy/protocol/http.py b/mitmproxy/proxy/protocol/http.py index 5412827f3..97105324b 100644 --- a/mitmproxy/proxy/protocol/http.py +++ b/mitmproxy/proxy/protocol/http.py @@ -18,14 +18,6 @@ class _HttpTransmissionLayer(base.Layer): def read_request_body(self, request): raise NotImplementedError() - def read_request(self, f): - request = self.read_request_headers(f) - request.data.content = b"".join( - self.read_request_body(request) - ) - request.timestamp_end = time.time() - return request - def send_request(self, request): raise NotImplementedError() @@ -146,9 +138,39 @@ class HttpLayer(base.Layer): def _process_flow(self, f): try: - request = self.get_request_from_client(f) + request = self.read_request_headers(f) + request.data.content = b"".join( + self.read_request_body(request) + ) + request.timestamp_end = time.time() + f.request = request + self.channel.ask("requestheaders", f) + if request.headers.get("expect", "").lower() == "100-continue": + # TODO: We may have to use send_response_headers for HTTP2 here. + self.send_response(http.expect_continue_response) + request.headers.pop("expect") + request.content = b"".join(self.read_request_body(request)) + request.timestamp_end = time.time() + # Make sure that the incoming request matches our expectations - self.validate_request(request) + if request.first_line_format == "absolute" and request.scheme != "http": + raise exceptions.HttpException("Invalid request scheme: %s" % request.scheme) + + expected_request_forms = { + "regular": ("authority", "absolute",), + "upstream": ("authority", "absolute"), + "transparent": ("relative",) + } + + allowed_request_forms = expected_request_forms[self.mode] + if request.first_line_format not in allowed_request_forms: + err_message = "Invalid HTTP request form (expected: %s, got: %s)" % ( + " or ".join(allowed_request_forms), request.first_line_format + ) + raise exceptions.HttpException(err_message) + + if self.mode == "regular" and request.first_line_format == "absolute": + request.first_line_format = "relative" except exceptions.HttpReadDisconnect: # don't throw an error for disconnects that happen before/between requests. return False @@ -174,7 +196,11 @@ class HttpLayer(base.Layer): try: # Regular Proxy Mode: Handle CONNECT if self.mode == "regular" and request.first_line_format == "authority": - self.handle_regular_mode_connect(request) + self.http_authenticated = True + self.set_server((request.host, request.port)) + self.send_response(http.make_connect_response(request.data.http_version)) + layer = self.ctx.next_layer(self) + layer() return False except (exceptions.ProtocolException, exceptions.NetlibException) as e: # HTTPS tasting means that ordinary errors like resolution and @@ -191,164 +217,7 @@ class HttpLayer(base.Layer): # set upstream auth if self.mode == "upstream" and self.config.upstream_auth is not None: f.request.headers["Proxy-Authorization"] = self.config.upstream_auth - self.process_request_hook(f) - try: - if websockets.check_handshake(request.headers) and websockets.check_client_version(request.headers): - # We only support RFC6455 with WebSockets version 13 - # allow inline scripts to manipulate the client handshake - self.channel.ask("websocket_handshake", f) - - if not f.response: - self.establish_server_connection( - f.request.host, - f.request.port, - f.request.scheme - ) - self.get_response_from_server(f) - else: - # response was set by an inline script. - # we now need to emulate the responseheaders hook. - self.channel.ask("responseheaders", f) - - self.log("response", "debug", [repr(f.response)]) - self.channel.ask("response", f) - self.send_response_to_client(f) - - if self.check_close_connection(f): - return False - - # Handle 101 Switching Protocols - if f.response.status_code == 101: - self.handle_101_switching_protocols(f) - return False # should never be reached - - # Upstream Proxy Mode: Handle CONNECT - if f.request.first_line_format == "authority" and f.response.status_code == 200: - self.handle_upstream_mode_connect(f.request.copy()) - return False - - except (exceptions.ProtocolException, exceptions.NetlibException) as e: - self.send_error_response(502, repr(e)) - if not f.response: - f.error = flow.Error(str(e)) - self.channel.ask("error", f) - return False - else: - raise exceptions.ProtocolException( - "Error in HTTP connection: %s" % repr(e) - ) - finally: - if f: - f.live = False - - return True - - def get_request_from_client(self, f): - request = self.read_request(f) - f.request = request - self.channel.ask("requestheaders", f) - if request.headers.get("expect", "").lower() == "100-continue": - # TODO: We may have to use send_response_headers for HTTP2 here. - self.send_response(http.expect_continue_response) - request.headers.pop("expect") - request.content = b"".join(self.read_request_body(request)) - request.timestamp_end = time.time() - return request - - def send_error_response(self, code, message, headers=None): - try: - response = http.make_error_response(code, message, headers) - self.send_response(response) - except (exceptions.NetlibException, h2.exceptions.H2Error, exceptions.Http2ProtocolException): - self.log(traceback.format_exc(), "debug") - - def change_upstream_proxy_server(self, address): - # Make set_upstream_proxy_server always available, - # even if there's no UpstreamConnectLayer - if address != self.server_conn.address: - return self.set_server(address) - - def handle_regular_mode_connect(self, request): - self.http_authenticated = True - self.set_server((request.host, request.port)) - self.send_response(http.make_connect_response(request.data.http_version)) - layer = self.ctx.next_layer(self) - layer() - - def handle_upstream_mode_connect(self, connect_request): - layer = UpstreamConnectLayer(self, connect_request) - layer() - - def send_response_to_client(self, f): - if not f.response.stream: - # no streaming: - # we already received the full response from the server and can - # send it to the client straight away. - self.send_response(f.response) - else: - # streaming: - # First send the headers and then transfer the response incrementally - self.send_response_headers(f.response) - chunks = self.read_response_body( - f.request, - f.response - ) - if callable(f.response.stream): - chunks = f.response.stream(chunks) - self.send_response_body(f.response, chunks) - f.response.timestamp_end = time.time() - - def get_response_from_server(self, f): - def get_response(): - self.send_request(f.request) - f.response = self.read_response_headers() - - try: - get_response() - except exceptions.NetlibException as e: - self.log( - "server communication error: %s" % repr(e), - level="debug" - ) - # In any case, we try to reconnect at least once. This is - # necessary because it might be possible that we already - # initiated an upstream connection after clientconnect that - # has already been expired, e.g consider the following event - # log: - # > clientconnect (transparent mode destination known) - # > serverconnect (required for client tls handshake) - # > read n% of large request - # > server detects timeout, disconnects - # > read (100-n)% of large request - # > send large request upstream - - if isinstance(e, exceptions.Http2ProtocolException): - # do not try to reconnect for HTTP2 - raise exceptions.ProtocolException("First and only attempt to get response via HTTP2 failed.") - - self.disconnect() - self.connect() - get_response() - - # call the appropriate script hook - this is an opportunity for an - # inline script to set f.stream = True - self.channel.ask("responseheaders", f) - - if f.response.stream: - f.response.data.content = None - else: - f.response.data.content = b"".join(self.read_response_body( - f.request, - f.response - )) - f.response.timestamp_end = time.time() - - # no further manipulation of self.server_conn beyond this point - # we can safely set it as the final attribute value here. - f.server_conn = self.server_conn - - def process_request_hook(self, f): # Determine .scheme, .host and .port attributes for inline scripts. # For absolute-form requests, they are directly given in the request. # For authority-form requests, we only need to determine the request scheme. @@ -368,7 +237,156 @@ class HttpLayer(base.Layer): f.request.scheme = "https" if self.__initial_server_tls else "http" self.channel.ask("request", f) - def establish_server_connection(self, host, port, scheme): + try: + if websockets.check_handshake(request.headers) and websockets.check_client_version(request.headers): + # We only support RFC6455 with WebSockets version 13 + # allow inline scripts to manipulate the client handshake + self.channel.ask("websocket_handshake", f) + + if not f.response: + self.establish_server_connection( + f.request.host, + f.request.port, + f.request.scheme + ) + + def get_response(): + self.send_request(f.request) + f.response = self.read_response_headers() + + try: + get_response() + except exceptions.NetlibException as e: + self.log( + "server communication error: %s" % repr(e), + level="debug" + ) + # In any case, we try to reconnect at least once. This is + # necessary because it might be possible that we already + # initiated an upstream connection after clientconnect that + # has already been expired, e.g consider the following event + # log: + # > clientconnect (transparent mode destination known) + # > serverconnect (required for client tls handshake) + # > read n% of large request + # > server detects timeout, disconnects + # > read (100-n)% of large request + # > send large request upstream + + if isinstance(e, exceptions.Http2ProtocolException): + # do not try to reconnect for HTTP2 + raise exceptions.ProtocolException("First and only attempt to get response via HTTP2 failed.") + + self.disconnect() + self.connect() + get_response() + + # call the appropriate script hook - this is an opportunity for an + # inline script to set f.stream = True + self.channel.ask("responseheaders", f) + + if f.response.stream: + f.response.data.content = None + else: + f.response.data.content = b"".join(self.read_response_body( + f.request, + f.response + )) + f.response.timestamp_end = time.time() + + # no further manipulation of self.server_conn beyond this point + # we can safely set it as the final attribute value here. + f.server_conn = self.server_conn + else: + # response was set by an inline script. + # we now need to emulate the responseheaders hook. + self.channel.ask("responseheaders", f) + + self.log("response", "debug", [repr(f.response)]) + self.channel.ask("response", f) + + if not f.response.stream: + # no streaming: + # we already received the full response from the server and can + # send it to the client straight away. + self.send_response(f.response) + else: + # streaming: + # First send the headers and then transfer the response incrementally + self.send_response_headers(f.response) + chunks = self.read_response_body( + f.request, + f.response + ) + if callable(f.response.stream): + chunks = f.response.stream(chunks) + self.send_response_body(f.response, chunks) + f.response.timestamp_end = time.time() + + if self.check_close_connection(f): + return False + + # Handle 101 Switching Protocols + if f.response.status_code == 101: + """ + Handle a successful HTTP 101 Switching Protocols Response, received after + e.g. a WebSocket upgrade request. + """ + # Check for WebSockets handshake + is_websockets = ( + f and + websockets.check_handshake(f.request.headers) and + websockets.check_handshake(f.response.headers) + ) + if is_websockets and not self.config.options.websockets: + self.log( + "Client requested WebSocket connection, but the protocol is currently disabled in mitmproxy.", + "info" + ) + + if is_websockets and self.config.options.websockets: + layer = pwebsockets.WebSocketsLayer(self, f) + else: + layer = self.ctx.next_layer(self) + layer() + return False # should never be reached + + # Upstream Proxy Mode: Handle CONNECT + if f.request.first_line_format == "authority" and f.response.status_code == 200: + layer = UpstreamConnectLayer(self, f.request) + layer() + return False + + except (exceptions.ProtocolException, exceptions.NetlibException) as e: + self.send_error_response(502, repr(e)) + if not f.response: + f.error = flow.Error(str(e)) + self.channel.ask("error", f) + return False + else: + raise exceptions.ProtocolException( + "Error in HTTP connection: %s" % repr(e) + ) + finally: + if f: + f.live = False + + return True + + def send_error_response(self, code, message, headers=None) -> None: + try: + response = http.make_error_response(code, message, headers) + self.send_response(response) + except (exceptions.NetlibException, h2.exceptions.H2Error, exceptions.Http2ProtocolException): + self.log(traceback.format_exc(), "debug") + + def change_upstream_proxy_server(self, address) -> None: + # Make set_upstream_proxy_server always available, + # even if there's no UpstreamConnectLayer + if address != self.server_conn.address: + self.set_server(address) + + def establish_server_connection(self, host: str, port: int, scheme: str): address = tcp.Address((host, port)) tls = (scheme == "https") @@ -385,42 +403,8 @@ class HttpLayer(base.Layer): self.connect() if tls: raise exceptions.HttpProtocolException("Cannot change scheme in upstream proxy mode.") - """ - # This is a very ugly (untested) workaround to solve a very ugly problem. - if self.server_conn and self.server_conn.tls_established and not ssl: - self.disconnect() - self.connect() - elif ssl and not hasattr(self, "connected_to") or self.connected_to != address: - if self.server_conn.tls_established: - self.disconnect() - self.connect() - self.send_request(make_connect_request(address)) - tls_layer = TlsLayer(self, False, True) - tls_layer._establish_tls_with_server() - """ - - def validate_request(self, request): - if request.first_line_format == "absolute" and request.scheme != "http": - raise exceptions.HttpException("Invalid request scheme: %s" % request.scheme) - - expected_request_forms = { - "regular": ("authority", "absolute",), - "upstream": ("authority", "absolute"), - "transparent": ("relative",) - } - - allowed_request_forms = expected_request_forms[self.mode] - if request.first_line_format not in allowed_request_forms: - err_message = "Invalid HTTP request form (expected: %s, got: %s)" % ( - " or ".join(allowed_request_forms), request.first_line_format - ) - raise exceptions.HttpException(err_message) - - if self.mode == "regular" and request.first_line_format == "absolute": - request.first_line_format = "relative" - - def authenticate(self, request): + def authenticate(self, request) -> bool: if self.config.authenticator: if self.config.authenticator.authenticate(request.headers): self.config.authenticator.clean(request.headers) @@ -439,26 +423,3 @@ class HttpLayer(base.Layer): )) return False return True - - def handle_101_switching_protocols(self, f): - """ - Handle a successful HTTP 101 Switching Protocols Response, received after e.g. a WebSocket upgrade request. - """ - # Check for WebSockets handshake - is_websockets = ( - f and - websockets.check_handshake(f.request.headers) and - websockets.check_handshake(f.response.headers) - ) - if is_websockets and not self.config.options.websockets: - self.log( - "Client requested WebSocket connection, but the protocol is currently disabled in mitmproxy.", - "info" - ) - - if is_websockets and self.config.options.websockets: - layer = pwebsockets.WebSocketsLayer(self, f) - else: - layer = self.ctx.next_layer(self) - - layer()