Merge pull request #1086 from Kriechi/h2-improv

cleanup h2
This commit is contained in:
Thomas Kriechbaumer 2016-05-10 11:09:11 -05:00
commit 595a01de4e

View File

@ -26,11 +26,6 @@ class SafeH2Connection(H2Connection):
self.conn = conn
self.lock = threading.RLock()
def safe_close_connection(self, error_code):
with self.lock:
self.close_connection(error_code)
self.conn.send(self.data_to_send())
def safe_increment_flow_control(self, stream_id, length):
if length == 0:
return
@ -47,7 +42,7 @@ class SafeH2Connection(H2Connection):
with self.lock:
try:
self.reset_stream(stream_id, error_code)
except h2.exceptions.StreamClosedError:
except h2.exceptions.StreamClosedError: # pragma: no cover
# stream is already closed - good
pass
self.conn.send(self.data_to_send())
@ -59,7 +54,7 @@ class SafeH2Connection(H2Connection):
def safe_send_headers(self, is_zombie, stream_id, headers):
with self.lock:
if is_zombie():
if is_zombie(): # pragma: no cover
return
self.send_headers(stream_id, headers)
self.conn.send(self.data_to_send())
@ -69,7 +64,7 @@ class SafeH2Connection(H2Connection):
position = 0
while position < len(chunk):
self.lock.acquire()
if is_zombie():
if is_zombie(): # pragma: no cover
self.lock.release()
return
max_outbound_frame_size = self.max_outbound_frame_size
@ -83,7 +78,7 @@ class SafeH2Connection(H2Connection):
self.lock.release()
position += max_outbound_frame_size
with self.lock:
if is_zombie():
if is_zombie(): # pragma: no cover
return
self.end_stream(stream_id)
self.conn.send(self.data_to_send())
@ -95,8 +90,6 @@ class Http2Layer(Layer):
super(Http2Layer, self).__init__(ctx)
self.mode = mode
self.streams = dict()
self.client_reset_streams = []
self.server_reset_streams = []
self.server_to_client_stream_ids = dict([(0, 0)])
self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
@ -112,9 +105,6 @@ class Http2Layer(Layer):
def connect(self): # pragma: no cover
raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
# self.ctx.connect()
# self.server_conn.connect()
# self._initiate_server_conn()
def set_server(self): # pragma: no cover
raise NotImplementedError("Cannot change server for HTTP2 connections.")
@ -162,9 +152,6 @@ class Http2Layer(Layer):
self.streams[eid].data_finished.set()
elif isinstance(event, h2.events.StreamReset):
self.streams[eid].zombie = time.time()
self.client_reset_streams.append(self.streams[eid].client_stream_id)
if self.streams[eid].server_stream_id:
self.server_reset_streams.append(self.streams[eid].server_stream_id)
if eid in self.streams and event.error_code == 0x8:
if is_server:
other_stream_id = self.streams[eid].client_stream_id
@ -227,6 +214,7 @@ class Http2Layer(Layer):
try:
raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile))
except:
# read frame failed: connection closed
for stream in self.streams.values():
stream.zombie = time.time()
return
@ -346,7 +334,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
with self.server_conn.h2.lock:
# We must not assign a stream id if we are already a zombie.
if self.zombie:
if self.zombie: # pragma: no cover
return
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
@ -388,7 +376,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
while self.response_data_queue.qsize() > 0:
yield self.response_data_queue.get()
return
if self.zombie:
if self.zombie: # pragma: no cover
return
def send_response_headers(self, response):
@ -410,9 +398,6 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
# RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
return True
def connect(self): # pragma: no cover
raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
def set_server(self, *args, **kwargs): # pragma: no cover
# do not mess with the server connection - all streams share it.
pass