Merge pull request #2369 from ujjwal96/stream-support

HTTP request & WebSocket message streaming
This commit is contained in:
Thomas Kriechbaumer 2017-07-04 11:28:26 +02:00 committed by GitHub
commit b38ebd7278
18 changed files with 431 additions and 142 deletions

View File

@ -16,7 +16,7 @@ mechanism:
If you want to peek into (SSL-protected) non-HTTP connections, check out the :ref:`tcpproxy`
feature.
If you want to ignore traffic from mitmproxy's processing because of large response bodies,
take a look at the :ref:`responsestreaming` feature.
take a look at the :ref:`streaming` feature.
How it works
------------
@ -89,7 +89,7 @@ Here are some other examples for ignore patterns:
.. seealso::
- :ref:`tcpproxy`
- :ref:`responsestreaming`
- :ref:`streaming`
- mitmproxy's "Limit" feature
.. rubric:: Footnotes

View File

@ -1,68 +0,0 @@
.. _responsestreaming:
Response Streaming
==================
By using mitmproxy's streaming feature, response contents can be passed to the client incrementally
before they have been fully received by the proxy. This is especially useful for large binary files
such as videos, where buffering the whole file slows down the client's browser.
By default, mitmproxy will read the entire response, perform any indicated
manipulations on it and then send the (possibly modified) response to
the client. In some cases this is undesirable and you may wish to "stream"
the response back to the client. When streaming is enabled, the response is
not buffered on the proxy but directly sent back to the client instead.
On the command-line
-------------------
Streaming can be enabled on the command line for all response bodies exceeding a certain size.
The SIZE argument understands k/m/g suffixes, e.g. 3m for 3 megabytes.
================== =================
command-line ``--stream SIZE``
================== =================
.. warning::
When response streaming is enabled, **streamed response contents will not be
recorded or preserved in any way.**
.. note::
When response streaming is enabled, the response body cannot be modified by the usual means.
Customizing Response Streaming
------------------------------
You can also use a script to customize exactly which responses are streamed.
Responses can be tagged for streaming by setting their ``.stream``
attribute to ``True``:
.. literalinclude:: ../../examples/complex/stream.py
:caption: examples/complex/stream.py
:language: python
Implementation Details
----------------------
When response streaming is enabled, portions of the code which would have otherwise performed
changes on the response body will see an empty response body. Any modifications will be ignored.
Streamed responses are usually sent in chunks of 4096 bytes. If the response is sent with a
``Transfer-Encoding: chunked`` header, the response will be streamed one chunk at a time.
Modifying streamed data
-----------------------
If the ``.stream`` attribute is callable, ``.stream`` will wrap the generator that yields all
chunks.
.. literalinclude:: ../../examples/complex/stream_modify.py
:caption: examples/complex/stream_modify.py
:language: python
.. seealso::
- :ref:`passthrough`

docs/features/streaming.rst (new file, 102 lines)
View File

@ -0,0 +1,102 @@
.. _streaming:
HTTP Streaming
==============
By default, mitmproxy will read the entire request/response, perform any indicated
manipulations on it and then send the (possibly modified) message to
the other party. In some cases this is undesirable and you may wish to "stream"
the request/response. When streaming is enabled, the request/response is
not buffered on the proxy but directly sent to the server/client instead.
HTTP headers are still fully buffered before being sent.
Request Streaming
-----------------
Request streaming can be used to incrementally stream a request body to the server
before it has been fully received by the proxy. This is useful for large file uploads.
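A minimal sketch of an addon that enables request streaming for every request (the hook and attribute names are the ones used throughout this page; the script itself is illustrative and not part of the bundled examples):

.. code-block:: python

    def requestheaders(flow):
        # Tag the request body for streaming to the server.
        flow.request.stream = True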
Response Streaming
------------------
By using mitmproxy's streaming feature, response contents can be passed to the client incrementally
before they have been fully received by the proxy. This is especially useful for large binary files
such as videos, where buffering the whole file slows down the client's browser.
On the command-line
-------------------
Streaming can be enabled on the command line for all request and response bodies exceeding a certain size.
The SIZE argument understands k/m/g suffixes, e.g. 3m for 3 megabytes.
================== =================
command-line ``--set stream_large_bodies=SIZE``
================== =================
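For example, to stream all request and response bodies larger than three megabytes:

.. code-block:: none

    mitmproxy --set stream_large_bodies=3m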
.. warning::
When streaming is enabled, **streamed request/response contents will not be
recorded or preserved in any way.**
.. note::
When streaming is enabled, the request/response body cannot be modified by the usual means.
Customizing Streaming
---------------------
You can also use a script to customize exactly which requests or responses are streamed.
Requests and responses can be tagged for streaming by setting their ``.stream``
attribute to ``True``:
.. literalinclude:: ../../examples/complex/stream.py
:caption: examples/complex/stream.py
:language: python
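As a sketch (illustrative, not part of the bundled examples), a script could stream only responses that announce a large body:

.. code-block:: python

    def responseheaders(flow):
        # Stream responses that declare a body larger than ~5 MB.
        # The threshold is illustrative.
        if int(flow.response.headers.get("content-length", 0)) > 5 * 1024 * 1024:
            flow.response.stream = True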
Implementation Details
----------------------
When streaming is enabled, portions of the code that would otherwise perform
changes on the request/response body will see an empty body. Any modifications will be ignored.
Streamed bodies are usually sent in chunks of 4096 bytes. If the response is sent with a
``Transfer-Encoding: chunked`` header, the response will be streamed one chunk at a time.
Modifying streamed data
-----------------------
If the ``.stream`` attribute is callable, ``.stream`` will wrap the generator that yields all
chunks.
.. literalinclude:: ../../examples/complex/stream_modify.py
:caption: examples/complex/stream_modify.py
:language: python
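As a sketch of the shape such a callable takes (illustrative, not the bundled example above), the wrapper receives the generator of body chunks and yields replacement chunks:

.. code-block:: python

    def modify(chunks):
        # chunks is a generator of raw body chunks.
        for chunk in chunks:
            yield chunk.replace(b"foo", b"bar")

    def responseheaders(flow):
        flow.response.stream = modify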
WebSocket Streaming
===================
The WebSocket streaming feature can be used to forward frames as soon as they arrive. This can be useful for large binary file transfers.
On the command-line
-------------------
Streaming can be enabled on the command line for all WebSocket frames.
================== =================
command-line ``--set stream_websockets=true``
================== =================
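For example:

.. code-block:: none

    mitmproxy --set stream_websockets=true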
.. note::
When WebSocket streaming is enabled, the message payload cannot be modified.
Implementation Details
----------------------
When WebSocket streaming is enabled, portions of the code that modify the WebSocket message payload have no
effect on the payload that is actually forwarded, as each frame is sent on immediately.
In contrast to HTTP streaming, where the body is not stored, the message payload will still be stored in the WebSocket flow.
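Streaming can also be enabled per connection from a script by setting the flow's ``stream`` attribute in the ``websocket_start`` hook (a minimal sketch, mirroring what the ``stream_websockets`` option does):

.. code-block:: python

    def websocket_start(flow):
        # Forward frames immediately instead of buffering whole messages.
        flow.stream = True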
.. seealso::
- :ref:`passthrough`

View File

@ -28,4 +28,4 @@ feature.
.. seealso::
- :ref:`passthrough`
- :ref:`responsestreaming`
- :ref:`streaming`

View File

@ -33,7 +33,7 @@
features/passthrough
features/proxyauth
features/reverseproxy
features/responsestreaming
features/streaming
features/socksproxy
features/sticky
features/tcpproxy

View File

@ -1,6 +1,6 @@
def responseheaders(flow):
"""
Enables streaming for all responses.
This is equivalent to passing `--stream 0` to mitmproxy.
This is equivalent to passing `--set stream_large_bodies=1` to mitmproxy.
"""
flow.response.stream = True

View File

@ -28,12 +28,18 @@ class StreamBodies:
if expected_size and not r.raw_content and not (0 <= expected_size <= self.max_size):
# r.stream may already be a callable, which we want to preserve.
r.stream = r.stream or True
# FIXME: make message generic when we add request streaming
ctx.log.info("Streaming response from %s" % f.request.host)
ctx.log.info("Streaming {} {}".format("response from" if not is_request else "request to", f.request.host))
# FIXME! Request streaming doesn't work at the moment.
def requestheaders(self, f):
self.run(f, True)
def responseheaders(self, f):
self.run(f, False)
def websocket_start(self, f):
if ctx.options.stream_websockets:
f.stream = True
ctx.log.info("Streaming WebSocket messages between {client} and {server}".format(
client=human.format_address(f.client_conn.address),
server=human.format_address(f.server_conn.address))
)
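For reference, a condensed sketch of the size check applied in the hunk above (illustrative only; expected_size is derived from the message headers):

def should_stream(expected_size, raw_content, max_size):
    # Stream only when a size is announced, nothing has been buffered yet,
    # and the announced size exceeds the configured limit.
    return bool(expected_size) and not raw_content and not (0 <= expected_size <= max_size)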

View File

@ -154,6 +154,13 @@ class Options(optmanager.OptManager):
Understands k/m/g suffixes, i.e. 3m for 3 megabytes.
"""
)
self.add_option(
"stream_websockets", bool, False,
"""
Stream WebSocket messages between client and server.
Messages are captured and cannot be modified.
"""
)
self.add_option(
"verbosity", int, 2,
"Log verbosity."

View File

@ -273,7 +273,10 @@ class HttpLayer(base.Layer):
self.send_response(http.expect_continue_response)
request.headers.pop("expect")
request.data.content = b"".join(self.read_request_body(request))
if f.request.stream:
f.request.data.content = None
else:
f.request.data.content = b"".join(self.read_request_body(request))
request.timestamp_end = time.time()
except exceptions.HttpException as e:
# We optimistically guess there might be an HTTP client on the
@ -326,12 +329,8 @@ class HttpLayer(base.Layer):
f.request.scheme
)
def get_response():
self.send_request(f.request)
f.response = self.read_response_headers()
try:
get_response()
self.send_request_headers(f.request)
except exceptions.NetlibException as e:
self.log(
"server communication error: %s" % repr(e),
@ -357,7 +356,19 @@ class HttpLayer(base.Layer):
self.disconnect()
self.connect()
get_response()
self.send_request_headers(f.request)
# This is taken out of the try except block because when streaming
# we can't send the request body while retrying as the generator gets exhausted
if f.request.stream:
chunks = self.read_request_body(f.request)
if callable(f.request.stream):
chunks = f.request.stream(chunks)
self.send_request_body(f.request, chunks)
else:
self.send_request_body(f.request, [f.request.data.content])
f.response = self.read_response_headers()
# call the appropriate script hook - this is an opportunity for
# an inline script to set f.stream = True
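Since f.request.stream may be a callable (wrapped around the chunk generator above), an addon can rewrite streamed request data on the fly; a sketch:

def modify_upload(chunks):
    # Receives the generator of request body chunks when streaming is active.
    for chunk in chunks:
        yield chunk.upper()  # illustrative transformation

def requestheaders(flow):
    flow.request.stream = modify_upload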

View File

@ -22,6 +22,16 @@ class Http1Layer(httpbase._HttpTransmissionLayer):
self.config.options._processed.get("body_size_limit")
)
def send_request_headers(self, request):
headers = http1.assemble_request_head(request)
self.server_conn.wfile.write(headers)
self.server_conn.wfile.flush()
def send_request_body(self, request, chunks):
for chunk in http1.assemble_body(request.headers, chunks):
self.server_conn.wfile.write(chunk)
self.server_conn.wfile.flush()
def send_request(self, request):
self.server_conn.wfile.write(http1.assemble_request(request))
self.server_conn.wfile.flush()

View File

@ -487,14 +487,23 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
@detect_zombie_stream
def read_request_body(self, request):
self.request_data_finished.wait()
data = []
while self.request_data_queue.qsize() > 0:
data.append(self.request_data_queue.get())
return data
if not request.stream:
self.request_data_finished.wait()
while True:
try:
yield self.request_data_queue.get(timeout=0.1)
except queue.Empty: # pragma: no cover
pass
if self.request_data_finished.is_set():
self.raise_zombie()
while self.request_data_queue.qsize() > 0:
yield self.request_data_queue.get()
break
self.raise_zombie()
@detect_zombie_stream
def send_request(self, message):
def send_request_headers(self, request):
if self.pushed:
# nothing to do here
return
@ -519,10 +528,10 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
self.server_stream_id = self.connections[self.server_conn].get_next_available_stream_id()
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
headers = message.headers.copy()
headers.insert(0, ":path", message.path)
headers.insert(0, ":method", message.method)
headers.insert(0, ":scheme", message.scheme)
headers = request.headers.copy()
headers.insert(0, ":path", request.path)
headers.insert(0, ":method", request.method)
headers.insert(0, ":scheme", request.scheme)
priority_exclusive = None
priority_depends_on = None
@ -553,13 +562,24 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
self.raise_zombie()
self.connections[self.server_conn].lock.release()
@detect_zombie_stream
def send_request_body(self, request, chunks):
if self.pushed:
# nothing to do here
return
if not self.no_body:
self.connections[self.server_conn].safe_send_body(
self.raise_zombie,
self.server_stream_id,
[message.content]
chunks
)
@detect_zombie_stream
def send_request(self, message):
self.send_request_headers(message)
self.send_request_body(message, [message.content])
@detect_zombie_stream
def read_response_headers(self):
self.response_arrived.wait()

View File

@ -55,6 +55,7 @@ class WebSocketLayer(base.Layer):
return self._handle_unknown_frame(frame, source_conn, other_conn, is_server)
def _handle_data_frame(self, frame, source_conn, other_conn, is_server):
fb = self.server_frame_buffer if is_server else self.client_frame_buffer
fb.append(frame)
@ -70,43 +71,51 @@ class WebSocketLayer(base.Layer):
self.flow.messages.append(websocket_message)
self.channel.ask("websocket_message", self.flow)
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
pos = 0
for s in original_chunk_sizes:
yield payload[pos:pos + s]
pos += s
if not self.flow.stream:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
pos = 0
for s in original_chunk_sizes:
yield payload[pos:pos + s]
pos += s
else:
# just re-chunk everything into 4kB frames
# header len = 4 bytes without masking key and 8 bytes with masking key
chunk_size = 4092 if is_server else 4088
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield payload[i:i + chunk_size]
frms = [
websockets.Frame(
payload=chunk,
opcode=frame.header.opcode,
mask=(False if is_server else 1),
masking_key=(b'' if is_server else os.urandom(4)))
for chunk in get_chunk(websocket_message.content)
]
if len(frms) > 0:
frms[-1].header.fin = True
else:
# just re-chunk everything into 10kB frames
chunk_size = 10240
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield payload[i:i + chunk_size]
frms.append(websockets.Frame(
fin=True,
opcode=websockets.OPCODE.CONTINUE,
mask=(False if is_server else 1),
masking_key=(b'' if is_server else os.urandom(4))))
frms = [
websockets.Frame(
payload=chunk,
opcode=frame.header.opcode,
mask=(False if is_server else 1),
masking_key=(b'' if is_server else os.urandom(4)))
for chunk in get_chunk(websocket_message.content)
]
frms[0].header.opcode = message_type
frms[0].header.rsv1 = compressed_message
for frm in frms:
other_conn.send(bytes(frm))
if len(frms) > 0:
frms[-1].header.fin = True
else:
frms.append(websockets.Frame(
fin=True,
opcode=websockets.OPCODE.CONTINUE,
mask=(False if is_server else 1),
masking_key=(b'' if is_server else os.urandom(4))))
other_conn.send(bytes(frame))
frms[0].header.opcode = message_type
frms[0].header.rsv1 = compressed_message
for frm in frms:
other_conn.send(bytes(frm))
elif self.flow.stream:
other_conn.send(bytes(frame))
return True

View File

@ -45,6 +45,7 @@ class WebSocketFlow(flow.Flow):
self.close_code = '(status code missing)'
self.close_message = '(message missing)'
self.close_reason = 'unknown status code'
self.stream = False
if handshake_flow:
self.client_key = websockets.get_client_key(handshake_flow.request.headers)

View File

@ -29,3 +29,9 @@ def test_simple():
f = tflow.tflow(resp=True)
f.response.headers["content-length"] = "invalid"
tctx.cycle(sa, f)
tctx.configure(sa, stream_websockets = True)
f = tflow.twebsocketflow()
assert not f.stream
sa.websocket_start(f)
assert f.stream

View File

@ -1,7 +1,6 @@
from unittest import mock
import pytest
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.net.http import http1
from mitmproxy.net.tcp import TCPClient
@ -108,9 +107,5 @@ class TestStreaming(tservers.HTTPProxyTest):
r = p.request("post:'%s/p/200:b@10000'" % self.server.urlbase)
assert len(r.content) == 10000
if streaming:
with pytest.raises(exceptions.HttpReadDisconnect): # as the assertion in assert_write fails
# request with 10000 bytes
p.request("post:'%s/p/200':b@10000" % self.server.urlbase)
else:
assert p.request("post:'%s/p/200':b@10000" % self.server.urlbase)
# request with 10000 bytes
assert p.request("post:'%s/p/200':b@10000" % self.server.urlbase)

View File

@ -14,6 +14,7 @@ import mitmproxy.net
from ...net import tservers as net_tservers
from mitmproxy import exceptions
from mitmproxy.net.http import http1, http2
from pathod.language import generators
from ... import tservers
from ....conftest import requires_alpn
@ -166,7 +167,8 @@ class _Http2TestBase:
end_stream=None,
priority_exclusive=None,
priority_depends_on=None,
priority_weight=None):
priority_weight=None,
streaming=False):
if headers is None:
headers = []
if end_stream is None:
@ -182,7 +184,8 @@ class _Http2TestBase:
)
if body:
h2_conn.send_data(stream_id, body)
h2_conn.end_stream(stream_id)
if not streaming:
h2_conn.end_stream(stream_id)
wfile.write(h2_conn.data_to_send())
wfile.flush()
@ -862,3 +865,120 @@ class TestConnectionTerminated(_Http2Test):
assert connection_terminated_event.error_code == 5
assert connection_terminated_event.last_stream_id == 42
assert connection_terminated_event.additional_data == b'foobar'
@requires_alpn
class TestRequestStreaming(_Http2Test):
@classmethod
def handle_server_event(cls, event, h2_conn, rfile, wfile):
if isinstance(event, h2.events.ConnectionTerminated):
return False
elif isinstance(event, h2.events.DataReceived):
data = event.data
assert data
h2_conn.close_connection(error_code=5, last_stream_id=42, additional_data=data)
wfile.write(h2_conn.data_to_send())
wfile.flush()
return True
@pytest.mark.parametrize('streaming', [True, False])
def test_request_streaming(self, streaming):
class Stream:
def requestheaders(self, f):
f.request.stream = streaming
self.master.addons.add(Stream())
h2_conn = self.setup_connection()
body = generators.RandomGenerator("bytes", 100)[:]
self._send_request(
self.client.wfile,
h2_conn,
headers=[
(':authority', "127.0.0.1:{}".format(self.server.server.address[1])),
(':method', 'GET'),
(':scheme', 'https'),
(':path', '/'),
],
body=body,
streaming=True
)
done = False
connection_terminated_event = None
self.client.rfile.o.settimeout(2)
while not done:
try:
raw = b''.join(http2.read_raw_frame(self.client.rfile))
events = h2_conn.receive_data(raw)
for event in events:
if isinstance(event, h2.events.ConnectionTerminated):
connection_terminated_event = event
done = True
except:
break
if streaming:
assert connection_terminated_event.additional_data == body
else:
assert connection_terminated_event is None
@requires_alpn
class TestResponseStreaming(_Http2Test):
@classmethod
def handle_server_event(cls, event, h2_conn, rfile, wfile):
if isinstance(event, h2.events.ConnectionTerminated):
return False
elif isinstance(event, h2.events.RequestReceived):
data = generators.RandomGenerator("bytes", 100)[:]
h2_conn.send_headers(event.stream_id, [
(':status', '200'),
('content-length', '100')
])
h2_conn.send_data(event.stream_id, data)
wfile.write(h2_conn.data_to_send())
wfile.flush()
return True
@pytest.mark.parametrize('streaming', [True, False])
def test_response_streaming(self, streaming):
class Stream:
def responseheaders(self, f):
f.response.stream = streaming
self.master.addons.add(Stream())
h2_conn = self.setup_connection()
self._send_request(
self.client.wfile,
h2_conn,
headers=[
(':authority', "127.0.0.1:{}".format(self.server.server.address[1])),
(':method', 'GET'),
(':scheme', 'https'),
(':path', '/'),
]
)
done = False
self.client.rfile.o.settimeout(2)
data = None
while not done:
try:
raw = b''.join(http2.read_raw_frame(self.client.rfile))
events = h2_conn.receive_data(raw)
for event in events:
if isinstance(event, h2.events.DataReceived):
data = event.data
done = True
except:
break
if streaming:
assert data
else:
assert data is None

View File

@ -155,7 +155,13 @@ class TestSimple(_WebSocketTest):
wfile.write(bytes(frame))
wfile.flush()
def test_simple(self):
@pytest.mark.parametrize('streaming', [True, False])
def test_simple(self, streaming):
class Stream:
def websocket_start(self, f):
f.stream = streaming
self.master.addons.add(Stream())
self.setup_connection()
frame = websockets.Frame.from_file(self.client.rfile)
@ -328,3 +334,32 @@ class TestInvalidFrame(_WebSocketTest):
frame = websockets.Frame.from_file(self.client.rfile)
assert frame.header.opcode == 15
assert frame.payload == b'foobar'
class TestStreaming(_WebSocketTest):
@classmethod
def handle_websockets(cls, rfile, wfile):
wfile.write(bytes(websockets.Frame(opcode=websockets.OPCODE.TEXT, payload=b'server-foobar')))
wfile.flush()
@pytest.mark.parametrize('streaming', [True, False])
def test_streaming(self, streaming):
class Stream:
def websocket_start(self, f):
f.stream = streaming
self.master.addons.add(Stream())
self.setup_connection()
frame = None
if not streaming:
with pytest.raises(exceptions.TcpDisconnect): # Reader.safe_read get nothing as result
frame = websockets.Frame.from_file(self.client.rfile)
assert frame is None
else:
frame = websockets.Frame.from_file(self.client.rfile)
assert frame
assert self.master.state.flows[1].messages == [] # Message not appended as the final frame isn't received

View File

@ -239,13 +239,28 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
p.request("get:'%s'" % response)
def test_reconnect(self):
req = "get:'%s/p/200:b@1:da'" % self.server.urlbase
req = "get:'%s/p/200:b@1'" % self.server.urlbase
p = self.pathoc()
class MockOnce:
call = 0
def mock_once(self, http1obj, req):
self.call += 1
if self.call == 1:
raise exceptions.TcpDisconnect
else:
headers = http1.assemble_request_head(req)
http1obj.server_conn.wfile.write(headers)
http1obj.server_conn.wfile.flush()
with p.connect():
assert p.request(req)
# Server has disconnected. Mitmproxy should detect this, and reconnect.
assert p.request(req)
assert p.request(req)
with mock.patch("mitmproxy.proxy.protocol.http1.Http1Layer.send_request_headers",
side_effect=MockOnce().mock_once, autospec=True):
# Server disconnects while sending headers but mitmproxy reconnects
resp = p.request(req)
assert resp
assert resp.status_code == 200
def test_get_connection_switching(self):
req = "get:'%s/p/200:b@1'"
@ -1072,6 +1087,23 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
proxified to an upstream http proxy, we need to send the CONNECT
request again.
"""
class MockOnce:
call = 0
def mock_once(self, http1obj, req):
self.call += 1
if self.call == 2:
headers = http1.assemble_request_head(req)
http1obj.server_conn.wfile.write(headers)
http1obj.server_conn.wfile.flush()
raise exceptions.TcpDisconnect
else:
headers = http1.assemble_request_head(req)
http1obj.server_conn.wfile.write(headers)
http1obj.server_conn.wfile.flush()
self.chain[0].tmaster.addons.add(RequestKiller([1, 2]))
self.chain[1].tmaster.addons.add(RequestKiller([1]))
@ -1086,7 +1118,10 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
assert len(self.chain[0].tmaster.state.flows) == 1
assert len(self.chain[1].tmaster.state.flows) == 1
req = p.request("get:'/p/418:b\"content2\"'")
with mock.patch("mitmproxy.proxy.protocol.http1.Http1Layer.send_request_headers",
side_effect=MockOnce().mock_once, autospec=True):
req = p.request("get:'/p/418:b\"content2\"'")
assert req.status_code == 502
assert len(self.proxy.tmaster.state.flows) == 2