From ce7d971485d21787019893f5ef09656eedc78efb Mon Sep 17 00:00:00 2001 From: LmeSzinc <37934724+LmeSzinc@users.noreply.github.com> Date: Tue, 10 Sep 2024 00:23:40 +0800 Subject: [PATCH] Fix: [ALAS] Limit pool size when using run_in_executor() (cherry picked from commit e1bb483aaafa293a14e2faec7c9baadb236e1a01) --- module/device/connection.py | 9 ++++++++- module/device/method/nemu_ipc.py | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/module/device/connection.py b/module/device/connection.py index 59c64e9b4..bd88548e4 100644 --- a/module/device/connection.py +++ b/module/device/connection.py @@ -721,7 +721,12 @@ class Connection(ConnectionAttr): serial_list (list[str]): """ import asyncio + from concurrent.futures import ThreadPoolExecutor ev = asyncio.new_event_loop() + pool = ThreadPoolExecutor( + max_workers=len(serial_list), + thread_name_prefix='adb_brute_force_connect', + ) def _connect(serial): msg = self.adb_client.connect(serial) @@ -729,10 +734,12 @@ class Connection(ConnectionAttr): return msg async def connect(): - tasks = [ev.run_in_executor(None, _connect, serial) for serial in serial_list] + tasks = [ev.run_in_executor(pool, _connect, serial) for serial in serial_list] await asyncio.gather(*tasks) ev.run_until_complete(connect()) + pool.shutdown(wait=False) + ev.close() @Config.when(DEVICE_OVER_HTTP=True) def adb_connect(self): diff --git a/module/device/method/nemu_ipc.py b/module/device/method/nemu_ipc.py index cdb159f48..0fde50d51 100644 --- a/module/device/method/nemu_ipc.py +++ b/module/device/method/nemu_ipc.py @@ -275,11 +275,25 @@ class NemuIpcImpl: def __exit__(self, exc_type, exc_val, exc_tb): self.disconnect() + if has_cached_property(self, '_ev'): + self._ev.close() + del_cached_property(self, '_ev') + if has_cached_property(self, '_pool'): + self._pool.shutdown(wait=False) + del_cached_property(self, '_pool') @cached_property def _ev(self): return asyncio.new_event_loop() + @cached_property + def _pool(self): + from concurrent.futures import ThreadPoolExecutor + return ThreadPoolExecutor( + max_workers=1, + thread_name_prefix='NemuIpc', + ) + async def ev_run_async(self, func, *args, timeout=0.15, **kwargs): """ Args: @@ -294,7 +308,7 @@ class NemuIpcImpl: func_wrapped = partial(func, *args, **kwargs) # Increased timeout for slow PCs # Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs - result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=timeout) + result = await asyncio.wait_for(self._ev.run_in_executor(self._pool, func_wrapped), timeout=timeout) return result def ev_run_sync(self, func, *args, **kwargs):