Refactor: [ALAS] Minitouch multi-finger support

This commit is contained in:
LmeSzinc 2023-08-21 01:25:21 +08:00
parent bc9642772f
commit 0be028427b
2 changed files with 55 additions and 38 deletions

View File

@ -77,6 +77,19 @@ def retry(func):
return retry_wrapper return retry_wrapper
class MaatouchBuilder(CommandBuilder):
def __init__(self, device, contact=0, handle_orientation=False):
"""
Args:
device (MaaTouch):
"""
super().__init__(device, contact, handle_orientation)
def send(self):
return self.device.maatouch_send(builder=self)
class MaaTouchNotInstalledError(Exception): class MaaTouchNotInstalledError(Exception):
pass pass
@ -94,7 +107,7 @@ class MaaTouch(Connection):
@cached_property @cached_property
def maatouch_builder(self): def maatouch_builder(self):
self.maatouch_init() self.maatouch_init()
return CommandBuilder(self, handle_orientation=False) return MaatouchBuilder(self)
def maatouch_init(self): def maatouch_init(self):
logger.hr('MaaTouch init') logger.hr('MaaTouch init')
@ -165,14 +178,14 @@ class MaaTouch(Connection):
) )
) )
def maatouch_send(self): def maatouch_send(self, builder: MaatouchBuilder):
content = self.maatouch_builder.to_minitouch() content = builder.to_minitouch()
# logger.info("send operation: {}".format(content.replace("\n", "\\n"))) # logger.info("send operation: {}".format(content.replace("\n", "\\n")))
byte_content = content.encode('utf-8') byte_content = content.encode('utf-8')
self._maatouch_stream.sendall(byte_content) self._maatouch_stream.sendall(byte_content)
self._maatouch_stream.recv(0) self._maatouch_stream.recv(0)
self.sleep(self.maatouch_builder.delay / 1000 + self.maatouch_builder.DEFAULT_DELAY) self.sleep(self.maatouch_builder.delay / 1000 + builder.DEFAULT_DELAY)
self.maatouch_builder.clear() builder.clear()
def maatouch_install(self): def maatouch_install(self):
logger.hr('MaaTouch install') logger.hr('MaaTouch install')
@ -187,7 +200,7 @@ class MaaTouch(Connection):
builder = self.maatouch_builder builder = self.maatouch_builder
builder.down(x, y).commit() builder.down(x, y).commit()
builder.up().commit() builder.up().commit()
self.maatouch_send() builder.send()
@retry @retry
def long_click_maatouch(self, x, y, duration=1.0): def long_click_maatouch(self, x, y, duration=1.0):
@ -195,7 +208,7 @@ class MaaTouch(Connection):
builder = self.maatouch_builder builder = self.maatouch_builder
builder.down(x, y).commit().wait(duration) builder.down(x, y).commit().wait(duration)
builder.up().commit() builder.up().commit()
self.maatouch_send() builder.send()
@retry @retry
def swipe_maatouch(self, p1, p2): def swipe_maatouch(self, p1, p2):
@ -203,14 +216,14 @@ class MaaTouch(Connection):
builder = self.maatouch_builder builder = self.maatouch_builder
builder.down(*points[0]).commit() builder.down(*points[0]).commit()
self.maatouch_send() builder.send()
for point in points[1:]: for point in points[1:]:
builder.move(*point).commit().wait(10) builder.move(*point).commit().wait(10)
self.maatouch_send() builder.send()
builder.up().commit() builder.up().commit()
self.maatouch_send() builder.send()
@retry @retry
def drag_maatouch(self, p1, p2, point_random=(-10, -10, 10, 10)): def drag_maatouch(self, p1, p2, point_random=(-10, -10, 10, 10)):
@ -220,15 +233,15 @@ class MaaTouch(Connection):
builder = self.maatouch_builder builder = self.maatouch_builder
builder.down(*points[0]).commit() builder.down(*points[0]).commit()
self.maatouch_send() builder.send()
for point in points[1:]: for point in points[1:]:
builder.move(*point).commit().wait(10) builder.move(*point).commit().wait(10)
self.maatouch_send() builder.send()
builder.move(*p2).commit().wait(140) builder.move(*p2).commit().wait(140)
builder.move(*p2).commit().wait(140) builder.move(*p2).commit().wait(140)
self.maatouch_send() builder.send()
builder.up().commit() builder.up().commit()
self.maatouch_send() builder.send()

View File

