Fix: [ALAS] Limit pool size when using run_in_executor()

(cherry picked from commit e1bb483aaafa293a14e2faec7c9baadb236e1a01)
This commit is contained in:
LmeSzinc 2024-09-10 00:23:40 +08:00
parent 5a9a1beaa9
commit ce7d971485
2 changed files with 23 additions and 2 deletions

View File

@ -721,7 +721,12 @@ class Connection(ConnectionAttr):
serial_list (list[str]):
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
ev = asyncio.new_event_loop()
pool = ThreadPoolExecutor(
max_workers=len(serial_list),
thread_name_prefix='adb_brute_force_connect',
)
def _connect(serial):
msg = self.adb_client.connect(serial)
@ -729,10 +734,12 @@ class Connection(ConnectionAttr):
return msg
async def connect():
tasks = [ev.run_in_executor(None, _connect, serial) for serial in serial_list]
tasks = [ev.run_in_executor(pool, _connect, serial) for serial in serial_list]
await asyncio.gather(*tasks)
ev.run_until_complete(connect())
pool.shutdown(wait=False)
ev.close()
@Config.when(DEVICE_OVER_HTTP=True)
def adb_connect(self):

View File

@ -275,11 +275,25 @@ class NemuIpcImpl:
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
if has_cached_property(self, '_ev'):
self._ev.close()
del_cached_property(self, '_ev')
if has_cached_property(self, '_pool'):
self._pool.shutdown(wait=False)
del_cached_property(self, '_pool')
@cached_property
def _ev(self):
return asyncio.new_event_loop()
@cached_property
def _pool(self):
from concurrent.futures import ThreadPoolExecutor
return ThreadPoolExecutor(
max_workers=1,
thread_name_prefix='NemuIpc',
)
async def ev_run_async(self, func, *args, timeout=0.15, **kwargs):
"""
Args:
@ -294,7 +308,7 @@ class NemuIpcImpl:
func_wrapped = partial(func, *args, **kwargs)
# Increased timeout for slow PCs
# Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs
result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=timeout)
result = await asyncio.wait_for(self._ev.run_in_executor(self._pool, func_wrapped), timeout=timeout)
return result
def ev_run_sync(self, func, *args, **kwargs):