http2: properly handle connection errors

This commit is contained in:
Thomas Kriechbaumer 2016-05-13 08:38:00 -05:00
parent e61014d203
commit 43ab9f7bd0
3 changed files with 37 additions and 13 deletions

View File

@ -50,6 +50,10 @@ class HttpProtocolException(ProtocolException):
pass pass
class Http2ProtocolException(ProtocolException):
pass
class ServerException(ProxyException): class ServerException(ProxyException):
pass pass

View File

@ -11,7 +11,7 @@ from netlib.http import Headers
from h2.exceptions import H2Error from h2.exceptions import H2Error
from .. import utils from .. import utils
from ..exceptions import HttpProtocolException, ProtocolException from ..exceptions import HttpProtocolException, Http2ProtocolException, ProtocolException
from ..models import ( from ..models import (
HTTPFlow, HTTPFlow,
HTTPResponse, HTTPResponse,
@ -243,7 +243,7 @@ class HttpLayer(Layer):
try: try:
response = make_error_response(code, message) response = make_error_response(code, message)
self.send_response(response) self.send_response(response)
except (NetlibException, H2Error): except (NetlibException, H2Error, Http2ProtocolException):
self.log(traceback.format_exc(), "debug") self.log(traceback.format_exc(), "debug")
def change_upstream_proxy_server(self, address): def change_upstream_proxy_server(self, address):
@ -288,9 +288,9 @@ class HttpLayer(Layer):
try: try:
get_response() get_response()
except NetlibException as v: except NetlibException as e:
self.log( self.log(
"server communication error: %s" % repr(v), "server communication error: %s" % repr(e),
level="debug" level="debug"
) )
# In any case, we try to reconnect at least once. This is # In any case, we try to reconnect at least once. This is
@ -304,6 +304,11 @@ class HttpLayer(Layer):
# > server detects timeout, disconnects # > server detects timeout, disconnects
# > read (100-n)% of large request # > read (100-n)% of large request
# > send large request upstream # > send large request upstream
if isinstance(e, Http2ProtocolException):
# do not try to reconnect for HTTP2
raise ProtocolException("First and only attempt to get response via HTTP2 failed.")
self.disconnect() self.disconnect()
self.connect() self.connect()
get_response() get_response()

View File

@ -4,6 +4,7 @@ import threading
import time import time
from six.moves import queue from six.moves import queue
import traceback
import h2 import h2
import six import six
from h2.connection import H2Connection from h2.connection import H2Connection
@ -15,6 +16,7 @@ from netlib.utils import http2_read_raw_frame
from .base import Layer from .base import Layer
from .http import _HttpTransmissionLayer, HttpLayer from .http import _HttpTransmissionLayer, HttpLayer
from ..exceptions import ProtocolException, Http2ProtocolException
from .. import utils from .. import utils
from ..models import HTTPRequest, HTTPResponse from ..models import HTTPRequest, HTTPResponse
@ -55,7 +57,7 @@ class SafeH2Connection(H2Connection):
def safe_send_headers(self, is_zombie, stream_id, headers): def safe_send_headers(self, is_zombie, stream_id, headers):
with self.lock: with self.lock:
if is_zombie(): # pragma: no cover if is_zombie(): # pragma: no cover
return raise Http2ProtocolException("Zombie Stream")
self.send_headers(stream_id, headers.fields) self.send_headers(stream_id, headers.fields)
self.conn.send(self.data_to_send()) self.conn.send(self.data_to_send())
@ -66,7 +68,7 @@ class SafeH2Connection(H2Connection):
self.lock.acquire() self.lock.acquire()
if is_zombie(): # pragma: no cover if is_zombie(): # pragma: no cover
self.lock.release() self.lock.release()
return raise Http2ProtocolException("Zombie Stream")
max_outbound_frame_size = self.max_outbound_frame_size max_outbound_frame_size = self.max_outbound_frame_size
frame_chunk = chunk[position:position + max_outbound_frame_size] frame_chunk = chunk[position:position + max_outbound_frame_size]
if self.local_flow_control_window(stream_id) < len(frame_chunk): if self.local_flow_control_window(stream_id) < len(frame_chunk):
@ -79,7 +81,7 @@ class SafeH2Connection(H2Connection):
position += max_outbound_frame_size position += max_outbound_frame_size
with self.lock: with self.lock:
if is_zombie(): # pragma: no cover if is_zombie(): # pragma: no cover
return raise Http2ProtocolException("Zombie Stream")
self.end_stream(stream_id) self.end_stream(stream_id)
self.conn.send(self.data_to_send()) self.conn.send(self.data_to_send())
@ -104,7 +106,7 @@ class Http2Layer(Layer):
self.active_conns.append(self.server_conn.connection) self.active_conns.append(self.server_conn.connection)
def connect(self): # pragma: no cover def connect(self): # pragma: no cover
raise ValueError("CONNECT inside an HTTP2 stream is not supported.") raise Http2ProtocolException("HTTP2 layer should already have a connection.")
def set_server(self): # pragma: no cover def set_server(self): # pragma: no cover
raise NotImplementedError("Cannot change server for HTTP2 connections.") raise NotImplementedError("Cannot change server for HTTP2 connections.")
@ -215,6 +217,7 @@ class Http2Layer(Layer):
raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile)) raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile))
except: except:
# read frame failed: connection closed # read frame failed: connection closed
# kill all streams
for stream in self.streams.values(): for stream in self.streams.values():
stream.zombie = time.time() stream.zombie = time.time()
return return
@ -232,7 +235,7 @@ class Http2Layer(Layer):
class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
def __init__(self, ctx, stream_id, request_headers): def __init__(self, ctx, stream_id, request_headers):
super(Http2SingleStreamLayer, self).__init__(ctx) super(Http2SingleStreamLayer, self).__init__(ctx, name="Thread-Http2SingleStreamLayer-{}".format(stream_id))
self.zombie = None self.zombie = None
self.client_stream_id = stream_id self.client_stream_id = stream_id
self.server_stream_id = None self.server_stream_id = None
@ -335,7 +338,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
with self.server_conn.h2.lock: with self.server_conn.h2.lock:
# We must not assign a stream id if we are already a zombie. # We must not assign a stream id if we are already a zombie.
if self.zombie: # pragma: no cover if self.zombie: # pragma: no cover
return raise Http2ProtocolException("Zombie Stream")
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
@ -350,6 +353,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.server_stream_id, self.server_stream_id,
message.body message.body
) )
if self.zombie: # pragma: no cover
raise Http2ProtocolException("Zombie Stream")
def read_response_headers(self): def read_response_headers(self):
self.response_arrived.wait() self.response_arrived.wait()
@ -377,7 +382,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
yield self.response_data_queue.get() yield self.response_data_queue.get()
return return
if self.zombie: # pragma: no cover if self.zombie: # pragma: no cover
return raise Http2ProtocolException("Zombie Stream")
def send_response_headers(self, response): def send_response_headers(self, response):
self.client_conn.h2.safe_send_headers( self.client_conn.h2.safe_send_headers(
@ -385,6 +390,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.client_stream_id, self.client_stream_id,
response.headers response.headers
) )
if self.zombie: # pragma: no cover
raise Http2ProtocolException("Zombie Stream")
def send_response_body(self, _response, chunks): def send_response_body(self, _response, chunks):
self.client_conn.h2.safe_send_body( self.client_conn.h2.safe_send_body(
@ -392,6 +399,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.client_stream_id, self.client_stream_id,
chunks chunks
) )
if self.zombie: # pragma: no cover
raise Http2ProtocolException("Zombie Stream")
def check_close_connection(self, flow): def check_close_connection(self, flow):
# This layer only handles a single stream. # This layer only handles a single stream.
@ -404,5 +413,11 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
def run(self): def run(self):
layer = HttpLayer(self, self.mode) layer = HttpLayer(self, self.mode)
layer()
try:
layer()
except ProtocolException as e:
self.log(repr(e), "info")
self.log(traceback.format_exc(), "debug")
self.zombie = time.time() self.zombie = time.time()