http2: improve kill and cleanup threads

This commit is contained in:
Thomas Kriechbaumer 2016-09-03 13:25:22 +02:00
parent 69b770469e
commit e273a29a8c
2 changed files with 16 additions and 11 deletions

View File

@ -175,7 +175,7 @@ class Http2Layer(base.Layer):
def _handle_data_received(self, eid, event, source_conn): def _handle_data_received(self, eid, event, source_conn):
bsl = self.config.options.body_size_limit bsl = self.config.options.body_size_limit
if bsl and self.streams[eid].queued_data_length > bsl: if bsl and self.streams[eid].queued_data_length > bsl:
self.streams[eid].zombie = time.time() self.streams[eid].kill()
source_conn.h2.safe_reset_stream( source_conn.h2.safe_reset_stream(
event.stream_id, event.stream_id,
h2.errors.REFUSED_STREAM h2.errors.REFUSED_STREAM
@ -196,7 +196,7 @@ class Http2Layer(base.Layer):
return True return True
def _handle_stream_reset(self, eid, event, is_server, other_conn): def _handle_stream_reset(self, eid, event, is_server, other_conn):
self.streams[eid].zombie = time.time() self.streams[eid].kill()
if eid in self.streams and event.error_code == h2.errors.CANCEL: if eid in self.streams and event.error_code == h2.errors.CANCEL:
if is_server: if is_server:
other_stream_id = self.streams[eid].client_stream_id other_stream_id = self.streams[eid].client_stream_id
@ -302,11 +302,7 @@ class Http2Layer(base.Layer):
def _kill_all_streams(self): def _kill_all_streams(self):
for stream in self.streams.values(): for stream in self.streams.values():
if not stream.zombie: stream.kill()
stream.zombie = time.time()
stream.request_data_finished.set()
stream.response_arrived.set()
stream.data_finished.set()
def __call__(self): def __call__(self):
self._initiate_server_conn() self._initiate_server_conn()
@ -377,6 +373,9 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
self.response_headers = None self.response_headers = None
self.pushed = False self.pushed = False
self.timestamp_start = None
self.timestamp_end = None
self.request_data_queue = queue.Queue() self.request_data_queue = queue.Queue()
self.request_queued_data_length = 0 self.request_queued_data_length = 0
self.request_data_finished = threading.Event() self.request_data_finished = threading.Event()
@ -393,6 +392,13 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
self.priority_weight = None self.priority_weight = None
self.handled_priority_event = None self.handled_priority_event = None
def kill(self):
if not self.zombie:
self.zombie = time.time()
self.request_data_finished.set()
self.response_arrived.set()
self.response_data_finished.set()
def connect(self): # pragma: no cover def connect(self): # pragma: no cover
raise exceptions.Http2ProtocolException("HTTP2 layer should already have a connection.") raise exceptions.Http2ProtocolException("HTTP2 layer should already have a connection.")
@ -604,5 +610,4 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
except exceptions.Kill: except exceptions.Kill:
self.log("Connection killed", "info") self.log("Connection killed", "info")
if not self.zombie: self.kill()
self.zombie = time.time()

View File

@ -37,7 +37,7 @@ def sysinfo():
return "\n".join(data) return "\n".join(data)
def dump_info(sig, frm, file=sys.stdout): # pragma: no cover def dump_info(signal=None, frame=None, file=sys.stdout): # pragma: no cover
print("****************************************************", file=file) print("****************************************************", file=file)
print("Summary", file=file) print("Summary", file=file)
print("=======", file=file) print("=======", file=file)
@ -81,7 +81,7 @@ def dump_info(sig, frm, file=sys.stdout): # pragma: no cover
print("****************************************************", file=file) print("****************************************************", file=file)
def dump_stacks(signal, frame, file=sys.stdout): def dump_stacks(signal=None, frame=None, file=sys.stdout):
id2name = dict([(th.ident, th.name) for th in threading.enumerate()]) id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
code = [] code = []
for threadId, stack in sys._current_frames().items(): for threadId, stack in sys._current_frames().items():