mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 23:09:44 +00:00
cleanup code
This commit is contained in:
parent
2cd71091ad
commit
24641d8561
@ -239,6 +239,57 @@ class Http2Layer(Layer):
|
||||
# CONNECT for proxying?
|
||||
raise NotImplementedError()
|
||||
|
||||
def _handle_event(self, event, source_conn, other_conn, is_server):
|
||||
is_server = (conn == self.server_conn.connection)
|
||||
if hasattr(event, 'stream_id'):
|
||||
if is_server:
|
||||
eid = self.server_to_client_stream_ids[event.stream_id]
|
||||
else:
|
||||
eid = event.stream_id
|
||||
|
||||
if isinstance(event, RequestReceived):
|
||||
headers = Headers([[str(k), str(v)] for k, v in event.headers])
|
||||
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].start()
|
||||
elif isinstance(event, ResponseReceived):
|
||||
headers = Headers([[str(k), str(v)] for k, v in event.headers])
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].response_headers = headers
|
||||
self.streams[eid].response_arrived.set()
|
||||
elif isinstance(event, DataReceived):
|
||||
self.streams[eid].data_queue.put(event.data)
|
||||
source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
|
||||
elif isinstance(event, StreamEnded):
|
||||
self.streams[eid].timestamp_end = time.time()
|
||||
self.streams[eid].data_finished.set()
|
||||
elif isinstance(event, StreamReset):
|
||||
self.streams[eid].zombie = time.time()
|
||||
if eid in self.streams and event.error_code == 0x8:
|
||||
if is_server:
|
||||
other_stream_id = self.streams[eid].client_stream_id
|
||||
else:
|
||||
other_stream_id = self.streams[eid].server_stream_id
|
||||
other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
|
||||
elif isinstance(event, RemoteSettingsChanged):
|
||||
source_conn.h2.safe_acknowledge_settings(event)
|
||||
new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
|
||||
other_conn.h2.safe_update_settings(new_settings)
|
||||
elif isinstance(event, ConnectionTerminated):
|
||||
other_conn.h2.safe_close_connection(event.error_code)
|
||||
return
|
||||
elif isinstance(event, TrailersReceived):
|
||||
raise NotImplementedError()
|
||||
elif isinstance(event, PushedStreamReceived):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _cleanup_streams(self):
|
||||
death_time = time.time() - 10
|
||||
for stream_id in self.streams.keys():
|
||||
zombie = self.streams[stream_id].zombie
|
||||
if zombie and zombie <= death_time:
|
||||
self.streams.pop(stream_id, None)
|
||||
|
||||
def __call__(self):
|
||||
if self.server_conn:
|
||||
self._initiate_server_conn()
|
||||
@ -254,10 +305,9 @@ class Http2Layer(Layer):
|
||||
for conn in r:
|
||||
source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
|
||||
other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
|
||||
is_server = (conn == self.server_conn.connection)
|
||||
|
||||
fields = struct.unpack("!HB", source_conn.rfile.peek(3))
|
||||
length = (fields[0] << 8) + fields[1]
|
||||
field = source_conn.rfile.peek(3)
|
||||
length = (field[0] << 16) + (field[1] << 8) + field[2]
|
||||
raw_frame = source_conn.rfile.safe_read(9 + length)
|
||||
|
||||
with source_conn.h2.lock:
|
||||
@ -265,53 +315,9 @@ class Http2Layer(Layer):
|
||||
source_conn.send(source_conn.h2.data_to_send())
|
||||
|
||||
for event in events:
|
||||
if hasattr(event, 'stream_id'):
|
||||
if is_server:
|
||||
eid = self.server_to_client_stream_ids[event.stream_id]
|
||||
else:
|
||||
eid = event.stream_id
|
||||
self.handle_event(event, source_conn, other_conn)
|
||||
|
||||
if isinstance(event, RequestReceived):
|
||||
headers = Headers([[str(k), str(v)] for k, v in event.headers])
|
||||
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].start()
|
||||
elif isinstance(event, ResponseReceived):
|
||||
headers = Headers([[str(k), str(v)] for k, v in event.headers])
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].response_headers = headers
|
||||
self.streams[eid].response_arrived.set()
|
||||
elif isinstance(event, DataReceived):
|
||||
self.streams[eid].data_queue.put(event.data)
|
||||
source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
|
||||
elif isinstance(event, StreamEnded):
|
||||
self.streams[eid].timestamp_end = time.time()
|
||||
self.streams[eid].data_finished.set()
|
||||
elif isinstance(event, StreamReset):
|
||||
self.streams[eid].zombie = time.time()
|
||||
if eid in self.streams and event.error_code == 0x8:
|
||||
if is_server:
|
||||
other_stream_id = self.streams[eid].client_stream_id
|
||||
else:
|
||||
other_stream_id = self.streams[eid].server_stream_id
|
||||
other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
|
||||
elif isinstance(event, RemoteSettingsChanged):
|
||||
source_conn.h2.safe_acknowledge_settings(event)
|
||||
new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
|
||||
other_conn.h2.safe_update_settings(new_settings)
|
||||
elif isinstance(event, ConnectionTerminated):
|
||||
other_conn.h2.safe_close_connection(event.error_code)
|
||||
return
|
||||
elif isinstance(event, TrailersReceived):
|
||||
raise NotImplementedError()
|
||||
elif isinstance(event, PushedStreamReceived):
|
||||
raise NotImplementedError()
|
||||
|
||||
death_time = time.time() - 10
|
||||
for stream_id in self.streams.keys():
|
||||
zombie = self.streams[stream_id].zombie
|
||||
if zombie and zombie <= death_time:
|
||||
self.streams.pop(stream_id, None)
|
||||
self.cleanup_streams()
|
||||
|
||||
|
||||
class Http2SingleStreamLayer(_HttpLayer, threading.Thread):
|
||||
|
Loading…
Reference in New Issue
Block a user