@ -184,7 +184,7 @@ class CommandBuilder:
max_x = 1280 max_x = 1280
max_y = 720 max_y = 720
def __init__(self, device, handle_orientation=True): def __init__(self, device, contact=0, handle_orientation=True):
""" """
Args: Args:
device: device:
@ -192,6 +192,7 @@ class CommandBuilder:
self.device = device self.device = device
self.commands = [] self.commands = []
self.delay = 0 self.delay = 0
self.contact = contact
self.handle_orientation = handle_orientation self.handle_orientation = handle_orientation
@property @property
@ -243,21 +244,21 @@ class CommandBuilder:
self.delay += ms self.delay += ms
return self return self
def up(self, contact=0): def up(self):
""" add minitouch command: 'u <contact>\n' """ """ add minitouch command: 'u <contact>\n' """
self.commands.append(Command('u', contact=contact)) self.commands.append(Command('u', contact=self.contact))
return self return self
def down(self, x, y, contact=0, pressure=100): def down(self, x, y, pressure=100):
""" add minitouch command: 'd <contact> <x> <y> <pressure>\n' """ """ add minitouch command: 'd <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y) x, y = self.convert(x, y)
self.commands.append(Command('d', x=x, y=y, contact=contact, pressure=pressure)) self.commands.append(Command('d', x=x, y=y, contact=self.contact, pressure=pressure))
return self return self
def move(self, x, y, contact=0, pressure=100): def move(self, x, y, pressure=100):
""" add minitouch command: 'm <contact> <x> <y> <pressure>\n' """ """ add minitouch command: 'm <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y) x, y = self.convert(x, y)
self.commands.append(Command('m', x=x, y=y, contact=contact, pressure=pressure)) self.commands.append(Command('m', x=x, y=y, contact=self.contact, pressure=pressure))
return self return self
def clear(self): def clear(self):
@ -271,6 +272,9 @@ class CommandBuilder:
def to_atx_agent(self) -> List[str]: def to_atx_agent(self) -> List[str]:
return [command.to_atx_agent(self.max_x, self.max_y) for command in self.commands] return [command.to_atx_agent(self.max_x, self.max_y) for command in self.commands]
def send(self):
return self.device.minitouch_send(builder=self)
class MinitouchNotInstalledError(Exception): class MinitouchNotInstalledError(Exception):
pass pass
@ -446,14 +450,14 @@ class Minitouch(Connection):
) )
@Config.when(DEVICE_OVER_HTTP=False) @Config.when(DEVICE_OVER_HTTP=False)
def minitouch_send(self): def minitouch_send(self, builder: CommandBuilder):
content = self.minitouch_builder.to_minitouch() content = builder.to_minitouch()
# logger.info("send operation: {}".format(content.replace("\n", "\\n"))) # logger.info("send operation: {}".format(content.replace("\n", "\\n")))
byte_content = content.encode('utf-8') byte_content = content.encode('utf-8')
self._minitouch_client.sendall(byte_content) self._minitouch_client.sendall(byte_content)
self._minitouch_client.recv(0) self._minitouch_client.recv(0)
time.sleep(self.minitouch_builder.delay / 1000 + self.minitouch_builder.DEFAULT_DELAY) time.sleep(self.minitouch_builder.delay / 1000 + builder.DEFAULT_DELAY)
self.minitouch_builder.clear() builder.clear()
@cached_property @cached_property
def _minitouch_loop(self): def _minitouch_loop(self):
@ -514,8 +518,8 @@ class Minitouch(Connection):
self._minitouch_ws = self._minitouch_loop_run(connect()) self._minitouch_ws = self._minitouch_loop_run(connect())
@Config.when(DEVICE_OVER_HTTP=True) @Config.when(DEVICE_OVER_HTTP=True)
def minitouch_send(self): def minitouch_send(self, builder: CommandBuilder):
content = self.minitouch_builder.to_atx_agent() content = builder.to_atx_agent()
async def send(): async def send():
for row in content: for row in content:
@ -523,15 +527,15 @@ class Minitouch(Connection):
await self._minitouch_ws.send(row) await self._minitouch_ws.send(row)
self._minitouch_loop_run(send()) self._minitouch_loop_run(send())
time.sleep(self.minitouch_builder.delay / 1000 + self.minitouch_builder.DEFAULT_DELAY) time.sleep(builder.delay / 1000 + builder.DEFAULT_DELAY)
self.minitouch_builder.clear() builder.clear()
@retry @retry
def click_minitouch(self, x, y): def click_minitouch(self, x, y):
builder = self.minitouch_builder builder = self.minitouch_builder
builder.down(x, y).commit() builder.down(x, y).commit()
builder.up().commit() builder.up().commit()
self.minitouch_send() builder.send()
@retry @retry
def long_click_minitouch(self, x, y, duration=1.0): def long_click_minitouch(self, x, y, duration=1.0):
@ -539,7 +543,7 @@ class Minitouch(Connection):
builder = self.minitouch_builder builder = self.minitouch_builder
builder.down(x, y).commit().wait(duration) builder.down(x, y).commit().wait(duration)
builder.up().commit() builder.up().commit()
self.minitouch_send() builder.send()
@retry @retry
def swipe_minitouch(self, p1, p2): def swipe_minitouch(self, p1, p2):
@ -547,14 +551,14 @@ class Minitouch(Connection):
builder = self.minitouch_builder builder = self.minitouch_builder
builder.down(*points[0]).commit() builder.down(*points[0]).commit()
self.minitouch_send() builder.send()
for point in points[1:]: for point in points[1:]:
builder.move(*point).commit().wait(10) builder.move(*point).commit().wait(10)
self.minitouch_send() builder.send()
builder.up().commit() builder.up().commit()
self.minitouch_send() builder.send()
@retry @retry
def drag_minitouch(self, p1, p2, point_random=(-10, -10, 10, 10)): def drag_minitouch(self, p1, p2, point_random=(-10, -10, 10, 10)):
@ -564,15 +568,15 @@ class Minitouch(Connection):
builder = self.minitouch_builder builder = self.minitouch_builder
builder.down(*points[0]).commit() builder.down(*points[0]).commit()
self.minitouch_send() builder.send()
for point in points[1:]: for point in points[1:]:
builder.move(*point).commit().wait(10) builder.move(*point).commit().wait(10)
self.minitouch_send() builder.send()
builder.move(*p2).commit().wait(140) builder.move(*p2).commit().wait(140)
builder.move(*p2).commit().wait(140) builder.move(*p2).commit().wait(140)
self.minitouch_send() builder.send()
builder.up().commit() builder.up().commit()
self.minitouch_send() builder.send()