mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
streamline HTTPHandler.handle_flow()
This commit is contained in:
parent
2a6337343a
commit
32e1ed212d
@ -160,13 +160,6 @@ class HTTPMessage(stateobject.SimpleStateObject):
|
||||
c += self.headers.replace(pattern, repl, *args, **kwargs)
|
||||
return c
|
||||
|
||||
@classmethod
|
||||
def from_stream(cls, rfile, include_body=True, body_size_limit=None):
|
||||
"""
|
||||
Parse an HTTP message from a file stream
|
||||
"""
|
||||
raise NotImplementedError() # pragma: nocover
|
||||
|
||||
def _assemble_first_line(self):
|
||||
"""
|
||||
Returns the assembled request/response line
|
||||
@ -644,7 +637,8 @@ class HTTPResponse(HTTPMessage):
|
||||
|
||||
if self.content:
|
||||
headers["Content-Length"] = [str(len(self.content))]
|
||||
elif not preserve_transfer_encoding and 'Transfer-Encoding' in self.headers: # add content-length for chuncked transfer-encoding with no content
|
||||
# add content-length for chuncked transfer-encoding with no content
|
||||
elif not preserve_transfer_encoding and 'Transfer-Encoding' in self.headers:
|
||||
headers["Content-Length"] = ["0"]
|
||||
|
||||
return str(headers)
|
||||
@ -873,19 +867,21 @@ class HTTPHandler(ProtocolHandler):
|
||||
while self.handle_flow():
|
||||
pass
|
||||
|
||||
def get_response_from_server(self, request, include_body=True):
|
||||
def get_response_from_server(self, flow):
|
||||
self.c.establish_server_connection()
|
||||
request_raw = request.assemble()
|
||||
request_raw = flow.request.assemble()
|
||||
|
||||
for i in range(2):
|
||||
for attempt in (0, 1):
|
||||
try:
|
||||
self.c.server_conn.send(request_raw)
|
||||
res = HTTPResponse.from_stream(self.c.server_conn.rfile, request.method,
|
||||
body_size_limit=self.c.config.body_size_limit, include_body=include_body)
|
||||
return res
|
||||
# Only get the headers at first...
|
||||
flow.response = HTTPResponse.from_stream(self.c.server_conn.rfile, flow.request.method,
|
||||
body_size_limit=self.c.config.body_size_limit,
|
||||
include_body=False)
|
||||
break
|
||||
except (tcp.NetLibDisconnect, http.HttpErrorConnClosed), v:
|
||||
self.c.log("error in server communication: %s" % repr(v), level="debug")
|
||||
if i < 1:
|
||||
if attempt == 0:
|
||||
# In any case, we try to reconnect at least once.
|
||||
# This is necessary because it might be possible that we already initiated an upstream connection
|
||||
# after clientconnect that has already been expired, e.g consider the following event log:
|
||||
@ -899,13 +895,24 @@ class HTTPHandler(ProtocolHandler):
|
||||
else:
|
||||
raise
|
||||
|
||||
# call the appropriate script hook - this is an opportunity for an inline script to set flow.stream = True
|
||||
self.c.channel.ask("responseheaders", flow)
|
||||
|
||||
# now get the rest of the request body, if body still needs to be read but not streaming this response
|
||||
if flow.response.stream:
|
||||
flow.response.content = CONTENT_MISSING
|
||||
else:
|
||||
flow.response.content = http.read_http_body(self.c.server_conn.rfile, flow.response.headers,
|
||||
self.c.config.body_size_limit,
|
||||
flow.request.method, flow.response.code, False)
|
||||
|
||||
def handle_flow(self):
|
||||
flow = HTTPFlow(self.c.client_conn, self.c.server_conn, self.live)
|
||||
try:
|
||||
try:
|
||||
req = HTTPRequest.from_stream(self.c.client_conn.rfile,
|
||||
body_size_limit=self.c.config.body_size_limit)
|
||||
except tcp.NetLibDisconnect: # specifically ignore disconnects that happen before/between requests.
|
||||
except tcp.NetLibDisconnect: # don't throw an error for disconnects that happen before/between requests.
|
||||
return False
|
||||
self.c.log("request", "debug", [req._assemble_first_line(req.form_in)])
|
||||
ret = self.process_request(flow, req)
|
||||
@ -927,20 +934,7 @@ class HTTPHandler(ProtocolHandler):
|
||||
if isinstance(request_reply, HTTPResponse):
|
||||
flow.response = request_reply
|
||||
else:
|
||||
|
||||
# read initially in "stream" mode, so we can get the headers separately
|
||||
flow.response = self.get_response_from_server(flow.request, include_body=False)
|
||||
|
||||
# call the appropriate script hook - this is an opportunity for an inline script to set flow.stream = True
|
||||
self.c.channel.ask("responseheaders", flow)
|
||||
|
||||
# now get the rest of the request body, if body still needs to be read but not streaming this response
|
||||
if flow.response.stream:
|
||||
flow.response.content = CONTENT_MISSING
|
||||
else:
|
||||
flow.response.content = http.read_http_body(self.c.server_conn.rfile, flow.response.headers,
|
||||
self.c.config.body_size_limit,
|
||||
flow.request.method, flow.response.code, False)
|
||||
self.get_response_from_server(flow)
|
||||
|
||||
# no further manipulation of self.c.server_conn beyond this point
|
||||
# we can safely set it as the final attribute value here.
|
||||
@ -951,67 +945,34 @@ class HTTPHandler(ProtocolHandler):
|
||||
if response_reply is None or response_reply == KILL:
|
||||
return False
|
||||
|
||||
if not flow.response.stream:
|
||||
# no streaming:
|
||||
# we already received the full response from the server and can send it to the client straight away.
|
||||
self.c.client_conn.send(flow.response.assemble())
|
||||
else:
|
||||
# streaming:
|
||||
# First send the body and then transfer the response incrementally:
|
||||
h = flow.response._assemble_head(preserve_transfer_encoding=True)
|
||||
self.c.client_conn.send(h)
|
||||
for chunk in http.read_http_body_chunked(self.c.server_conn.rfile,
|
||||
flow.response.headers,
|
||||
self.c.config.body_size_limit, flow.request.method,
|
||||
flow.response.code, False, 4096):
|
||||
for part in chunk:
|
||||
self.c.client_conn.wfile.write(part)
|
||||
self.c.client_conn.wfile.flush()
|
||||
flow.response.timestamp_end = utils.timestamp()
|
||||
self.send_response_to_client(flow)
|
||||
|
||||
flow.timestamp_end = utils.timestamp()
|
||||
|
||||
close_connection = (
|
||||
http.connection_close(flow.request.httpversion, flow.request.headers) or
|
||||
http.connection_close(flow.response.httpversion, flow.response.headers) or
|
||||
http.expected_http_body_size(flow.response.headers, False, flow.request.method,
|
||||
flow.response.code) == -1)
|
||||
if close_connection:
|
||||
if flow.request.form_in == "authority" and flow.response.code == 200:
|
||||
# Workaround for https://github.com/mitmproxy/mitmproxy/issues/313:
|
||||
# Some proxies (e.g. Charles) send a CONNECT response with HTTP/1.0 and no Content-Length header
|
||||
pass
|
||||
else:
|
||||
return False
|
||||
if self.check_close_connection(flow):
|
||||
return False
|
||||
|
||||
# We sent a CONNECT request to an upstream proxy.
|
||||
if flow.request.form_in == "authority" and flow.response.code == 200:
|
||||
# TODO: Eventually add headers (space/usefulness tradeoff)
|
||||
# Make sure to add state info before the actual upgrade happens.
|
||||
# During the upgrade, we may receive an SNI indication from the client,
|
||||
# TODO: Possibly add headers (memory consumption/usefulness tradeoff)
|
||||
# Make sure to add state info before the actual processing of the CONNECT request happens.
|
||||
# During an SSL upgrade, we may receive an SNI indication from the client,
|
||||
# which resets the upstream connection. If this is the case, we must
|
||||
# already re-issue the CONNECT request at this point.
|
||||
self.c.server_conn.state.append(("http", {"state": "connect",
|
||||
"host": flow.request.host,
|
||||
"port": flow.request.port}))
|
||||
|
||||
if self.c.check_ignore_address((flow.request.host, flow.request.port)):
|
||||
self.c.log("Ignore host: %s:%s" % self.c.server_conn.address(), "info")
|
||||
TCPHandler(self.c).handle_messages()
|
||||
if not self.process_connect_request((flow.request.host, flow.request.port)):
|
||||
return False
|
||||
else:
|
||||
if flow.request.port in self.c.config.ssl_ports:
|
||||
self.ssl_upgrade()
|
||||
self.skip_authentication = True
|
||||
|
||||
# If the user has changed the target server on this connection,
|
||||
# restore the original target server
|
||||
flow.live.restore_server()
|
||||
flow.live = None
|
||||
|
||||
return True
|
||||
return True # Next flow please.
|
||||
except (HttpAuthenticationError, http.HttpError, proxy.ProxyError, tcp.NetLibError), e:
|
||||
self.handle_error(e, flow)
|
||||
finally:
|
||||
flow.timestamp_end = utils.timestamp()
|
||||
flow.live = None # Connection is not live anymore.
|
||||
return False
|
||||
|
||||
def handle_server_reconnect(self, state):
|
||||
@ -1060,16 +1021,6 @@ class HTTPHandler(ProtocolHandler):
|
||||
self.c.client_conn.wfile.write(html_content)
|
||||
self.c.client_conn.wfile.flush()
|
||||
|
||||
def ssl_upgrade(self):
|
||||
"""
|
||||
Upgrade the connection to SSL after an authority (CONNECT) request has been made.
|
||||
"""
|
||||
self.c.log("Received CONNECT request. Upgrading to SSL...", "debug")
|
||||
self.expected_form_in = "relative"
|
||||
self.expected_form_out = "relative"
|
||||
self.c.establish_ssl(server=True, client=True)
|
||||
self.c.log("Upgrade to SSL completed.", "debug")
|
||||
|
||||
def process_request(self, flow, request):
|
||||
"""
|
||||
@returns:
|
||||
@ -1114,16 +1065,7 @@ class HTTPHandler(ProtocolHandler):
|
||||
('Proxy-agent: %s\r\n' % self.c.server_version) +
|
||||
'\r\n'
|
||||
)
|
||||
|
||||
if self.c.check_ignore_address(self.c.server_conn.address):
|
||||
self.c.log("Ignore host: %s:%s" % self.c.server_conn.address(), "info")
|
||||
TCPHandler(self.c).handle_messages()
|
||||
return False
|
||||
else:
|
||||
if self.c.server_conn.address.port in self.c.config.ssl_ports:
|
||||
self.ssl_upgrade()
|
||||
self.skip_authentication = True
|
||||
return True
|
||||
return self.process_connect_request(self.c.server_conn.address)
|
||||
else: # upstream proxy mode
|
||||
return None
|
||||
else:
|
||||
@ -1140,7 +1082,6 @@ class HTTPHandler(ProtocolHandler):
|
||||
self.c.set_server_address((request.host, request.port))
|
||||
flow.server_conn = self.c.server_conn
|
||||
|
||||
|
||||
return None
|
||||
|
||||
raise http.HttpError(400, "Invalid HTTP request form (expected: %s, got: %s)" %
|
||||
@ -1182,6 +1123,66 @@ class HTTPHandler(ProtocolHandler):
|
||||
|
||||
flow.server_conn = self.c.server_conn
|
||||
|
||||
def send_response_to_client(self, flow):
|
||||
if not flow.response.stream:
|
||||
# no streaming:
|
||||
# we already received the full response from the server and can send it to the client straight away.
|
||||
self.c.client_conn.send(flow.response.assemble())
|
||||
else:
|
||||
# streaming:
|
||||
# First send the body and then transfer the response incrementally:
|
||||
h = flow.response._assemble_head(preserve_transfer_encoding=True)
|
||||
self.c.client_conn.send(h)
|
||||
for chunk in http.read_http_body_chunked(self.c.server_conn.rfile,
|
||||
flow.response.headers,
|
||||
self.c.config.body_size_limit, flow.request.method,
|
||||
flow.response.code, False, 4096):
|
||||
for part in chunk:
|
||||
self.c.client_conn.wfile.write(part)
|
||||
self.c.client_conn.wfile.flush()
|
||||
flow.response.timestamp_end = utils.timestamp()
|
||||
|
||||
def check_close_connection(self, flow):
|
||||
"""
|
||||
Checks if the connection should be closed depending on the HTTP semantics. Returns True, if so.
|
||||
"""
|
||||
close_connection = (
|
||||
http.connection_close(flow.request.httpversion, flow.request.headers) or
|
||||
http.connection_close(flow.response.httpversion, flow.response.headers) or
|
||||
http.expected_http_body_size(flow.response.headers, False, flow.request.method,
|
||||
flow.response.code) == -1)
|
||||
if close_connection:
|
||||
if flow.request.form_in == "authority" and flow.response.code == 200:
|
||||
# Workaround for https://github.com/mitmproxy/mitmproxy/issues/313:
|
||||
# Some proxies (e.g. Charles) send a CONNECT response with HTTP/1.0 and no Content-Length header
|
||||
pass
|
||||
else:
|
||||
return True
|
||||
return False
|
||||
|
||||
def process_connect_request(self, address):
|
||||
"""
|
||||
Process a CONNECT request.
|
||||
Returns True if the CONNECT request has been processed successfully.
|
||||
Returns False, if the connection should be closed immediately.
|
||||
"""
|
||||
address = tcp.Address.wrap(address)
|
||||
if self.c.check_ignore_address(address):
|
||||
self.c.log("Ignore host: %s:%s" % address(), "info")
|
||||
TCPHandler(self.c).handle_messages()
|
||||
return False
|
||||
else:
|
||||
self.expected_form_in = "relative"
|
||||
self.expected_form_out = "relative"
|
||||
self.skip_authentication = True
|
||||
|
||||
if address.port in self.c.config.ssl_ports:
|
||||
self.c.log("Received CONNECT request to SSL port. Upgrading to SSL...", "debug")
|
||||
self.c.establish_ssl(server=True, client=True)
|
||||
self.c.log("Upgrade to SSL completed.", "debug")
|
||||
|
||||
return True
|
||||
|
||||
def authenticate(self, request):
|
||||
if self.c.config.authenticator:
|
||||
if self.c.config.authenticator.authenticate(request.headers):
|
||||
|
Loading…
Reference in New Issue
Block a user