This commit is contained in:
Thomas Kriechbaumer 2015-09-29 16:23:55 +02:00
parent ae4a1dd6de
commit e9eed5e4c2
4 changed files with 301 additions and 136 deletions

View File

@ -241,8 +241,6 @@ class HTTPRequest(MessageMixin, Request):
timestamp_end=request.timestamp_end,
form_out=(request.form_out if hasattr(request, 'form_out') else None),
)
if hasattr(request, 'stream_id'):
req.stream_id = request.stream_id
return req
def __hash__(self):
@ -347,8 +345,6 @@ class HTTPResponse(MessageMixin, Response):
timestamp_start=response.timestamp_start,
timestamp_end=response.timestamp_end,
)
if hasattr(response, 'stream_id'):
resp.stream_id = response.stream_id
return resp
def _refresh_cookie(self, c, delta):

View File

@ -1,27 +1,37 @@
from __future__ import (absolute_import, print_function, division)
import sys
import traceback
import six
import struct
import threading
import Queue
from netlib import tcp
from netlib.exceptions import HttpException, HttpReadDisconnect, NetlibException
from netlib.http import http1, Headers
from netlib.http import CONTENT_MISSING
from netlib.tcp import Address
from netlib.http.http2.connections import HTTP2Protocol
from netlib.http.http2.frame import GoAwayFrame, PriorityFrame, WindowUpdateFrame
from netlib.http import http1, Headers, CONTENT_MISSING
from netlib.tcp import Address, ssl_read_select
import h2
from h2.connection import H2Connection
from h2.events import *
from hyperframe import frame
from .base import Layer, Kill
from .. import utils
from ..exceptions import HttpProtocolException, ProtocolException
from ..models import (
HTTPFlow, HTTPRequest, HTTPResponse, make_error_response, make_connect_response, Error, expect_continue_response
HTTPFlow,
HTTPRequest,
HTTPResponse,
make_error_response,
make_connect_response,
Error,
expect_continue_response
)
from .base import Layer, Kill
class _HttpLayer(Layer):
supports_streaming = False
def read_request(self):
raise NotImplementedError()
@ -31,26 +41,6 @@ class _HttpLayer(Layer):
def send_request(self, request):
raise NotImplementedError()
def read_response(self, request):
raise NotImplementedError()
def send_response(self, response):
raise NotImplementedError()
def check_close_connection(self, flow):
raise NotImplementedError()
class _StreamingHttpLayer(_HttpLayer):
supports_streaming = True
def read_response_headers(self):
raise NotImplementedError
def read_response_body(self, request, response):
raise NotImplementedError()
yield "this is a generator" # pragma: no cover
def read_response(self, request):
response = self.read_response_headers()
response.data.content = b"".join(
@ -58,21 +48,30 @@ class _StreamingHttpLayer(_HttpLayer):
)
return response
def send_response_headers(self, response):
raise NotImplementedError
def send_response_body(self, response, chunks):
def read_response_headers(self):
raise NotImplementedError()
def read_response_body(self, request, response):
raise NotImplementedError()
yield "this is a generator" # pragma: no cover
def send_response(self, response):
if response.content == CONTENT_MISSING:
raise HttpException("Cannot assemble flow with CONTENT_MISSING")
self.send_response_headers(response)
self.send_response_body(response, [response.content])
def send_response_headers(self, response):
raise NotImplementedError()
class Http1Layer(_StreamingHttpLayer):
def send_response_body(self, response, chunks):
raise NotImplementedError()
def check_close_connection(self, flow):
raise NotImplementedError()
class Http1Layer(_HttpLayer):
def __init__(self, ctx, mode):
super(Http1Layer, self).__init__(ctx)
self.mode = mode
@ -130,104 +129,277 @@ class Http1Layer(_StreamingHttpLayer):
layer = HttpLayer(self, self.mode)
layer()
class SafeH2Connection(H2Connection):
def __init__(self, conn, *args, **kwargs):
super(SafeH2Connection, self).__init__(*args, **kwargs)
self.conn = conn
self.lock = threading.RLock()
# TODO: The HTTP2 layer is missing multiplexing, which requires a major rewrite.
class Http2Layer(_HttpLayer):
def safe_increment_flow_control(self, stream_id, length):
with self.lock:
self.increment_flow_control_window(length)
self.conn.send(self.data_to_send())
with self.lock:
if stream_id in self.streams and not self.streams[stream_id].closed:
self.increment_flow_control_window(length, stream_id=stream_id)
self.conn.send(self.data_to_send())
def safe_reset_stream(self, stream_id, error_code):
with self.lock:
self.reset_stream(stream_id, error_code)
self.conn.send(self.h2.data_to_send())
def safe_acknowledge_settings(self, event):
with self.conn.h2.lock:
self.conn.h2.acknowledge_settings(event)
self.conn.send(self.data_to_send())
def safe_update_settings(self, new_settings):
with self.conn.h2.lock:
self.update_settings(new_settings)
self.conn.send(self.data_to_send())
def safe_send_headers(self, stream_id, headers):
with self.lock:
self.send_headers(stream_id, headers)
self.conn.send(self.data_to_send())
def safe_send_body(self, stream_id, chunks):
for chunk in chunks:
max_outbound_frame_size = self.max_outbound_frame_size
for i in xrange(0, len(chunk), max_outbound_frame_size):
frame_chunk = chunk[i:i+max_outbound_frame_size]
with self.lock:
self.send_data(stream_id, frame_chunk)
self.conn.send(self.data_to_send())
with self.lock:
self.end_stream(stream_id)
self.conn.send(self.data_to_send())
class Http2Layer(Layer):
def __init__(self, ctx, mode):
super(Http2Layer, self).__init__(ctx)
self.mode = mode
self.client_protocol = HTTP2Protocol(self.client_conn, is_server=True,
unhandled_frame_cb=self.handle_unexpected_frame_from_client)
self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
unhandled_frame_cb=self.handle_unexpected_frame_from_server)
self.streams = dict()
self.server_to_client_stream_ids = dict([(0, 0)])
self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
def read_request(self):
request = HTTPRequest.from_protocol(
self.client_protocol,
body_size_limit=self.config.body_size_limit
)
self._stream_id = request.stream_id
return request
# make sure that we only pass actual SSL.Connection objects in here,
# because otherwise ssl_read_select fails!
self.active_conns = [self.client_conn.connection]
def send_request(self, message):
# TODO: implement flow control and WINDOW_UPDATE frames
self.server_conn.send(self.server_protocol.assemble(message))
if self.server_conn:
self._initiate_server_conn()
def read_response(self, request):
return HTTPResponse.from_protocol(
self.server_protocol,
request_method=request.method,
body_size_limit=self.config.body_size_limit,
include_body=True,
stream_id=self._stream_id
)
def send_response(self, message):
# TODO: implement flow control to prevent client buffer filling up
# maintain a send buffer size, and read WindowUpdateFrames from client to increase the send buffer
self.client_conn.send(self.client_protocol.assemble(message))
def check_close_connection(self, flow):
# TODO: add a timer to disconnect after a 10 second timeout
return False
def _initiate_server_conn(self):
self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
self.server_conn.h2.initiate_connection()
self.server_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False})
self.server_conn.send(self.server_conn.h2.data_to_send())
self.active_conns.append(self.server_conn.connection)
def connect(self):
self.ctx.connect()
self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
unhandled_frame_cb=self.handle_unexpected_frame_from_server)
self.server_protocol.perform_connection_preface()
self.server_conn.connect()
self._initiate_server_conn()
def set_server(self, *args, **kwargs):
self.ctx.set_server(*args, **kwargs)
self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
unhandled_frame_cb=self.handle_unexpected_frame_from_server)
self.server_protocol.perform_connection_preface()
def set_server(self):
raise NotImplementedError("Cannot change server for HTTP2 connections.")
def disconnect(self):
raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")
def __call__(self):
self.server_protocol.perform_connection_preface()
preamble = self.client_conn.rfile.read(24)
self.client_conn.h2.initiate_connection()
self.client_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False})
self.client_conn.h2.receive_data(preamble)
self.client_conn.send(self.client_conn.h2.data_to_send())
while True:
r = ssl_read_select(self.active_conns, 1)
for conn in r:
source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
is_server = (conn == self.server_conn.connection)
fields = struct.unpack("!HB", source_conn.rfile.peek(3))
length = (fields[0] << 8) + fields[1]
raw_frame = source_conn.rfile.safe_read(9 + length)
with source_conn.h2.lock:
events = source_conn.h2.receive_data(raw_frame)
source_conn.send(source_conn.h2.data_to_send())
for event in events:
if hasattr(event, 'stream_id'):
if is_server:
eid = self.server_to_client_stream_ids[event.stream_id]
else:
eid = event.stream_id
if isinstance(event, RequestReceived):
headers = Headers([[str(k), str(v)] for k, v in event.headers])
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
self.streams[eid].start()
elif isinstance(event, ResponseReceived):
headers = Headers([[str(k), str(v)] for k, v in event.headers])
self.streams[eid].response_headers = headers
self.streams[eid].response_arrived.set()
elif isinstance(event, DataReceived):
self.streams[eid].data_queue.put(event.data)
source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
elif isinstance(event, StreamEnded):
self.streams[eid].data_finished.set()
elif isinstance(event, StreamReset):
self.streams[eid].zombie = True
if eid in self.streams and event.error_code == 0x8:
if is_server:
other_stream_id = self.streams[eid].client_stream_id
else:
other_stream_id = self.streams[eid].server_stream_id
other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
elif isinstance(event, RemoteSettingsChanged):
source_conn.h2.safe_acknowledge_settings(event)
new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
other_conn.h2.safe_update_settings(new_settings)
# TODO: cleanup resources once we are sure nobody needs them
# for stream_id in self.streams.keys():
# if self.streams[stream_id].zombie:
# self.streams.pop(stream_id, None)
class Http2SingleStreamLayer(_HttpLayer, threading.Thread):
def __init__(self, ctx, stream_id, request_headers):
super(Http2SingleStreamLayer, self).__init__(ctx)
self.zombie = False
self.client_stream_id = stream_id
self.server_stream_id = None
self.request_headers = request_headers
self.response_headers = None
self.data_queue = Queue.Queue()
self.response_arrived = threading.Event()
self.data_finished = threading.Event()
def read_request(self):
self.data_finished.wait()
self.data_finished.clear()
authority = self.request_headers.get(':authority', '')
method = self.request_headers.get(':method', 'GET')
scheme = self.request_headers.get(':scheme', 'https')
path = self.request_headers.get(':path', '/')
host = None
port = None
if path == '*' or path.startswith("/"):
form_in = "relative"
elif method == 'CONNECT':
form_in = "authority"
if ":" in authority:
host, port = authority.split(":", 1)
else:
host = authority
else:
form_in = "absolute"
# FIXME: verify if path or :host contains what we need
scheme, host, port, _ = utils.parse_url(path)
if host is None:
host = 'localhost'
if port is None:
port = 80 if scheme == 'http' else 443
port = int(port)
data = []
while self.data_queue.qsize() > 0:
data.append(self.data_queue.get())
return HTTPRequest(
form_in,
method,
scheme,
host,
port,
path,
(2, 0),
self.request_headers,
data,
# TODO: timestamp_start=None,
# TODO: timestamp_end=None,
form_out=None, # TODO: (request.form_out if hasattr(request, 'form_out') else None),
)
def send_request(self, message):
with self.server_conn.h2.lock:
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
self.server_conn.h2.safe_send_headers(
self.server_stream_id,
message.headers
)
self.server_conn.h2.safe_send_body(
self.server_stream_id,
message.body
)
def read_response_headers(self):
self.response_arrived.wait()
status_code = int(self.response_headers.get(':status', 502))
return HTTPResponse(
http_version=(2, 0),
status_code=status_code,
reason='',
headers=self.response_headers,
content=None,
# TODO: timestamp_start=response.timestamp_start,
# TODO: timestamp_end=response.timestamp_end,
)
def read_response_body(self, request, response):
while True:
try:
yield self.data_queue.get(timeout=1)
except Queue.Empty:
pass
if self.data_finished.is_set():
while self.data_queue.qsize() > 0:
yield self.data_queue.get()
return
def send_response_headers(self, response):
self.client_conn.h2.safe_send_headers(
self.client_stream_id,
response.headers
)
def send_response_body(self, _response, chunks):
self.client_conn.h2.safe_send_body(
self.client_stream_id,
chunks
)
def check_close_connection(self, flow):
# This layer only handles a single stream.
# RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
return True
def connect(self):
raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
def set_server(self, *args, **kwargs):
# do not mess with the server connection - all streams share it.
pass
def run(self):
layer = HttpLayer(self, self.mode)
layer()
# terminate the connection
self.client_conn.send(GoAwayFrame().to_bytes())
def handle_unexpected_frame_from_client(self, frame):
if isinstance(frame, WindowUpdateFrame):
# Clients are sending WindowUpdate frames depending on their flow control algorithm.
# Since we cannot predict these frames, and we do not need to respond to them,
# simply accept them, and hide them from the log.
# Ideally we should keep track of our own flow control window and
# stall transmission if the outgoing flow control buffer is full.
return
if isinstance(frame, PriorityFrame):
# Clients are sending Priority frames depending on their implementation.
# The RFC does not clearly state when or which priority preferences should be set.
# Since we cannot predict these frames, and we do not need to respond to them,
# simply accept them, and hide them from the log.
# Ideally we should forward them to the server.
return
if isinstance(frame, GoAwayFrame):
# Client wants to terminate the connection,
# relay it to the server.
self.server_conn.send(frame.to_bytes())
return
self.log("Unexpected HTTP2 frame from client: %s" % frame.human_readable(), "info")
def handle_unexpected_frame_from_server(self, frame):
if isinstance(frame, WindowUpdateFrame):
# Servers are sending WindowUpdate frames depending on their flow control algorithm.
# Since we cannot predict these frames, and we do not need to respond to them,
# simply accept them, and hide them from the log.
# Ideally we should keep track of our own flow control window and
# stall transmission if the outgoing flow control buffer is full.
return
if isinstance(frame, GoAwayFrame):
# Server wants to terminate the connection,
# relay it to the client.
self.client_conn.send(frame.to_bytes())
return
self.log("Unexpected HTTP2 frame from server: %s" % frame.human_readable(), "info")
self.zombie = True
class ConnectServerConnection(object):
@ -420,7 +592,7 @@ class HttpLayer(Layer):
layer()
def send_response_to_client(self, flow):
if not (self.supports_streaming and flow.response.stream):
if not flow.response.stream:
# no streaming:
# we already received the full response from the server and can
# send it to the client straight away.
@ -441,10 +613,7 @@ class HttpLayer(Layer):
def get_response_from_server(self, flow):
def get_response():
self.send_request(flow.request)
if self.supports_streaming:
flow.response = self.read_response_headers()
else:
flow.response = self.read_response(flow.request)
flow.response = self.read_response_headers()
try:
get_response()
@ -474,15 +643,14 @@ class HttpLayer(Layer):
if flow == Kill:
raise Kill()
if self.supports_streaming:
if flow.response.stream:
flow.response.data.content = CONTENT_MISSING
else:
flow.response.data.content = b"".join(self.read_response_body(
flow.request,
flow.response
))
flow.response.timestamp_end = utils.timestamp()
if flow.response.stream:
flow.response.data.content = CONTENT_MISSING
else:
flow.response.data.content = b"".join(self.read_response_body(
flow.request,
flow.response
))
flow.response.timestamp_end = utils.timestamp()
# no further manipulation of self.server_conn beyond this point
# we can safely set it as the final attribute value here.

View File

@ -1,3 +1,3 @@
-e git+https://github.com/mitmproxy/netlib.git#egg=netlib
-e git+https://github.com/mitmproxy/pathod.git#egg=pathod
-e .[dev,examples,contentviews]
-e .[dev,examples,contentviews]

View File

@ -17,6 +17,7 @@ with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
# This will break `pip install` on systems with old setuptools versions.
deps = {
"netlib>=%s, <%s" % (version.MINORVERSION, version.NEXT_MINORVERSION),
"h2>=2.0.0",
"tornado>=4.3.0, <4.4",
"configargparse>=0.10.0, <0.11",
"pyperclip>=1.5.22, <1.6",