From 000e26674e39a04c23fcc717d50c9fa475330796 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 8 Jan 2022 08:06:58 +0100 Subject: [PATCH] catch cancellation errors when draining writers, fix #5034 (#5040) --- CHANGELOG.md | 1 + mitmproxy/proxy/server.py | 26 +++++++++++++++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 268574d7b..77af9e6a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * Fix bug that crashed when using `view.flows.resolve` (#4916, @rbdixon) * Fix a bug where `running()` is invoked twice on startup (#3584, @mhils) * Correct documentation example for User-Agent header modification (#4997, @jamesyale) +* Fix random connection stalls (#5040, @EndUser509) ## 28 September 2021: mitmproxy 7.0.4 diff --git a/mitmproxy/proxy/server.py b/mitmproxy/proxy/server.py index 4b26b23c6..7af7f09d6 100644 --- a/mitmproxy/proxy/server.py +++ b/mitmproxy/proxy/server.py @@ -223,11 +223,14 @@ class ConnectionHandler(metaclass=abc.ABCMeta): except asyncio.CancelledError as e: cancelled = e break - else: - self.server_event(events.DataReceived(connection, data)) - for transport in self.transports.values(): - if transport.writer is not None: - await transport.writer.drain() + + self.server_event(events.DataReceived(connection, data)) + + try: + await self.drain_writers() + except asyncio.CancelledError as e: + cancelled = e + break if cancelled is None: connection.state &= ~ConnectionState.CAN_READ @@ -253,6 +256,19 @@ class ConnectionHandler(metaclass=abc.ABCMeta): if cancelled: raise cancelled + async def drain_writers(self): + """ + Drain all writers to create some backpressure. We won't continue reading until there's space available in our + write buffers, so if we cannot write fast enough our own read buffers run full and the TCP recv stream is throttled. + """ + for transport in self.transports.values(): + if transport.writer is not None: + try: + await transport.writer.drain() + except OSError as e: + if transport.handler is not None: + asyncio_utils.cancel_task(transport.handler, f"Error sending data: {e}") + async def on_timeout(self) -> None: self.log(f"Closing connection due to inactivity: {self.client}") handler = self.transports[self.client].handler