This should fix the deadlock happening on callbacks using the same pool

Related to #663
This commit is contained in:
Dan 2021-04-07 13:57:59 +02:00
parent 0b0bec9e27
commit 30d3658e19
6 changed files with 16 additions and 12 deletions

View File

@ -229,7 +229,9 @@ class Client(Methods, Scaffold):
self.sleep_threshold = sleep_threshold self.sleep_threshold = sleep_threshold
self.hide_password = hide_password self.hide_password = hide_password
self.executor = ThreadPoolExecutor(self.workers, thread_name_prefix="Handler") self.handler_executor = ThreadPoolExecutor(self.workers, thread_name_prefix="Handler")
self.filter_executor = ThreadPoolExecutor(2, thread_name_prefix="Filter")
self.progress_executor = ThreadPoolExecutor(2, thread_name_prefix="Progress")
if isinstance(session_name, str): if isinstance(session_name, str):
if session_name == ":memory:" or len(session_name) >= MemoryStorage.SESSION_STRING_SIZE: if session_name == ":memory:" or len(session_name) >= MemoryStorage.SESSION_STRING_SIZE:
@ -934,7 +936,7 @@ class Client(Methods, Scaffold):
if inspect.iscoroutinefunction(progress): if inspect.iscoroutinefunction(progress):
await func() await func()
else: else:
await self.loop.run_in_executor(self.executor, func) await self.loop.run_in_executor(self.progress_executor, func)
r = await session.send( r = await session.send(
raw.functions.upload.GetFile( raw.functions.upload.GetFile(
@ -1024,7 +1026,7 @@ class Client(Methods, Scaffold):
if inspect.iscoroutinefunction(progress): if inspect.iscoroutinefunction(progress):
await func() await func()
else: else:
await self.loop.run_in_executor(self.executor, func) await self.loop.run_in_executor(self.progress_executor, func)
if len(chunk) < limit: if len(chunk) < limit:
break break

View File

@ -217,7 +217,7 @@ class Dispatcher:
await handler.callback(self.client, *args) await handler.callback(self.client, *args)
else: else:
await self.loop.run_in_executor( await self.loop.run_in_executor(
self.client.executor, self.client.handler_executor,
handler.callback, handler.callback,
self.client, self.client,
*args *args

View File

@ -47,7 +47,7 @@ class InvertFilter(Filter):
x = await self.base(client, update) x = await self.base(client, update)
else: else:
x = await client.loop.run_in_executor( x = await client.loop.run_in_executor(
client.executor, client.filter_executor,
self.base, self.base,
client, update client, update
) )
@ -65,7 +65,7 @@ class AndFilter(Filter):
x = await self.base(client, update) x = await self.base(client, update)
else: else:
x = await client.loop.run_in_executor( x = await client.loop.run_in_executor(
client.executor, client.filter_executor,
self.base, self.base,
client, update client, update
) )
@ -78,7 +78,7 @@ class AndFilter(Filter):
y = await self.other(client, update) y = await self.other(client, update)
else: else:
y = await client.loop.run_in_executor( y = await client.loop.run_in_executor(
client.executor, client.filter_executor,
self.other, self.other,
client, update client, update
) )
@ -96,7 +96,7 @@ class OrFilter(Filter):
x = await self.base(client, update) x = await self.base(client, update)
else: else:
x = await client.loop.run_in_executor( x = await client.loop.run_in_executor(
client.executor, client.filter_executor,
self.base, self.base,
client, update client, update
) )
@ -109,7 +109,7 @@ class OrFilter(Filter):
y = await self.other(client, update) y = await self.other(client, update)
else: else:
y = await client.loop.run_in_executor( y = await client.loop.run_in_executor(
client.executor, client.filter_executor,
self.other, self.other,
client, update client, update
) )

View File

@ -35,7 +35,7 @@ class Handler:
return await self.filters(client, update) return await self.filters(client, update)
else: else:
return await client.loop.run_in_executor( return await client.loop.run_in_executor(
client.executor, client.handler_executor,
self.filters, self.filters,
client, update client, update
) )

View File

@ -193,7 +193,7 @@ class SaveFile(Scaffold):
if inspect.iscoroutinefunction(progress): if inspect.iscoroutinefunction(progress):
await func() await func()
else: else:
await self.loop.run_in_executor(self.executor, func) await self.loop.run_in_executor(self.progress_executor, func)
except StopTransmission: except StopTransmission:
raise raise
except Exception as e: except Exception as e:

View File

@ -83,7 +83,9 @@ class Scaffold:
self.takeout = None self.takeout = None
self.sleep_threshold = None self.sleep_threshold = None
self.executor = None self.handler_executor = None
self.filter_executor = None
self.progress_executor = None
self.storage = None self.storage = None