move body streaming into proxy core, fix #4470

This commit is contained in:
Maximilian Hils 2021-06-12 18:51:05 +02:00
parent a4f5edb5b7
commit 199670cad4
11 changed files with 248 additions and 165 deletions

View File

@ -68,6 +68,7 @@ If you depend on these features, please raise your voice in
"red ball" marker, a single character, or an emoji like `:grapes:`. Use the `~marker` filter to filter on marker characters. (@rbdixon)
* New `flow.comment` command to add a comment to the flow. Add `~comment <regex>` filter syntax to search flow comments. (@rbdixon)
* Fix multipart forms losing `boundary` values on edit (@roytu)
* `Transfer-Encoding: chunked` HTTP message bodies are now retained if they are below the stream_large_bodies limit.
* --- TODO: add new PRs above this line ---
* ... and various other fixes, documentation improvements, dependency version bumps, etc.

View File

@ -22,7 +22,6 @@ from mitmproxy.addons import modifybody
from mitmproxy.addons import modifyheaders
from mitmproxy.addons import stickyauth
from mitmproxy.addons import stickycookie
from mitmproxy.addons import streambodies
from mitmproxy.addons import save
from mitmproxy.addons import tlsconfig
from mitmproxy.addons import upstream_auth
@ -54,7 +53,6 @@ def default_addons():
modifyheaders.ModifyHeaders(),
stickyauth.StickyAuth(),
stickycookie.StickyCookie(),
streambodies.StreamBodies(),
save.Save(),
tlsconfig.TlsConfig(),
upstream_auth.UpstreamAuth(),

View File

