catch cancellation errors when draining writers, fix #5034 (#5040)

This commit is contained in:
Maximilian Hils 2022-01-08 08:06:58 +01:00 committed by GitHub
parent ef8c88da1f
commit 000e26674e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 5 deletions

View File

@ -21,6 +21,7 @@
* Fix bug that crashed when using `view.flows.resolve` (#4916, @rbdixon) * Fix bug that crashed when using `view.flows.resolve` (#4916, @rbdixon)
* Fix a bug where `running()` is invoked twice on startup (#3584, @mhils) * Fix a bug where `running()` is invoked twice on startup (#3584, @mhils)
* Correct documentation example for User-Agent header modification (#4997, @jamesyale) * Correct documentation example for User-Agent header modification (#4997, @jamesyale)
* Fix random connection stalls (#5040, @EndUser509)
## 28 September 2021: mitmproxy 7.0.4 ## 28 September 2021: mitmproxy 7.0.4

View File

@ -223,11 +223,14 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
except asyncio.CancelledError as e: except asyncio.CancelledError as e:
cancelled = e cancelled = e
break break
else:
self.server_event(events.DataReceived(connection, data)) self.server_event(events.DataReceived(connection, data))
for transport in self.transports.values():
if transport.writer is not None: try:
await transport.writer.drain() await self.drain_writers()
except asyncio.CancelledError as e:
cancelled = e
break
if cancelled is None: if cancelled is None:
connection.state &= ~ConnectionState.CAN_READ connection.state &= ~ConnectionState.CAN_READ
@ -253,6 +256,19 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
if cancelled: if cancelled:
raise cancelled raise cancelled
async def drain_writers(self):
"""
Drain all writers to create some backpressure. We won't continue reading until there's space available in our
write buffers, so if we cannot write fast enough our own read buffers run full and the TCP recv stream is throttled.
"""
for transport in self.transports.values():
if transport.writer is not None:
try:
await transport.writer.drain()
except OSError as e:
if transport.handler is not None:
asyncio_utils.cancel_task(transport.handler, f"Error sending data: {e}")
async def on_timeout(self) -> None: async def on_timeout(self) -> None:
self.log(f"Closing connection due to inactivity: {self.client}") self.log(f"Closing connection due to inactivity: {self.client}")
handler = self.transports[self.client].handler handler = self.transports[self.client].handler