http2: respect MAX_CONCURRENT_STREAMS by hold-off

This commit is contained in:
Thomas Kriechbaumer 2016-05-21 20:25:02 +02:00
parent e880f532ad
commit 7f4ac6f27b

View File

@@ -73,7 +73,7 @@ class SafeH2Connection(connection.H2Connection):
            frame_chunk = chunk[position:position + max_outbound_frame_size]
            if self.local_flow_control_window(stream_id) < len(frame_chunk):
                self.lock.release()
-               time.sleep(0)
+               time.sleep(0.1)
                continue
            self.send_data(stream_id, frame_chunk)
            self.conn.send(self.data_to_send())
@@ -352,8 +352,22 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
        raise NotImplementedError()

    def send_request(self, message):
+       if not hasattr(self.server_conn, 'h2'):
+           raise exceptions.Http2ProtocolException("Zombie Stream")
+
+       while True:
+           self.server_conn.h2.lock.acquire()
+           max_streams = self.server_conn.h2.remote_settings.max_concurrent_streams
+           if self.server_conn.h2.open_outbound_streams + 1 >= max_streams:
+               # wait until we get a free slot for a new outgoing stream
+               self.server_conn.h2.lock.release()
+               time.sleep(0.1)
+           else:
+               break
+
        if self.pushed:
            # nothing to do here
+           self.server_conn.h2.lock.release()
            return
-       with self.server_conn.h2.lock:
@@ -364,16 +378,19 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
        self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
        self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id

        headers = message.headers.copy()
        headers.insert(0, ":path", message.path)
        headers.insert(0, ":method", message.method)
        headers.insert(0, ":scheme", message.scheme)
+       self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
+       self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
+
+       self.server_conn.h2.safe_send_headers(
+           self.is_zombie,
+           self.server_stream_id,
+           headers,
+       )
+       self.server_conn.h2.lock.release()
-       self.server_conn.h2.safe_send_headers(
-           self.is_zombie,
-           self.server_stream_id,
-           headers
-       )
        self.server_conn.h2.safe_send_body(
            self.is_zombie,
            self.server_stream_id,
@@ -408,7 +425,7 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
        if self.response_data_finished.is_set():
            while self.response_data_queue.qsize() > 0:
                yield self.response_data_queue.get()
-           return
+           break
        if self.zombie:  # pragma: no cover
            raise exceptions.Http2ProtocolException("Zombie Stream")