@ -2,7 +2,7 @@ import asyncio
import warnings
from typing import Dict, Optional, Tuple
from mitmproxy import command, controller, ctx, flow, http, log, master, options, platform, tcp, websocket
from mitmproxy import command, controller, ctx, exceptions, flow, http, log, master, options, platform, tcp, websocket
from mitmproxy.flow import Error, Flow
from mitmproxy.proxy import commands, events, server_hooks
from mitmproxy.proxy import server
@ -90,6 +90,14 @@ class Proxyserver:
"server-side greetings, as well as accurately mirror TLS ALPN negotiation.",
choices=("eager", "lazy")
)
loader.add_option(
"stream_large_bodies", Optional[str], None,
"""
Stream data to the client if response body exceeds the given
threshold. If streamed, the body will not be stored in any way.
Understands k/m/g suffixes, i.e. 3m for 3 megabytes.
"""
)
loader.add_option(
"proxy_debug", bool, False,
"Enable debug logs in the proxy core.",
@ -104,6 +112,11 @@ class Proxyserver:
def configure(self, updated):
if not self.is_running:
return
if "stream_large_bodies" in updated:
try:
human.parse_size(ctx.options.stream_large_bodies)
except ValueError as e:
raise exceptions.OptionsError(e)
if "mode" in updated and ctx.options.mode == "transparent": # pragma: no cover
platform.init_transparent_mode()
if any(x in updated for x in ["server", "listen_host", "listen_port"]):

View File

@ -1,49 +0,0 @@
import typing
from mitmproxy.net.http import http1
from mitmproxy import exceptions
from mitmproxy import ctx
from mitmproxy.utils import human
class StreamBodies:
    """
    Addon that enables body streaming for HTTP messages whose expected size
    exceeds the configured `stream_large_bodies` threshold, so large
    transfers are forwarded instead of being buffered in memory.
    """

    def __init__(self):
        # Parsed threshold in bytes; None means streaming is disabled.
        self.max_size = None

    def load(self, loader):
        loader.add_option(
            "stream_large_bodies", typing.Optional[str], None,
            """
            Stream data to the client if response body exceeds the given
            threshold. If streamed, the body will not be stored in any way.
            Understands k/m/g suffixes, i.e. 3m for 3 megabytes.
            """
        )

    def configure(self, updated):
        if "stream_large_bodies" in updated:
            if ctx.options.stream_large_bodies:
                try:
                    self.max_size = human.parse_size(ctx.options.stream_large_bodies)
                except ValueError as e:
                    raise exceptions.OptionsError(e)
            else:
                # Fix: reset the threshold when the option is cleared so that
                # streaming can actually be disabled again at runtime.
                # (Previously max_size kept its old value forever.)
                self.max_size = None

    def run(self, f, is_request):
        """Mark the request (is_request=True) or response of flow *f* for
        streaming if its announced body size exceeds the threshold."""
        if self.max_size:
            r = f.request if is_request else f.response
            try:
                expected_size = http1.expected_http_body_size(
                    f.request, f.response if not is_request else None
                )
            except ValueError:
                # Malformed framing headers (e.g. bad Content-Length): kill the flow.
                f.kill()
                return
            # Stream only when the size is known, over the limit, and the body
            # has not already been read into memory.
            if expected_size and not r.raw_content and not (0 <= expected_size <= self.max_size):
                # r.stream may already be a callable, which we want to preserve.
                r.stream = r.stream or True
                ctx.log.info("Streaming {} {}".format("response from" if not is_request else "request to", f.request.host))

    def requestheaders(self, f):
        self.run(f, True)

    def responseheaders(self, f):
        self.run(f, False)

View File

@ -40,13 +40,9 @@ def connection_close(http_version, headers):
def expected_http_body_size(
request: Request,
response: Optional[Response] = None,
expect_continue_as_0: bool = True
):
response: Optional[Response] = None
) -> Optional[int]:
"""
Args:
- expect_continue_as_0: If true, incorrectly predict a body size of 0 for requests which are waiting
for a 100 Continue response.
Returns:
The expected body length:
- a positive integer, if the size is known in advance
@ -62,8 +58,6 @@ def expected_http_body_size(
headers = request.headers
if request.method.upper() == "CONNECT":
return 0
if expect_continue_as_0 and headers.get("expect", "").lower() == "100-continue":
return 0
else:
headers = response.headers
if request.method.upper() == "HEAD":
@ -75,8 +69,6 @@ def expected_http_body_size(
if response.status_code in (204, 304):
return 0
if "chunked" in headers.get("transfer-encoding", "").lower():
return None
if "content-length" in headers:
sizes = headers.get_all("content-length")
different_content_length_headers = any(x != sizes[0] for x in sizes)
@ -86,6 +78,8 @@ def expected_http_body_size(
if size < 0:
raise ValueError("Negative Content Length")
return size
if "chunked" in headers.get("transfer-encoding", "").lower():
return None
if not response:
return 0
return -1

View File

@ -9,6 +9,7 @@ from mitmproxy import flow, http
from mitmproxy.connection import Connection, Server
from mitmproxy.net import server_spec
from mitmproxy.net.http import status_codes, url
from mitmproxy.net.http.http1 import expected_http_body_size
from mitmproxy.proxy import commands, events, layer, tunnel
from mitmproxy.proxy.layers import tcp, tls, websocket
from mitmproxy.proxy.layers.http import _upstream_proxy
@ -194,6 +195,18 @@ class HttpStream(layer.Layer):
self.context.server.address[1],
)
# determine if we already know that we want to stream the request body.
# we may also only realize this later while consuming the request body if the expected body size is unknown.
if self.context.options.stream_large_bodies and not event.end_stream:
max_size = human.parse_size(self.context.options.stream_large_bodies)
assert max_size is not None
try:
expected_size = expected_http_body_size(self.flow.request)
except ValueError: # pragma: no cover
expected_size = None
if expected_size is not None and expected_size > max_size:
self.flow.request.stream = True
yield HttpRequestHeadersHook(self.flow)
if (yield from self.check_killed(True)):
return
@ -220,6 +233,7 @@ class HttpStream(layer.Layer):
RequestHeaders(self.stream_id, self.flow.request, end_stream=False),
self.context.server
)
yield commands.Log(f"Streaming request to {self.flow.request.host}.")
self.client_state = self.state_stream_request_body
@expect(RequestData, RequestTrailers, RequestEndOfMessage)
@ -256,6 +270,17 @@ class HttpStream(layer.Layer):
def state_consume_request_body(self, event: events.Event) -> layer.CommandGenerator[None]:
if isinstance(event, RequestData):
self.request_body_buf += event.data
# Have we reached the threshold to stream the request?
if self.context.options.stream_large_bodies:
max_size = human.parse_size(self.context.options.stream_large_bodies)
assert max_size is not None
if len(self.request_body_buf) > max_size:
self.flow.request.stream = True
body_buf = self.request_body_buf
self.request_body_buf = b""
yield from self.start_request_stream()
yield from self.handle_event(RequestData(event.stream_id, body_buf))
elif isinstance(event, RequestTrailers):
assert self.flow.request
self.flow.request.trailers = event.trailers
@ -292,15 +317,33 @@ class HttpStream(layer.Layer):
@expect(ResponseHeaders)
def state_wait_for_response_headers(self, event: ResponseHeaders) -> layer.CommandGenerator[None]:
self.flow.response = event.response
# determine if we already know that we want to stream the response body.
# we may also only realize this later while consuming the response body if the expected body size is unknown.
if self.context.options.stream_large_bodies and not event.end_stream:
max_size = human.parse_size(self.context.options.stream_large_bodies)
assert max_size is not None
try:
expected_size = expected_http_body_size(self.flow.request, self.flow.response)
except ValueError: # pragma: no cover
expected_size = None
if expected_size is not None and expected_size > max_size:
self.flow.response.stream = True
yield HttpResponseHeadersHook(self.flow)
if (yield from self.check_killed(True)):
return
elif self.flow.response.stream:
yield SendHttp(event, self.context.client)
self.server_state = self.state_stream_response_body
yield from self.start_response_stream()
else:
self.server_state = self.state_consume_response_body
def start_response_stream(self) -> layer.CommandGenerator[None]:
assert self.flow.response
yield SendHttp(ResponseHeaders(self.stream_id, self.flow.response, end_stream=False), self.context.client)
yield commands.Log(f"Streaming response from {self.flow.request.host}.")
self.server_state = self.state_stream_response_body
@expect(ResponseData, ResponseTrailers, ResponseEndOfMessage)
def state_stream_response_body(self, event: events.Event) -> layer.CommandGenerator[None]:
assert self.flow.response
@ -321,6 +364,18 @@ class HttpStream(layer.Layer):
def state_consume_response_body(self, event: events.Event) -> layer.CommandGenerator[None]:
if isinstance(event, ResponseData):
self.response_body_buf += event.data
# Have we reached the threshold to stream the response?
if self.context.options.stream_large_bodies:
max_size = human.parse_size(self.context.options.stream_large_bodies)
assert max_size is not None
if len(self.response_body_buf) > max_size:
assert self.flow.response
self.flow.response.stream = True
body_buf = self.response_body_buf
self.response_body_buf = b""
yield from self.start_response_stream()
yield from self.handle_event(ResponseData(event.stream_id, body_buf))
elif isinstance(event, ResponseTrailers):
assert self.flow.response
self.flow.response.trailers = event.trailers

View File

@ -235,7 +235,7 @@ class Http1Server(Http1Connection):
request_head = [bytes(x) for x in request_head] # TODO: Make url.parse compatible with bytearrays
try:
self.request = http1.read_request_head(request_head)
expected_body_size = http1.expected_http_body_size(self.request, expect_continue_as_0=False)
expected_body_size = http1.expected_http_body_size(self.request)
except ValueError as e:
yield commands.Log(f"{human.format_address(self.conn.peername)}: {e}")
yield commands.CloseConnection(self.conn)

View File

@ -4,6 +4,7 @@ from contextlib import asynccontextmanager
import pytest
from mitmproxy.addons.clientplayback import ClientPlayback, ReplayHandler
from mitmproxy.addons.proxyserver import Proxyserver
from mitmproxy.exceptions import CommandError, OptionsError
from mitmproxy.connection import Address
from mitmproxy.test import taddons, tflow
@ -47,7 +48,8 @@ async def test_playback(mode):
handler_ok.set()
cp = ClientPlayback()
with taddons.context(cp) as tctx:
ps = Proxyserver()
with taddons.context(cp, ps) as tctx:
async with tcp_server(handler) as addr:
cp.running()
@ -78,6 +80,7 @@ async def test_playback_crash(monkeypatch):
cp.start_replay([tflow.tflow()])
await tctx.master.await_log("Client replay has crashed!", level="error")
assert cp.count() == 0
cp.done()
def test_check():

View File

@ -1,31 +0,0 @@
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
from mitmproxy.addons import streambodies
import pytest
def test_simple():
    """StreamBodies: option validation plus request/response streaming flags."""
    addon = streambodies.StreamBodies()
    with taddons.context(addon) as tctx:
        # A malformed size spec must be rejected as an options error.
        with pytest.raises(exceptions.OptionsError):
            tctx.configure(addon, stream_large_bodies="invalid")
        tctx.configure(addon, stream_large_bodies="10")

        # Request above the threshold gets flagged for streaming.
        req_flow = tflow.tflow()
        req_flow.request.content = b""
        req_flow.request.headers["Content-Length"] = "1024"
        assert not req_flow.request.stream
        addon.requestheaders(req_flow)
        assert req_flow.request.stream

        # Response above the threshold gets flagged for streaming too.
        resp_flow = tflow.tflow(resp=True)
        resp_flow.response.content = b""
        resp_flow.response.headers["Content-Length"] = "1024"
        assert not resp_flow.response.stream
        addon.responseheaders(resp_flow)
        assert resp_flow.response.stream

        # An unparseable content-length is handled via the flow lifecycle.
        bad_flow = tflow.tflow(resp=True)
        bad_flow.response.headers["content-length"] = "invalid"
        tctx.cycle(addon, bad_flow)

View File

@ -63,12 +63,6 @@ def test_expected_http_body_size():
# Expect: 100-continue
assert expected_http_body_size(
treq(headers=Headers(expect="100-continue", content_length="42")),
expect_continue_as_0=True
) == 0
# Expect: 100-continue
assert expected_http_body_size(
treq(headers=Headers(expect="100-continue", content_length="42")),
expect_continue_as_0=False
) == 42
# http://tools.ietf.org/html/rfc7230#section-3.3
@ -94,6 +88,9 @@ def test_expected_http_body_size():
assert expected_http_body_size(
treq(headers=Headers(transfer_encoding="chunked")),
) is None
assert expected_http_body_size(
treq(headers=Headers(transfer_encoding="chunked", content_length="42")),
) == 42
# explicit length
for val in (b"foo", b"-7"):

View File

@ -1,14 +1,14 @@
import pytest
from mitmproxy.connection import ConnectionState, Server
from mitmproxy.flow import Error
from mitmproxy.http import HTTPFlow, Response
from mitmproxy.net.server_spec import ServerSpec
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy import layer
from mitmproxy.proxy.commands import CloseConnection, OpenConnection, SendData, Log
from mitmproxy.connection import ConnectionState, Server
from mitmproxy.proxy.commands import CloseConnection, Log, OpenConnection, SendData
from mitmproxy.proxy.events import ConnectionClosed, DataReceived
from mitmproxy.proxy.layers import TCPLayer, http, tls
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy.layers.tcp import TcpMessageInjected, TcpStartHook
from mitmproxy.proxy.layers.websocket import WebsocketStartHook
from mitmproxy.tcp import TCPFlow, TCPMessage
@ -265,37 +265,100 @@ def test_disconnect_while_intercept(tctx):
assert flow().server_conn == server2()
def test_response_streaming(tctx):
@pytest.mark.parametrize("why", ["body_size=0", "body_size=3", "addon"])
@pytest.mark.parametrize("transfer_encoding", ["identity", "chunked"])
def test_response_streaming(tctx, why, transfer_encoding):
"""Test HTTP response streaming"""
server = Placeholder(Server)
flow = Placeholder(HTTPFlow)
playbook = Playbook(http.HttpLayer(tctx, HTTPMode.regular))
if why.startswith("body_size"):
tctx.options.stream_large_bodies = why.removeprefix("body_size=")
def enable_streaming(flow: HTTPFlow):
flow.response.stream = lambda x: x.upper()
if why == "addon":
flow.response.stream = True
assert (
Playbook(http.HttpLayer(tctx, HTTPMode.regular))
>> DataReceived(tctx.client, b"GET http://example.com/largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< http.HttpRequestHeadersHook(flow)
>> reply()
<< http.HttpRequestHook(flow)
>> reply()
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET /largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc")
<< http.HttpResponseHeadersHook(flow)
>> reply(side_effect=enable_streaming)
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nABC")
>> DataReceived(server, b"def")
<< SendData(tctx.client, b"DEF")
<< http.HttpResponseHook(flow)
>> reply()
playbook
>> DataReceived(tctx.client, b"GET http://example.com/largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< http.HttpRequestHeadersHook(flow)
>> reply()
<< http.HttpRequestHook(flow)
>> reply()
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET /largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\n")
)
if transfer_encoding == "identity":
playbook >> DataReceived(server, b"Content-Length: 6\r\n\r\n"
b"abc")
else:
playbook >> DataReceived(server, b"Transfer-Encoding: chunked\r\n\r\n"
b"3\r\nabc\r\n")
playbook << http.HttpResponseHeadersHook(flow)
playbook >> reply(side_effect=enable_streaming)
if transfer_encoding == "identity":
playbook << SendData(tctx.client, b"HTTP/1.1 200 OK\r\n"
b"Content-Length: 6\r\n\r\n"
b"abc")
playbook >> DataReceived(server, b"def")
playbook << SendData(tctx.client, b"def")
else:
if why == "body_size=3":
playbook >> DataReceived(server, b"3\r\ndef\r\n")
playbook << SendData(tctx.client, b"HTTP/1.1 200 OK\r\n"
b"Transfer-Encoding: chunked\r\n\r\n"
b"6\r\nabcdef\r\n")
else:
playbook << SendData(tctx.client, b"HTTP/1.1 200 OK\r\n"
b"Transfer-Encoding: chunked\r\n\r\n"
b"3\r\nabc\r\n")
playbook >> DataReceived(server, b"3\r\ndef\r\n")
playbook << SendData(tctx.client, b"3\r\ndef\r\n")
playbook >> DataReceived(server, b"0\r\n\r\n")
playbook << http.HttpResponseHook(flow)
playbook >> reply()
if transfer_encoding == "chunked":
playbook << SendData(tctx.client, b"0\r\n\r\n")
assert playbook
def test_request_stream_modify(tctx):
    """Test streaming a request body through a modifying stream callable."""
    server = Placeholder(Server)

    def enable_streaming(flow: HTTPFlow):
        # Stream the request body, uppercasing each chunk on the way through.
        flow.request.stream = lambda x: x.upper()

    assert (
        Playbook(http.HttpLayer(tctx, HTTPMode.regular))
        >> DataReceived(tctx.client, b"POST http://example.com/ HTTP/1.1\r\n"
                                     b"Host: example.com\r\n"
                                     b"Content-Length: 6\r\n\r\n"
                                     b"abc")
        << http.HttpRequestHeadersHook(Placeholder(HTTPFlow))
        >> reply(side_effect=enable_streaming)
        << OpenConnection(server)
        >> reply(None)
        # Body bytes pass through the stream callable before being forwarded.
        << SendData(server, b"POST / HTTP/1.1\r\n"
                            b"Host: example.com\r\n"
                            b"Content-Length: 6\r\n\r\n"
                            b"ABC")
    )
@pytest.mark.parametrize("why", ["body_size=0", "body_size=3", "addon"])
@pytest.mark.parametrize("transfer_encoding", ["identity", "chunked"])
@pytest.mark.parametrize("response", ["normal response", "early response", "early close", "early kill"])
def test_request_streaming(tctx, response):
def test_request_streaming(tctx, why, transfer_encoding, response):
"""
Test HTTP request streaming
@ -305,55 +368,94 @@ def test_request_streaming(tctx, response):
flow = Placeholder(HTTPFlow)
playbook = Playbook(http.HttpLayer(tctx, HTTPMode.regular))
def enable_streaming(flow: HTTPFlow):
flow.request.stream = lambda x: x.upper()
if why.startswith("body_size"):
tctx.options.stream_large_bodies = why.removeprefix("body_size=")
def enable_streaming(flow: HTTPFlow):
if why == "addon":
flow.request.stream = True
playbook >> DataReceived(tctx.client, b"POST http://example.com/ HTTP/1.1\r\n"
b"Host: example.com\r\n")
if transfer_encoding == "identity":
playbook >> DataReceived(tctx.client, b"Content-Length: 9\r\n\r\n"
b"abc")
else:
playbook >> DataReceived(tctx.client, b"Transfer-Encoding: chunked\r\n\r\n"
b"3\r\nabc\r\n")
playbook << http.HttpRequestHeadersHook(flow)
playbook >> reply(side_effect=enable_streaming)
needs_more_data_before_open = (why == "body_size=3" and transfer_encoding == "chunked")
if needs_more_data_before_open:
playbook >> DataReceived(tctx.client, b"3\r\ndef\r\n")
playbook << OpenConnection(server)
playbook >> reply(None)
playbook << SendData(server, b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n")
if transfer_encoding == "identity":
playbook << SendData(server, b"Content-Length: 9\r\n\r\n"
b"abc")
playbook >> DataReceived(tctx.client, b"def")
playbook << SendData(server, b"def")
else:
if needs_more_data_before_open:
playbook << SendData(server, b"Transfer-Encoding: chunked\r\n\r\n"
b"6\r\nabcdef\r\n")
else:
playbook << SendData(server, b"Transfer-Encoding: chunked\r\n\r\n"
b"3\r\nabc\r\n")
playbook >> DataReceived(tctx.client, b"3\r\ndef\r\n")
playbook << SendData(server, b"3\r\ndef\r\n")
assert (
playbook
>> DataReceived(tctx.client, b"POST http://example.com/ HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Length: 6\r\n\r\n"
b"abc")
<< http.HttpRequestHeadersHook(flow)
>> reply(side_effect=enable_streaming)
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Length: 6\r\n\r\n"
b"ABC")
)
if response == "normal response":
if transfer_encoding == "identity":
playbook >> DataReceived(tctx.client, b"ghi")
playbook << SendData(server, b"ghi")
else:
playbook >> DataReceived(tctx.client, b"3\r\nghi\r\n0\r\n\r\n")
playbook << SendData(server, b"3\r\nghi\r\n")
playbook << http.HttpRequestHook(flow)
playbook >> reply()
if transfer_encoding == "chunked":
playbook << SendData(server, b"0\r\n\r\n")
assert (
playbook
>> DataReceived(tctx.client, b"def")
<< SendData(server, b"DEF")
<< http.HttpRequestHook(flow)
>> reply()
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
>> reply()
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
playbook
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
>> reply()
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
)
elif response == "early response":
# We may receive a response before we have finished sending our request.
# We continue sending unless the server closes the connection.
# https://tools.ietf.org/html/rfc7231#section-6.5.11
assert (
playbook
>> DataReceived(server, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
>> reply()
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
>> DataReceived(tctx.client, b"def")
<< SendData(server, b"DEF")
<< http.HttpRequestHook(flow)
>> reply()
playbook
>> DataReceived(server, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
>> reply()
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
)
if transfer_encoding == "identity":
playbook >> DataReceived(tctx.client, b"ghi")
playbook << SendData(server, b"ghi")
else:
playbook >> DataReceived(tctx.client, b"3\r\nghi\r\n0\r\n\r\n")
playbook << SendData(server, b"3\r\nghi\r\n")
playbook << http.HttpRequestHook(flow)
playbook >> reply()
if transfer_encoding == "chunked":
playbook << SendData(server, b"0\r\n\r\n")
assert playbook
elif response == "early close":
assert (
playbook