Merge pull request #1245 from Kriechi/http2-priority-information

http2: handle priority information better
This commit is contained in:
Thomas Kriechbaumer 2016-07-05 20:56:01 +02:00 committed by GitHub
commit 1adcd6ad6b
2 changed files with 49 additions and 18 deletions

View File

@ -5,7 +5,6 @@ import time
import traceback import traceback
import h2.exceptions import h2.exceptions
import hyperframe
import six import six
from h2 import connection from h2 import connection
from h2 import events from h2 import events
@ -55,11 +54,11 @@ class SafeH2Connection(connection.H2Connection):
self.update_settings(new_settings) self.update_settings(new_settings)
self.conn.send(self.data_to_send()) self.conn.send(self.data_to_send())
def safe_send_headers(self, is_zombie, stream_id, headers): def safe_send_headers(self, is_zombie, stream_id, headers, **kwargs):
# make sure to have a lock with self.lock:
if is_zombie(): # pragma: no cover if is_zombie(): # pragma: no cover
raise exceptions.Http2ProtocolException("Zombie Stream") raise exceptions.Http2ProtocolException("Zombie Stream")
self.send_headers(stream_id, headers.fields) self.send_headers(stream_id, headers.fields, **kwargs)
self.conn.send(self.data_to_send()) self.conn.send(self.data_to_send())
def safe_send_body(self, is_zombie, stream_id, chunks): def safe_send_body(self, is_zombie, stream_id, chunks):
@ -141,6 +140,12 @@ class Http2Layer(base.Layer):
headers = netlib.http.Headers([[k, v] for k, v in event.headers]) headers = netlib.http.Headers([[k, v] for k, v in event.headers])
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
self.streams[eid].timestamp_start = time.time() self.streams[eid].timestamp_start = time.time()
self.streams[eid].no_body = (event.stream_ended is not None)
if event.priority_updated is not None:
self.streams[eid].priority_weight = event.priority_updated.weight
self.streams[eid].priority_depends_on = event.priority_updated.depends_on
self.streams[eid].priority_exclusive = event.priority_updated.exclusive
self.streams[eid].handled_priority_event = event.priority_updated
self.streams[eid].start() self.streams[eid].start()
elif isinstance(event, events.ResponseReceived): elif isinstance(event, events.ResponseReceived):
headers = netlib.http.Headers([[k, v] for k, v in event.headers]) headers = netlib.http.Headers([[k, v] for k, v in event.headers])
@ -184,7 +189,6 @@ class Http2Layer(base.Layer):
self.client_conn.send(self.client_conn.h2.data_to_send()) self.client_conn.send(self.client_conn.h2.data_to_send())
self._kill_all_streams() self._kill_all_streams()
return False return False
elif isinstance(event, events.PushedStreamReceived): elif isinstance(event, events.PushedStreamReceived):
# pushed stream ids should be unique and not dependent on race conditions # pushed stream ids should be unique and not dependent on race conditions
# only the parent stream id must be looked up first # only the parent stream id must be looked up first
@ -202,6 +206,11 @@ class Http2Layer(base.Layer):
self.streams[event.pushed_stream_id].request_data_finished.set() self.streams[event.pushed_stream_id].request_data_finished.set()
self.streams[event.pushed_stream_id].start() self.streams[event.pushed_stream_id].start()
elif isinstance(event, events.PriorityUpdated): elif isinstance(event, events.PriorityUpdated):
if self.streams[eid].handled_priority_event is event:
# This event was already handled during stream creation
# HeadersFrame + Priority information as RequestReceived
return True
stream_id = event.stream_id stream_id = event.stream_id
if stream_id in self.streams.keys() and self.streams[stream_id].server_stream_id: if stream_id in self.streams.keys() and self.streams[stream_id].server_stream_id:
stream_id = self.streams[stream_id].server_stream_id stream_id = self.streams[stream_id].server_stream_id
@ -210,9 +219,18 @@ class Http2Layer(base.Layer):
if depends_on in self.streams.keys() and self.streams[depends_on].server_stream_id: if depends_on in self.streams.keys() and self.streams[depends_on].server_stream_id:
depends_on = self.streams[depends_on].server_stream_id depends_on = self.streams[depends_on].server_stream_id
# weight is between 1 and 256 (inclusive), but represented as uint8 (0 to 255) self.streams[eid].priority_weight = event.weight
frame = hyperframe.frame.PriorityFrame(stream_id, depends_on, event.weight - 1, event.exclusive) self.streams[eid].priority_depends_on = event.depends_on
self.server_conn.send(frame.serialize()) self.streams[eid].priority_exclusive = event.exclusive
with self.server_conn.h2.lock:
self.server_conn.h2.prioritize(
stream_id,
weight=event.weight,
depends_on=depends_on,
exclusive=event.exclusive
)
self.server_conn.send(self.server_conn.h2.data_to_send())
elif isinstance(event, events.TrailersReceived): elif isinstance(event, events.TrailersReceived):
raise NotImplementedError() raise NotImplementedError()
@ -296,6 +314,13 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
self.response_queued_data_length = 0 self.response_queued_data_length = 0
self.response_data_finished = threading.Event() self.response_data_finished = threading.Event()
self.no_body = False
self.priority_weight = None
self.priority_depends_on = None
self.priority_exclusive = None
self.handled_priority_event = None
@property @property
def data_queue(self): def data_queue(self):
if self.response_arrived.is_set(): if self.response_arrived.is_set():
@ -394,17 +419,23 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
self.is_zombie, self.is_zombie,
self.server_stream_id, self.server_stream_id,
headers, headers,
end_stream=self.no_body,
priority_weight=self.priority_weight,
priority_depends_on=self.priority_depends_on,
priority_exclusive=self.priority_exclusive,
) )
except Exception as e: except Exception as e:
raise e raise e
finally: finally:
self.server_conn.h2.lock.release() self.server_conn.h2.lock.release()
if not self.no_body:
self.server_conn.h2.safe_send_body( self.server_conn.h2.safe_send_body(
self.is_zombie, self.is_zombie,
self.server_stream_id, self.server_stream_id,
message.body message.body
) )
if self.zombie: # pragma: no cover if self.zombie: # pragma: no cover
raise exceptions.Http2ProtocolException("Zombie Stream") raise exceptions.Http2ProtocolException("Zombie Stream")

View File

@ -66,7 +66,7 @@ setup(
"construct>=2.5.2, <2.6", "construct>=2.5.2, <2.6",
"cryptography>=1.3, <1.5", "cryptography>=1.3, <1.5",
"Flask>=0.10.1, <0.12", "Flask>=0.10.1, <0.12",
"h2>=2.3.1, <3", "h2>=2.4.0, <3",
"html2text>=2016.1.8, <=2016.5.29", "html2text>=2016.1.8, <=2016.5.29",
"hyperframe>=4.0.1, <5", "hyperframe>=4.0.1, <5",
"lxml>=3.5.0, <3.7", "lxml>=3.5.0, <3.7",