Merge pull request #417 from LmeSzinc/dev

Dev
This commit is contained in:
LmeSzinc 2024-04-15 00:43:12 +08:00 committed by GitHub
commit 9849267185
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1100 additions and 117 deletions

View File

@ -199,7 +199,9 @@
"DomainStrategy": "combat",
"UseImmersifier": true,
"DoubleEvent": true,
"UseStamina": false
"WeeklyFarming": false,
"UseStamina": false,
"SimulatedUniverseElite": {}
},
"RogueBlessing": {
"PresetBlessingFilter": "preset",

View File

@ -1,5 +1,3 @@
from concurrent.futures import ThreadPoolExecutor
import module.config.server as server_
from module.base.button import Button, ButtonWrapper, ClickButton, match_template
from module.base.timer import Timer
@ -50,11 +48,22 @@ class ModuleBase:
self.interval_timer = {}
@cached_class_property
def worker(self) -> ThreadPoolExecutor:
def worker(self):
"""
A thread pool to run things at background
Examples:
```
def func(image):
logger.info('Update thread start')
with self.config.multi_set():
self.dungeon_get_simuni_point(image)
self.dungeon_update_stamina(image)
ModuleBase.worker.submit(func, self.device.image)
```
"""
logger.hr('Creating worker')
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(1)
return pool

View File

@ -48,7 +48,8 @@
"aScreenCap_nc",
"DroidCast",
"DroidCast_raw",
"scrcpy"
"scrcpy",
"nemu_ipc"
],
"display": "hide"
},
@ -1466,9 +1467,19 @@
"type": "checkbox",
"value": true
},
"WeeklyFarming": {
"type": "checkbox",
"value": false
},
"UseStamina": {
"type": "checkbox",
"value": false
},
"SimulatedUniverseElite": {
"type": "stored",
"value": {},
"display": "hide",
"stored": "StoredSimulatedUniverseElite"
}
},
"RogueBlessing": {

View File

@ -29,7 +29,18 @@ Emulator:
option: [ auto, cn, en ]
ScreenshotMethod:
value: auto
option: [ auto, ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, DroidCast_raw, scrcpy ]
option: [
auto,
ADB,
ADB_nc,
uiautomator2,
aScreenCap,
aScreenCap_nc,
DroidCast,
DroidCast_raw,
scrcpy,
nemu_ipc,
]
ControlMethod:
value: MaaTouch
option: [ minitouch, MaaTouch ]
@ -246,7 +257,10 @@ RogueWorld:
option: [ combat, occurrence ]
UseImmersifier: true
DoubleEvent: true
WeeklyFarming: false
UseStamina: false
SimulatedUniverseElite:
stored: StoredSimulatedUniverseElite
RogueBlessing:
PresetBlessingFilter:

View File

@ -293,5 +293,18 @@
},
"order": 0,
"color": "#777777"
},
"SimulatedUniverseElite": {
"name": "SimulatedUniverseElite",
"path": "Rogue.RogueWorld.SimulatedUniverseElite",
"i18n": "RogueWorld.SimulatedUniverseElite.name",
"stored": "StoredSimulatedUniverseElite",
"attrs": {
"time": "2020-01-01 00:00:00",
"total": 0,
"value": 100
},
"order": 0,
"color": "#777777"
}
}

View File

@ -20,7 +20,7 @@ class GeneratedConfig:
Emulator_GameClient = 'android' # android, cloud_android
Emulator_PackageName = 'auto' # auto, CN-Official, CN-Bilibili, OVERSEA-America, OVERSEA-Asia, OVERSEA-Europe, OVERSEA-TWHKMO
Emulator_GameLanguage = 'auto' # auto, cn, en
Emulator_ScreenshotMethod = 'auto' # auto, ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, DroidCast_raw, scrcpy
Emulator_ScreenshotMethod = 'auto' # auto, ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, DroidCast_raw, scrcpy, nemu_ipc
Emulator_ControlMethod = 'MaaTouch' # minitouch, MaaTouch
Emulator_AdbRestart = False
@ -138,7 +138,9 @@ class GeneratedConfig:
RogueWorld_DomainStrategy = 'combat' # combat, occurrence
RogueWorld_UseImmersifier = True
RogueWorld_DoubleEvent = True
RogueWorld_WeeklyFarming = False
RogueWorld_UseStamina = False
RogueWorld_SimulatedUniverseElite = {}
# Group `RogueBlessing`
RogueBlessing_PresetBlessingFilter = 'preset' # preset, custom

View File

@ -131,7 +131,8 @@
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
"scrcpy": "scrcpy",
"nemu_ipc": "nemu_ipc"
},
"ControlMethod": {
"name": "Control Method",
@ -969,9 +970,17 @@
"name": "Participate in Double Planer Event",
"help": ""
},
"WeeklyFarming": {
"name": "Farm 100 Elites Weekly",
"help": ""
},
"UseStamina": {
"name": "Farm Planers Using Trailblase Power",
"help": "Task \"Dungeon\" will no longer run, and all trailblaze power will be used first to claim immersion rewards, except for double events."
},
"SimulatedUniverseElite": {
"name": "RogueWorld.SimulatedUniverseElite.name",
"help": "RogueWorld.SimulatedUniverseElite.help"
}
},
"RogueBlessing": {

View File

@ -131,7 +131,8 @@
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
"scrcpy": "scrcpy",
"nemu_ipc": "nemu_ipc"
},
"ControlMethod": {
"name": "Método de control",
@ -969,9 +970,17 @@
"name": "Participa en doble planer evento",
"help": ""
},
"WeeklyFarming": {
"name": "Granja 100 élites semanalmente",
"help": ""
},
"UseStamina": {
"name": "Reclamar de planers mediante poder trazacaminos",
"help": "La tarea de mazmorra ya no se ejecutará y todo el poder trazacaminos se usará primero para reclamar recompensas de inmersión, excepto para eventos dobles"
},
"SimulatedUniverseElite": {
"name": "RogueWorld.SimulatedUniverseElite.name",
"help": "RogueWorld.SimulatedUniverseElite.help"
}
},
"RogueBlessing": {

View File

@ -131,7 +131,8 @@
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
"scrcpy": "scrcpy",
"nemu_ipc": "nemu_ipc"
},
"ControlMethod": {
"name": "Emulator.ControlMethod.name",
@ -969,9 +970,17 @@
"name": "RogueWorld.DoubleEvent.name",
"help": "RogueWorld.DoubleEvent.help"
},
"WeeklyFarming": {
"name": "RogueWorld.WeeklyFarming.name",
"help": "RogueWorld.WeeklyFarming.help"
},
"UseStamina": {
"name": "RogueWorld.UseStamina.name",
"help": "RogueWorld.UseStamina.help"
},
"SimulatedUniverseElite": {
"name": "RogueWorld.SimulatedUniverseElite.name",
"help": "RogueWorld.SimulatedUniverseElite.help"
}
},
"RogueBlessing": {

View File

@ -131,7 +131,8 @@
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
"scrcpy": "scrcpy",
"nemu_ipc": "nemu_ipc"
},
"ControlMethod": {
"name": "模拟器控制方案",
@ -969,9 +970,17 @@
"name": "参与双倍内圈仪器活动",
"help": ""
},
"WeeklyFarming": {
"name": "每周刷100精英怪",
"help": ""
},
"UseStamina": {
"name": "使用开拓力刷内圈遗器",
"help": "每日副本任务将不再打本,所有开拓力将优先被用于领取浸器奖励,双倍活动时除外"
},
"SimulatedUniverseElite": {
"name": "剩余Boss材料掉落次数",
"help": "RogueWorld.SimulatedUniverseElite.help"
}
},
"RogueBlessing": {

View File

@ -131,7 +131,8 @@
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
"scrcpy": "scrcpy",
"nemu_ipc": "nemu_ipc"
},
"ControlMethod": {
"name": "模擬器控制方案",
@ -969,9 +970,17 @@
"name": "參與雙倍內圈儀器活動",
"help": ""
},
"WeeklyFarming": {
"name": "每週農100精英怪",
"help": ""
},
"UseStamina": {
"name": "用開拓力農遺器",
"help": "每日副本任務將不再打本,所有開拓力將優先被用於領取浸器獎勵,雙倍活動時除外"
},
"SimulatedUniverseElite": {
"name": "RogueWorld.SimulatedUniverseElite.name",
"help": "RogueWorld.SimulatedUniverseElite.help"
}
},
"RogueBlessing": {

View File

@ -208,6 +208,30 @@ class StoredSimulatedUniverse(StoredCounter, StoredExpiredAtMonday0400):
pass
class StoredSimulatedUniverseElite(StoredCounter, StoredExpiredAtMonday0400):
# These variables are used in Rogue Farming feature.
# Times of boss drop chance per week. In current version of StarRail, this value is 100.
FIXED_DEFAULT = 100
# Times left to farm. Resets to 100 every Monday 04:00, and decreases each time the elite boss is cleared.
value = FIXED_DEFAULT
def farm_dec(self, delta=1):
self.value -= delta
if self.value < 0:
self.value = 0
def farm_reset(self):
self.value = self.FIXED_DEFAULT
def farm_not_complete(self) -> bool:
return self.value > 0
def farm_get_remain(self) -> int:
return self.value
class StoredAssignment(StoredCounter):
pass

View File

@ -20,6 +20,7 @@ from module.config.stored.classes import (
StoredImmersifier,
StoredInt,
StoredSimulatedUniverse,
StoredSimulatedUniverseElite,
StoredTrailblazePower,
)
@ -50,3 +51,4 @@ class StoredGenerated:
Assignment = StoredAssignment("Assignment.Assignment.Assignment")
Credit = StoredInt("DataUpdate.ItemStorage.Credit")
StallerJade = StoredInt("DataUpdate.ItemStorage.StallerJade")
SimulatedUniverseElite = StoredSimulatedUniverseElite("Rogue.RogueWorld.SimulatedUniverseElite")

View File

@ -1,9 +1,9 @@
import ipaddress
import logging
import platform
import re
import socket
import subprocess
import sys
import time
from functools import wraps
@ -12,7 +12,8 @@ from adbutils import AdbClient, AdbDevice, AdbTimeout, ForwardItem, ReverseItem
from adbutils.errors import AdbError
import module.config.server as server_
from module.base.decorator import Config, cached_property, del_cached_property
import platform
from module.base.decorator import Config, cached_property, del_cached_property, run_once
from module.base.utils import SelectedGrids, ensure_time
from module.device.connection_attr import ConnectionAttr
from module.device.method.utils import (
@ -84,10 +85,17 @@ class AdbDeviceWithStatus(AdbDevice):
def __bool__(self):
return True
@cached_property
def port(self) -> int:
try:
return int(self.serial.split(':')[1])
except (IndexError, ValueError):
return 0
@cached_property
def may_mumu12_family(self):
# 127.0.0.1:16XXX
return len(self.serial) == 15 and self.serial.startswith('127.0.0.1:16')
return 16384 <= self.port <= 17408
class Connection(ConnectionAttr):
@ -276,6 +284,7 @@ class Connection(ConnectionAttr):
@cached_property
def nemud_app_keep_alive(self) -> str:
res = self.adb_getprop('nemud.app_keep_alive')
logger.attr('nemud.app_keep_alive', res)
return res
@retry
@ -284,7 +293,6 @@ class Connection(ConnectionAttr):
return False
res = self.nemud_app_keep_alive
logger.attr('nemud.app_keep_alive', res)
if res == '':
# Empty property, probably MuMu6 or MuMu12 version < 3.5.6
return True
@ -299,6 +307,15 @@ class Connection(ConnectionAttr):
logger.warning(f'Invalid nemud.app_keep_alive value: {res}')
return False
@cached_property
def is_mumu_over_version_356(self) -> bool:
"""
Returns:
bool: If MuMu12 version >= 3.5.6,
which has nemud.app_keep_alive and always be a vertical device
"""
return self.nemud_app_keep_alive != ''
@cached_property
def _nc_server_host_port(self):
"""
@ -549,14 +566,14 @@ class Connection(ConnectionAttr):
# Disconnect offline device before connecting
for device in self.list_device():
if device.status == 'offline':
logger.warning(f'Device {serial} is offline, disconnect it before connecting')
self.adb_disconnect(serial)
logger.warning(f'Device {device.serial} is offline, disconnect it before connecting')
self.adb_disconnect(device.serial)
elif device.status == 'unauthorized':
logger.error(f'Device {serial} is unauthorized, please accept ADB debugging on your device')
logger.error(f'Device {device.serial} is unauthorized, please accept ADB debugging on your device')
elif device.status == 'device':
pass
else:
logger.warning(f'Device {serial} is is having a unknown status: {device.status}')
logger.warning(f'Device {device.serial} is is having a unknown status: {device.status}')
# Skip for emulator-5554
if 'emulator-' in serial:
@ -764,23 +781,45 @@ class Connection(ConnectionAttr):
If serial=='auto' and only 1 device detected, use it
"""
logger.hr('Detect device')
logger.info('Here are the available devices, '
'copy to Alas.Emulator.Serial to use it or set Alas.Emulator.Serial="auto"')
devices = self.list_device()
available = SelectedGrids([])
devices = SelectedGrids([])
# Show available devices
available = devices.select(status='device')
for device in available:
logger.info(device.serial)
if not len(available):
logger.info('No available devices')
@run_once
def brute_force_connect():
logger.info('Brute force connect')
from deploy.Windows.emulator import EmulatorManager
manager = EmulatorManager()
manager.brute_force_connect()
# Show unavailable devices if having any
unavailable = devices.delete(available)
if len(unavailable):
logger.info('Here are the devices detected but unavailable')
for device in unavailable:
logger.info(f'{device.serial} ({device.status})')
for _ in range(2):
logger.info('Here are the available devices, '
'copy to Alas.Emulator.Serial to use it or set Alas.Emulator.Serial="auto"')
devices = self.list_device()
# Show available devices
available = devices.select(status='device')
for device in available:
logger.info(device.serial)
if not len(available):
logger.info('No available devices')
# Show unavailable devices if having any
unavailable = devices.delete(available)
if len(unavailable):
logger.info('Here are the devices detected but unavailable')
for device in unavailable:
logger.info(f'{device.serial} ({device.status})')
# brute_force_connect
if self.config.Emulator_Serial == 'auto' and available.count == 0:
logger.warning(f'No available device found')
if sys.platform == 'win32':
brute_force_connect()
continue
else:
break
else:
break
# Auto device detection
if self.config.Emulator_Serial == 'auto':
@ -790,7 +829,7 @@ class Connection(ConnectionAttr):
raise RequestHumanTakeover
elif available.count == 1:
logger.info(f'Auto device detection found only one device, using it')
self.serial = available[0].serial
self.config.Emulator_Serial = self.serial = available[0].serial
del_cached_property(self, 'adb')
elif available.count == 2 \
and available.select(serial='127.0.0.1:7555') \
@ -799,7 +838,7 @@ class Connection(ConnectionAttr):
# For MuMu12 serials like 127.0.0.1:7555 and 127.0.0.1:16384
# ignore 7555 use 16384
remain = available.select(may_mumu12_family=True).first_or_none()
self.serial = remain.serial
self.config.Emulator_Serial = self.serial = remain.serial
del_cached_property(self, 'adb')
else:
logger.critical('Multiple devices found, auto device detection cannot decide which to choose, '
@ -808,6 +847,7 @@ class Connection(ConnectionAttr):
# Handle LDPlayer
# LDPlayer serial jumps between `127.0.0.1:5555+{X}` and `emulator-5554+{X}`
# No config write since it's dynamic
port_serial, emu_serial = get_serial_pair(self.serial)
if port_serial and emu_serial:
# Might be LDPlayer, check connected devices
@ -834,6 +874,57 @@ class Connection(ConnectionAttr):
f'Using serial: {emu_serial}')
self.serial = emu_serial
# Redirect MuMu12 from 127.0.0.1:7555 to 127.0.0.1:16xxx
if self.serial == '127.0.0.1:7555':
for _ in range(2):
mumu12 = available.select(may_mumu12_family=True)
if mumu12.count == 1:
emu_serial = mumu12.first_or_none().serial
logger.warning(f'Redirect MuMu12 {self.serial} to {emu_serial}')
self.config.Emulator_Serial = self.serial = emu_serial
break
elif mumu12.count >= 2:
logger.warning(f'Multiple MuMu12 serial found, cannot redirect')
break
else:
# Only 127.0.0.1:7555
if self.is_mumu_over_version_356:
# is_mumu_over_version_356 and nemud_app_keep_alive was cached
# Acceptable since it's the same device
logger.warning(f'Device {self.serial} is MuMu12 but corresponding port not found')
brute_force_connect()
devices = self.list_device()
# Show available devices
available = devices.select(status='device')
for device in available:
logger.info(device.serial)
if not len(available):
logger.info('No available devices')
continue
else:
# MuMu6
break
# MuMu12 uses 127.0.0.1:16385 if port 16384 is occupied, auto redirect
# No config write since it's dynamic
if self.is_mumu12_family:
matched = False
for device in available.select(may_mumu12_family=True):
if device.port == self.port:
# Exact match
matched = True
break
if not matched:
for device in available.select(may_mumu12_family=True):
if -2 <= device.port - self.port <= 2:
# Port switched
logger.info(f'MuMu12 port switches from {self.serial} to {device.serial}')
del_cached_property(self, 'port')
del_cached_property(self, 'is_mumu12_family')
del_cached_property(self, 'is_mumu_family')
self.serial = device.serial
break
@retry
def list_package(self, show_log=True):
"""

View File

@ -7,7 +7,6 @@ from adbutils import AdbClient, AdbDevice
from module.base.decorator import cached_property
from module.config.config import AzurLaneConfig
from module.config.utils import deep_iter
from module.exception import RequestHumanTakeover
from module.logger import logger
@ -49,7 +48,6 @@ class ConnectionAttr:
self.serial_check()
self.config.DEVICE_OVER_HTTP = self.is_over_http
@staticmethod
def revise_serial(serial):
serial = serial.replace(' ', '')
@ -123,6 +121,18 @@ class ConnectionAttr:
def is_wsa(self):
return bool(re.match(r'^wsa', self.serial))
@cached_property
def port(self) -> int:
try:
return int(self.serial.split(':')[1])
except (IndexError, ValueError):
return 0
@cached_property
def is_mumu12_family(self):
# 127.0.0.1:16XXX
return 16384 <= self.port <= 17408
@cached_property
def is_mumu_family(self):
# 127.0.0.1:7555
@ -130,9 +140,8 @@ class ConnectionAttr:
return self.serial == '127.0.0.1:7555' or self.is_mumu12_family
@cached_property
def is_mumu12_family(self):
# 127.0.0.1:16384 + 32*n
return len(self.serial) == 15 and self.serial.startswith('127.0.0.1:16')
def is_nox_family(self):
return 62001 <= self.port <= 63025
@cached_property
def is_emulator(self):
@ -178,7 +187,8 @@ class ConnectionAttr:
rf"SOFTWARE\BlueStacks_bgp64_hyperv\Guests\{folder_name}\Config") as key:
port = QueryValueEx(key, "BstAdbPort")[0]
except FileNotFoundError:
logger.error(rf'Unable to find registry HKEY_LOCAL_MACHINE\SOFTWARE\BlueStacks_bgp64_hyperv\Guests\{folder_name}\Config')
logger.error(
rf'Unable to find registry HKEY_LOCAL_MACHINE\SOFTWARE\BlueStacks_bgp64_hyperv\Guests\{folder_name}\Config')
logger.error('Please confirm that your are using BlueStack 4 hyper-v and not regular BlueStacks 4')
logger.error(r'Please check if there is any other emulator instances under '
r'registry HKEY_LOCAL_MACHINE\SOFTWARE\BlueStacks_bgp64_hyperv\Guests')

View File

@ -5,11 +5,12 @@ from module.base.utils import *
from module.device.method.hermit import Hermit
from module.device.method.maatouch import MaaTouch
from module.device.method.minitouch import Minitouch
from module.device.method.nemu_ipc import NemuIpc
from module.device.method.scrcpy import Scrcpy
from module.logger import logger
class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
class Control(Hermit, Minitouch, Scrcpy, MaaTouch, NemuIpc):
def handle_control_check(self, button):
# Will be overridden in Device
pass
@ -22,6 +23,7 @@ class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
'minitouch': self.click_minitouch,
'Hermit': self.click_hermit,
'MaaTouch': self.click_maatouch,
'nemu_ipc': self.click_nemu_ipc,
}
def click(self, button, control_check=True):
@ -78,6 +80,8 @@ class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
self.long_click_scrcpy(x, y, duration)
elif method == 'MaaTouch':
self.long_click_maatouch(x, y, duration)
elif method == 'nemu_ipc':
self.long_click_nemu_ipc(x, y, duration)
else:
self.swipe_adb((x, y), (x, y), duration)
@ -86,13 +90,9 @@ class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
p1, p2 = ensure_int(p1, p2)
duration = ensure_time(duration)
method = self.config.Emulator_ControlMethod
if method == 'minitouch':
logger.info('Swipe %s -> %s' % (point2str(*p1), point2str(*p2)))
elif method == 'uiautomator2':
if method == 'uiautomator2':
logger.info('Swipe %s -> %s, %s' % (point2str(*p1), point2str(*p2), duration))
elif method == 'scrcpy':
logger.info('Swipe %s -> %s' % (point2str(*p1), point2str(*p2)))
elif method == 'MaaTouch':
elif method in ['minitouch', 'MaaTouch', 'scrcpy', 'nemu_ipc']:
logger.info('Swipe %s -> %s' % (point2str(*p1), point2str(*p2)))
else:
# ADB needs to be slow, or swipe doesn't work
@ -114,6 +114,8 @@ class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
self.swipe_scrcpy(p1, p2)
elif method == 'MaaTouch':
self.swipe_maatouch(p1, p2)
elif method == 'nemu_ipc':
self.swipe_nemu_ipc(p1, p2)
else:
self.swipe_adb(p1, p2, duration=duration)
@ -163,6 +165,8 @@ class Control(Hermit, Minitouch, Scrcpy, MaaTouch):
self.drag_scrcpy(p1, p2, point_random=point_random)
elif method == 'MaaTouch':
self.drag_maatouch(p1, p2, point_random=point_random)
elif method == 'nemu_ipc':
self.drag_nemu_ipc(p1, p2, point_random=point_random)
else:
logger.warning(f'Control method {method} does not support drag well, '
f'falling back to ADB swipe may cause unexpected behaviour')

View File

@ -4,7 +4,6 @@ import itertools
from module.base.timer import Timer
from module.device.app_control import AppControl
from module.device.control import Control
from module.device.platform import Platform
from module.device.screenshot import Screenshot
from module.exception import (
EmulatorNotRunningError,
@ -56,7 +55,7 @@ def show_function_call():
logger.info('Function calls:' + ''.join(func_list))
class Device(Screenshot, Control, AppControl, Platform):
class Device(Screenshot, Control, AppControl):
_screen_size_checked = False
detect_record = set()
click_record = collections.deque(maxlen=30)
@ -83,11 +82,18 @@ class Device(Screenshot, Control, AppControl, Platform):
_ = self.emulator_instance
self.screenshot_interval_set()
self.method_check()
# Auto-select the fastest screenshot method
if not self.config.is_template_config and self.config.Emulator_ScreenshotMethod == 'auto':
self.run_simple_screenshot_benchmark()
# SRC only, use nemu_ipc if available
available = self.nemu_ipc_available()
logger.attr('nemu_ipc_available', available)
if available:
self.config.override(Emulator_ScreenshotMethod='nemu_ipc')
def run_simple_screenshot_benchmark(self):
"""
Perform a screenshot method benchmark, test 3 times on each method.
@ -101,7 +107,23 @@ class Device(Screenshot, Control, AppControl, Platform):
bench = Benchmark(config=self.config, device=self)
method = bench.run_simple_screenshot_benchmark()
# Set
self.config.Emulator_ScreenshotMethod = method
with self.config.multi_set():
self.config.Emulator_ScreenshotMethod = method
# if method == 'nemu_ipc':
# self.config.Emulator_ControlMethod = 'nemu_ipc'
def method_check(self):
"""
Check combinations of screenshot method and control methods
"""
# nemu_ipc should be together
# if self.config.Emulator_ScreenshotMethod == 'nemu_ipc' and self.config.Emulator_ControlMethod != 'nemu_ipc':
# logger.warning('When using nemu_ipc, both screenshot and control should use nemu_ipc')
# self.config.Emulator_ControlMethod = 'nemu_ipc'
# if self.config.Emulator_ScreenshotMethod != 'nemu_ipc' and self.config.Emulator_ControlMethod == 'nemu_ipc':
# logger.warning('When not using nemu_ipc, both screenshot and control should not use nemu_ipc')
# self.config.Emulator_ControlMethod = 'minitouch'
pass
def screenshot(self):
"""
@ -127,6 +149,8 @@ class Device(Screenshot, Control, AppControl, Platform):
# stop it during wait
if self.config.Emulator_ScreenshotMethod == 'scrcpy':
self._scrcpy_server_stop()
if self.config.Emulator_ScreenshotMethod == 'nemu_ipc':
self.nemu_ipc_release()
def stuck_record_add(self, button):
self.detect_record.add(str(button))

View File

@ -128,7 +128,7 @@ class Adb(Connection):
if image is None:
raise ImageTruncated('Empty image after cv2.imdecode')
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
if image is None:
raise ImageTruncated('Empty image after cv2.cvtColor')

View File

@ -95,6 +95,8 @@ class DroidCast(Uiautomator2):
"""
_droidcast_port: int = 0
droidcast_width: int = 0
droidcast_height: int = 0
@cached_property
def droidcast_session(self):
@ -112,15 +114,37 @@ class DroidCast(Uiautomator2):
- /preview
To get PNG screenshots.
"""
def droidcast_url(self, url='/preview'):
if self.is_mumu_over_version_356:
w, h = self.droidcast_width, self.droidcast_height
if self.orientation == 0:
return f'http://127.0.0.1:{self._droidcast_port}{url}?width={w}&height={h}'
elif self.orientation == 1:
return f'http://127.0.0.1:{self._droidcast_port}{url}?width={h}&height={w}'
else:
# logger.warning('DroidCast receives invalid device orientation')
pass
return f'http://127.0.0.1:{self._droidcast_port}{url}'
def droidcast_raw_url(self, url='/screenshot'):
if self.is_mumu_over_version_356:
w, h = self.droidcast_width, self.droidcast_height
if self.orientation == 0:
return f'http://127.0.0.1:{self._droidcast_port}{url}?width={w}&height={h}'
elif self.orientation == 1:
return f'http://127.0.0.1:{self._droidcast_port}{url}?width={h}&height={w}'
else:
# logger.warning('DroidCast receives invalid device orientation')
pass
return f'http://127.0.0.1:{self._droidcast_port}{url}'
def droidcast_init(self):
logger.hr('DroidCast init')
self.droidcast_stop()
self._droidcast_update_resolution()
logger.info('Pushing DroidCast apk')
self.adb_push(self.config.DROIDCAST_FILEPATH_LOCAL, self.config.DROIDCAST_FILEPATH_REMOTE)
@ -150,10 +174,25 @@ class DroidCast(Uiautomator2):
else:
logger.error(f'Unknown DROIDCAST_VERSION: {self.config.DROIDCAST_VERSION}')
def _droidcast_update_resolution(self):
if self.is_mumu_over_version_356:
logger.info('Update droidcast resolution')
w, h = self.resolution_uiautomator2(cal_rotation=False)
self.get_orientation()
# 720, 1280
# mumu12 > 3.5.6 is always a vertical device
self.droidcast_width, self.droidcast_height = w, h
logger.info(f'Droicast resolution: {(w, h)}')
@retry
def screenshot_droidcast(self):
self.config.DROIDCAST_VERSION = 'DroidCast'
if self.is_mumu_over_version_356:
if not self.droidcast_width or not self.droidcast_height:
self._droidcast_update_resolution()
resp = self.droidcast_session.get(self.droidcast_url(), timeout=3)
if resp.status_code == 404:
raise DroidCastVersionIncompatible('DroidCast server does not have /preview')
image = resp.content
@ -173,16 +212,27 @@ class DroidCast(Uiautomator2):
if image is None:
raise ImageTruncated('Empty image after cv2.cvtColor')
if self.is_mumu_over_version_356:
if self.orientation == 1:
image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
return image
@retry
def screenshot_droidcast_raw(self):
self.config.DROIDCAST_VERSION = 'DroidCast_raw'
shape = (720, 1280)
if self.is_mumu_over_version_356:
if not self.droidcast_width or not self.droidcast_height:
self._droidcast_update_resolution()
if self.droidcast_height and self.droidcast_width:
shape = (self.droidcast_height, self.droidcast_width)
image = self.droidcast_session.get(self.droidcast_raw_url(), timeout=3).content
# DroidCast_raw returns a RGB565 bitmap
try:
arr = np.frombuffer(image, dtype=np.uint16).reshape((720, 1280))
arr = np.frombuffer(image, dtype=np.uint16).reshape(shape)
except ValueError as e:
if len(image) < 500:
logger.warning(f'Unexpected screenshot: {image}')
@ -230,6 +280,11 @@ class DroidCast(Uiautomator2):
cv2.add(b, m, dst=b)
image = cv2.merge([r, g, b])
if self.is_mumu_over_version_356:
if self.orientation == 1:
image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
return image
def droidcast_wait_startup(self):

View File

@ -0,0 +1,541 @@
import asyncio
import ctypes
import os
import sys
from functools import partial, wraps
import cv2
import numpy as np
from module.base.decorator import cached_property, del_cached_property, has_cached_property
from module.base.utils import ensure_time
from module.device.method.minitouch import insert_swipe, random_rectangle_point
from module.device.method.utils import RETRY_TRIES, retry_sleep
from module.device.platform import Platform
from module.exception import RequestHumanTakeover
from module.logger import logger
class NemuIpcIncompatible(Exception):
pass
class NemuIpcError(Exception):
pass
class CaptureStd:
"""
Capture stdout and stderr from both python and C library
https://stackoverflow.com/questions/5081657/how-do-i-prevent-a-c-shared-library-to-print-on-stdout-in-python/17954769
```
with CaptureStd() as capture:
# String wasn't printed
print('whatever')
# But captured in ``capture.stdout``
print(f'Got stdout: "{capture.stdout}"')
print(f'Got stderr: "{capture.stderr}"')
```
"""
def __init__(self):
self.stdout = b''
self.stderr = b''
def _redirect_stdout(self, to):
sys.stdout.close()
os.dup2(to, self.fdout)
sys.stdout = os.fdopen(self.fdout, 'w')
def _redirect_stderr(self, to):
sys.stderr.close()
os.dup2(to, self.fderr)
sys.stderr = os.fdopen(self.fderr, 'w')
def __enter__(self):
self.fdout = sys.stdout.fileno()
self.fderr = sys.stderr.fileno()
self.reader_out, self.writer_out = os.pipe()
self.reader_err, self.writer_err = os.pipe()
self.old_stdout = os.dup(self.fdout)
self.old_stderr = os.dup(self.fderr)
file_out = os.fdopen(self.writer_out, 'w')
file_err = os.fdopen(self.writer_err, 'w')
self._redirect_stdout(to=file_out.fileno())
self._redirect_stderr(to=file_err.fileno())
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._redirect_stdout(to=self.old_stdout)
self._redirect_stderr(to=self.old_stderr)
os.close(self.old_stdout)
os.close(self.old_stderr)
self.stdout = self.recvall(self.reader_out)
self.stderr = self.recvall(self.reader_err)
os.close(self.reader_out)
os.close(self.reader_err)
@staticmethod
def recvall(reader, length=1024) -> bytes:
fragments = []
while 1:
chunk = os.read(reader, length)
if chunk:
fragments.append(chunk)
else:
break
output = b''.join(fragments)
return output
class CaptureNemuIpc(CaptureStd):
instance = None
def is_capturing(self):
"""
Only capture at the topmost wrapper to avoid nested capturing
If a capture is ongoing, this instance does nothing
"""
cls = self.__class__
return isinstance(cls.instance, cls) and cls.instance != self
def __enter__(self):
if self.is_capturing():
return self
super().__enter__()
CaptureNemuIpc.instance = self
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.is_capturing():
return
CaptureNemuIpc.instance = None
super().__exit__(exc_type, exc_val, exc_tb)
self.check_stdout()
self.check_stderr()
def check_stdout(self):
if not self.stdout:
return
logger.info(f'NemuIpc stdout: {self.stdout}')
def check_stderr(self):
if not self.stderr:
return
logger.error(f'NemuIpc stderr: {self.stderr}')
# Calling an old MuMu12 player
# Tested on 3.4.0
# b'nemu_capture_display rpc error: 1783\r\n'
# Tested on 3.7.3
# b'nemu_capture_display rpc error: 1745\r\n'
if b'error: 1783' in self.stderr or b'error: 1745' in self.stderr:
raise NemuIpcIncompatible(
f'NemuIpc requires MuMu12 version >= 3.8.13, please check your version')
# contact_id incorrect
# b'nemu_capture_display cannot find rpc connection\r\n'
if b'cannot find rpc connection' in self.stderr:
raise NemuIpcError(self.stderr)
# Emulator died
# b'nemu_capture_display rpc error: 1722\r\n'
# MuMuVMMSVC.exe died
# b'nemu_capture_display rpc error: 1726\r\n'
# No idea how to handle yet
if b'error: 1722' in self.stderr or b'error: 1726' in self.stderr:
raise NemuIpcError('Emulator instance is probably dead')
def retry(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
"""
Args:
self (NemuIpcImpl):
"""
init = None
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
init()
return func(self, *args, **kwargs)
# Can't handle
except RequestHumanTakeover:
break
# Can't handle
except NemuIpcIncompatible as e:
logger.error(e)
break
# Function call timeout
except asyncio.TimeoutError:
logger.warning(f'Func {func.__name__}() call timeout, retrying: {_}')
def init():
self.reconnect()
# NemuIpcError
except NemuIpcError as e:
logger.error(e)
def init():
self.reconnect()
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
def init():
pass
logger.critical(f'Retry {func.__name__}() failed')
raise RequestHumanTakeover
return retry_wrapper
class NemuIpcImpl:
def __init__(self, nemu_folder: str, instance_id: int, display_id: int = 0):
"""
Args:
nemu_folder: Installation path of MuMu12, e.g. E:/ProgramFiles/MuMuPlayer-12.0
instance_id: Emulator instance ID, starting from 0
display_id: Always 0 if keep app alive was disabled
"""
self.nemu_folder: str = nemu_folder
self.instance_id: int = instance_id
self.display_id: int = display_id
ipc_dll = os.path.abspath(os.path.join(nemu_folder, './shell/sdk/external_renderer_ipc.dll'))
logger.info(
f'NemuIpcImpl init, '
f'nemu_folder={nemu_folder}, '
f'ipc_dll={ipc_dll}, '
f'instance_id={instance_id}, '
f'display_id={display_id}'
)
try:
self.lib = ctypes.CDLL(ipc_dll)
except OSError as e:
logger.error(e)
# OSError: [WinError 126] 找不到指定的模块。
if not os.path.exists(ipc_dll):
raise NemuIpcIncompatible(
f'ipc_dll={ipc_dll} does not exist, '
f'NemuIpc requires MuMu12 version >= 3.8.13, please check your version')
else:
raise NemuIpcIncompatible(
f'ipc_dll={ipc_dll} exists, but cannot be loaded')
self.connect_id: int = 0
self.width = 0
self.height = 0
def connect(self):
if self.connect_id > 0:
return
connect_id = self.ev_run_sync(
self.lib.nemu_connect,
self.nemu_folder, self.instance_id
)
if connect_id == 0:
raise NemuIpcError(
'Connection failed, please check if nemu_folder is correct and emulator is running'
)
self.connect_id = connect_id
# logger.info(f'NemuIpc connected: {self.connect_id}')
def disconnect(self):
if self.connect_id == 0:
return
self.ev_run_sync(
self.lib.nemu_disconnect,
self.connect_id
)
# logger.info(f'NemuIpc disconnected: {self.connect_id}')
self.connect_id = 0
def reconnect(self):
self.disconnect()
self.connect()
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
@cached_property
def _ev(self):
return asyncio.new_event_loop()
async def ev_run_async(self, func, *args, **kwargs):
"""
Args:
func: Sync function to call
*args:
**kwargs:
Raises:
asyncio.TimeoutError: If function call timeout
"""
func_wrapped = partial(func, *args, **kwargs)
# Increased timeout for slow PCs
# Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs
result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=0.15)
return result
def ev_run_sync(self, func, *args, **kwargs):
"""
Args:
func: Sync function to call
*args:
**kwargs:
Raises:
asyncio.TimeoutError: If function call timeout
NemuIpcIncompatible:
NemuIpcError
"""
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
err = False
if func.__name__ == 'nemu_connect':
if result == 0:
err = True
else:
if result > 0:
err = True
# Get to actual error message printed in std
if err:
logger.warning(f'Failed to call {func.__name__}, result={result}')
with CaptureNemuIpc():
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
return result
def get_resolution(self):
"""
Get emulator resolution, `self.width` and `self.height` will be set
"""
if self.connect_id == 0:
self.connect()
width_ptr = ctypes.pointer(ctypes.c_int(0))
height_ptr = ctypes.pointer(ctypes.c_int(0))
nullptr = ctypes.POINTER(ctypes.c_int)()
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, 0, width_ptr, height_ptr, nullptr
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during get_resolution()')
self.width = width_ptr.contents.value
self.height = height_ptr.contents.value
@retry
def screenshot(self):
"""
Returns:
np.ndarray: Image array in RGBA color space
Note that image is upside down
"""
if self.connect_id == 0:
self.connect()
self.get_resolution()
width_ptr = ctypes.pointer(ctypes.c_int(self.width))
height_ptr = ctypes.pointer(ctypes.c_int(self.height))
length = self.width * self.height * 4
pixels_pointer = ctypes.pointer((ctypes.c_ubyte * length)())
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, length, width_ptr, height_ptr, pixels_pointer
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during screenshot()')
# image = np.ctypeslib.as_array(pixels_pointer, shape=(self.height, self.width, 4))
image = np.ctypeslib.as_array(pixels_pointer.contents).reshape((self.height, self.width, 4))
return image
def convert_xy(self, x, y):
"""
Convert classic ADB coordinates to Nemu's
`self.height` must be updated before calling this method
Returns:
int, int
"""
x, y = int(x), int(y)
x, y = self.height - y, x
return x, y
@retry
def down(self, x, y):
"""
Contact down, continuous contact down will be considered as swipe
"""
if self.connect_id == 0:
self.connect()
if self.height == 0:
self.get_resolution()
x, y = self.convert_xy(x, y)
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_down,
self.connect_id, self.display_id, x, y
)
if ret > 0:
raise NemuIpcError('nemu_input_event_touch_down failed')
@retry
def up(self):
"""
Contact up
"""
if self.connect_id == 0:
self.connect()
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_up,
self.connect_id, self.display_id
)
if ret > 0:
raise NemuIpcError('nemu_input_event_touch_up failed')
def serial_to_id(serial: str):
"""
Predict instance ID from serial
E.g.
"127.0.0.1:16384" -> 0
"127.0.0.1:16416" -> 1
Returns:
int: instance_id, or None if failed to predict
"""
try:
port = int(serial.split(':')[1])
except (IndexError, ValueError):
return None
index, offset = divmod(port - 16384, 32)
if 0 <= index < 32 and offset in [0, 1, 2]:
return index
else:
return None
class NemuIpc(Platform):
@cached_property
def nemu_ipc(self) -> NemuIpcImpl:
"""
Initialize a nemu ipc implementation
"""
# Try existing settings first
if self.config.EmulatorInfo_path:
folder = os.path.abspath(os.path.join(self.config.EmulatorInfo_path, '../../'))
index = serial_to_id(self.serial)
if index is not None:
try:
return NemuIpcImpl(
nemu_folder=folder,
instance_id=index,
display_id=0
).__enter__()
except (NemuIpcIncompatible, NemuIpcError) as e:
logger.error(e)
logger.error('Emulator info incorrect')
# Search emulator instance
# with E:\ProgramFiles\MuMuPlayer-12.0\shell\MuMuPlayer.exe
# installation path is E:\ProgramFiles\MuMuPlayer-12.0
if self.emulator_instance is None:
logger.error('Unable to use NemuIpc because emulator instance not found')
raise RequestHumanTakeover
try:
return NemuIpcImpl(
nemu_folder=self.emulator_instance.emulator.abspath('../'),
instance_id=self.emulator_instance.MuMuPlayer12_id,
display_id=0
).__enter__()
except (NemuIpcIncompatible, NemuIpcError) as e:
logger.error(e)
logger.error('Unable to initialize NemuIpc')
raise RequestHumanTakeover
def nemu_ipc_available(self) -> bool:
if not self.is_mumu_family:
return False
if self.nemud_app_keep_alive == '':
return False
try:
_ = self.nemu_ipc
except RequestHumanTakeover:
return False
return True
def nemu_ipc_release(self):
if has_cached_property(self, 'nemu_ipc'):
self.nemu_ipc.disconnect()
del_cached_property(self, 'nemu_ipc')
logger.info('nemu_ipc released')
def screenshot_nemu_ipc(self):
image = self.nemu_ipc.screenshot()
image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
cv2.flip(image, 0, dst=image)
return image
def click_nemu_ipc(self, x, y):
down = ensure_time((0.010, 0.020))
self.nemu_ipc.down(x, y)
self.sleep(down)
self.nemu_ipc.up()
self.sleep(0.050 - down)
def long_click_nemu_ipc(self, x, y, duration=1.0):
self.nemu_ipc.down(x, y)
self.sleep(duration)
self.nemu_ipc.up()
self.sleep(0.050)
def swipe_nemu_ipc(self, p1, p2):
points = insert_swipe(p0=p1, p3=p2)
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
self.nemu_ipc.up()
self.sleep(0.050)
def drag_nemu_ipc(self, p1, p2, point_random=(-10, -10, 10, 10)):
p1 = np.array(p1) - random_rectangle_point(point_random)
p2 = np.array(p2) - random_rectangle_point(point_random)
points = insert_swipe(p0=p1, p3=p2, speed=20)
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.up()
self.sleep(0.050)

View File

@ -243,7 +243,7 @@ class Uiautomator2(Connection):
return hierarchy
@retry
def resolution_uiautomator2(self) -> t.Tuple[int, int]:
def resolution_uiautomator2(self, cal_rotation=True) -> t.Tuple[int, int]:
"""
Faster u2.window_size(), cause that calls `dumpsys display` twice.
@ -252,9 +252,10 @@ class Uiautomator2(Connection):
"""
info = self.u2.http.get('/info').json()
w, h = info['display']['width'], info['display']['height']
rotation = self.get_orientation()
if (w > h) != (rotation % 2 == 1):
w, h = h, w
if cal_rotation:
rotation = self.get_orientation()
if (w > h) != (rotation % 2 == 1):
w, h = h, w
return w, h
def resolution_check_uiautomator2(self):

View File

@ -36,6 +36,21 @@ def get_serial_pair(serial):
return None, None
def remove_duplicated_path(paths):
"""
Args:
paths (list[str]):
Returns:
list[str]:
"""
paths = sorted(set(paths))
dic = {}
for path in paths:
dic.setdefault(path.lower(), path)
return list(dic.values())
@dataclass
class EmulatorInstanceBase:
# Serial for adb connection
@ -205,6 +220,14 @@ class EmulatorBase:
class EmulatorManagerBase:
@staticmethod
def iter_running_emulator():
"""
Yields:
str: Path to emulator executables, may contains duplicate values
"""
return
@cached_property
def all_emulators(self) -> t.List[EmulatorBase]:
"""

View File

@ -8,7 +8,8 @@ from dataclasses import dataclass
# module/device/platform/emulator_base.py
# module/device/platform/emulator_windows.py
# Will be used in Alas Easy Install, they shouldn't import any Alas modules.
from module.device.platform.emulator_base import EmulatorBase, EmulatorInstanceBase, EmulatorManagerBase
from module.device.platform.emulator_base import EmulatorBase, EmulatorInstanceBase, EmulatorManagerBase, \
remove_duplicated_path
from module.device.platform.utils import cached_property, iter_folder
@ -70,7 +71,7 @@ class Emulator(EmulatorBase):
def path_to_type(cls, path: str) -> str:
"""
Args:
path: Path to .exe file
path: Path to .exe file, case insensitive
Returns:
str: Emulator type, such as Emulator.NoxPlayer
@ -78,46 +79,49 @@ class Emulator(EmulatorBase):
folder, exe = os.path.split(path)
folder, dir1 = os.path.split(folder)
folder, dir2 = os.path.split(folder)
if exe == 'Nox.exe':
if dir2 == 'Nox':
exe = exe.lower()
dir1 = dir1.lower()
dir2 = dir2.lower()
if exe == 'nox.exe':
if dir2 == 'nox':
return cls.NoxPlayer
elif dir2 == 'Nox64':
elif dir2 == 'nox64':
return cls.NoxPlayer64
else:
return cls.NoxPlayer
if exe == 'Bluestacks.exe':
if dir1 in ['BlueStacks', 'BlueStacks_cn']:
if exe == 'bluestacks.exe':
if dir1 in ['bluestacks', 'bluestacks_cn']:
return cls.BlueStacks4
elif dir1 in ['BlueStacks_nxt', 'BlueStacks_nxt_cn']:
elif dir1 in ['bluestacks_nxt', 'bluestacks_nxt_cn']:
return cls.BlueStacks5
else:
return cls.BlueStacks4
if exe == 'HD-Player.exe':
if dir1 in ['BlueStacks', 'BlueStacks_cn']:
if exe == 'hd-player.exe':
if dir1 in ['bluestacks', 'bluestacks_cn']:
return cls.BlueStacks4
elif dir1 in ['BlueStacks_nxt', 'BlueStacks_nxt_cn']:
elif dir1 in ['bluestacks_nxt', 'bluestacks_nxt_cn']:
return cls.BlueStacks5
else:
return cls.BlueStacks5
if exe == 'dnplayer.exe':
if dir1 == 'LDPlayer':
if dir1 == 'ldplayer':
return cls.LDPlayer3
elif dir1 == 'LDPlayer4':
elif dir1 == 'ldplayer4':
return cls.LDPlayer4
elif dir1 == 'LDPlayer9':
elif dir1 == 'ldplayer9':
return cls.LDPlayer9
else:
return cls.LDPlayer3
if exe == 'NemuPlayer.exe':
if exe == 'nemuplayer.exe':
if dir2 == 'nemu':
return cls.MuMuPlayer
elif dir2 == 'nemu9':
return cls.MuMuPlayerX
else:
return cls.MuMuPlayer
if exe == 'MuMuPlayer.exe':
if exe == 'mumuplayer.exe':
return cls.MuMuPlayer12
if exe == 'MEmu.exe':
if exe == 'memu.exe':
return cls.MEmuPlayer
return ''
@ -143,7 +147,9 @@ class Emulator(EmulatorBase):
elif 'NemuMultiPlayer.exe' in exe:
yield exe.replace('NemuMultiPlayer.exe', 'NemuPlayer.exe')
elif 'MuMuMultiPlayer.exe' in exe:
yield exe.replace('MuMuMultiPlayer.exe', 'MuMuManager.exe')
yield exe.replace('MuMuMultiPlayer.exe', 'MuMuPlayer.exe')
elif 'MuMuManager.exe' in exe:
yield exe.replace('MuMuManager.exe', 'MuMuPlayer.exe')
elif 'MEmuConsole.exe' in exe:
yield exe.replace('MEmuConsole.exe', 'MEmu.exe')
else:
@ -316,7 +322,7 @@ class EmulatorManager(EmulatorManagerBase):
Get recently executed programs in UserAssist
https://github.com/forensicmatt/MonitorUserAssist
Returns:
Yields:
str: Path to emulator executables, may contains duplicate values
"""
path = r'Software\Microsoft\Windows\CurrentVersion\Explorer\UserAssist'
@ -447,6 +453,31 @@ class EmulatorManager(EmulatorManagerBase):
uninstall = res.group(1) if res else uninstall
yield uninstall
@staticmethod
def iter_running_emulator():
"""
Yields:
str: Path to emulator executables, may contains duplicate values
"""
try:
import psutil
except ModuleNotFoundError:
return
# Since this is a one-time-usage, we access psutil._psplatform.Process directly
# to bypass the call of psutil.Process.is_running().
# This only costs about 0.017s.
for pid in psutil.pids():
proc = psutil._psplatform.Process(pid)
try:
exe = proc.cmdline()
exe = exe[0].replace(r'\\', '/').replace('\\', '/')
except (psutil.AccessDenied, IndexError):
# psutil.AccessDenied
continue
if Emulator.is_emulator(exe):
yield exe
@cached_property
def all_emulators(self) -> t.List[Emulator]:
"""
@ -474,7 +505,7 @@ class EmulatorManager(EmulatorManagerBase):
exe.add(ld)
# Uninstall registry
for uninstall in self.iter_uninstall_registry():
for uninstall in EmulatorManager.iter_uninstall_registry():
# Find emulator executable from uninstaller
for file in iter_folder(abspath(os.path.dirname(uninstall)), ext='.exe'):
if Emulator.is_emulator(file) and os.path.exists(file):
@ -488,9 +519,14 @@ class EmulatorManager(EmulatorManagerBase):
if Emulator.is_emulator(file) and os.path.exists(file):
exe.add(file)
# Running
for file in EmulatorManager.iter_running_emulator():
if os.path.exists(file):
exe.add(file)
# De-redundancy
exe = [Emulator(path).path for path in exe if Emulator.is_emulator(path)]
exe = sorted(set(exe))
exe = [Emulator(path) for path in exe]
exe = [Emulator(path) for path in remove_duplicated_path(exe)]
return exe
@cached_property

View File

@ -6,7 +6,8 @@ from pydantic import BaseModel
from module.base.decorator import cached_property, del_cached_property
from module.base.utils import SelectedGrids
from module.device.connection import Connection
from module.device.platform.emulator_base import EmulatorInstanceBase, EmulatorManagerBase
from module.device.method.utils import get_serial_pair
from module.device.platform.emulator_base import EmulatorInstanceBase, EmulatorManagerBase, remove_duplicated_path
from module.logger import logger
@ -47,8 +48,20 @@ class PlatformBase(Connection, EmulatorManagerBase):
@cached_property
def emulator_info(self) -> EmulatorInfo:
emulator = self.config.EmulatorInfo_Emulator
name = str(self.config.EmulatorInfo_name).strip().replace('\n', '')
path = str(self.config.EmulatorInfo_path).strip().replace('\n', '')
if emulator == 'auto':
emulator = ''
def parse_info(value):
if isinstance(value, str):
value = value.strip().replace('\n', '')
if value in ['None', 'False', 'True']:
value = ''
return value
else:
return ''
name = parse_info(self.config.EmulatorInfo_name)
path = parse_info(self.config.EmulatorInfo_path)
return EmulatorInfo(
emulator=emulator,
@ -68,8 +81,14 @@ class PlatformBase(Connection, EmulatorManagerBase):
path=data.path,
name=data.name,
)
# Redirect emulator-5554 to 127.0.0.1:5555
serial = self.serial
port_serial, _ = get_serial_pair(self.serial)
if port_serial is not None:
serial = port_serial
instance = self.find_emulator_instance(
serial=str(self.config.Emulator_Serial).strip(),
serial=serial,
name=data.name,
path=data.path,
emulator=data.emulator,
@ -117,7 +136,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
# Search by serial
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instance with {search_args}')
logger.warning(f'No emulator instance with {search_args}, serial invalid')
return None
if select.count == 1:
instance = select[0]
@ -130,9 +149,9 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['name'] = name
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, name invalid')
search_args.pop('name')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
@ -143,9 +162,9 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['path'] = path
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, path invalid')
search_args.pop('path')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
@ -156,9 +175,28 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['type'] = emulator
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, type invalid')
search_args.pop('type')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
# Still too many instances, search from running emulators
running = remove_duplicated_path(list(self.iter_running_emulator()))
logger.info('Running emulators')
for exe in running:
logger.info(exe)
if len(running) == 1:
logger.info('Only one running emulator')
# Same as searching path
search_args['path'] = running[0]
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}, path invalid')
search_args.pop('path')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
@ -167,9 +205,3 @@ class PlatformBase(Connection, EmulatorManagerBase):
# Still too many instances
logger.warning(f'Found multiple emulator instances with {search_args}')
return None
if __name__ == '__main__':
self = PlatformBase('alas')
d = self.emulator_instance
print(d)

View File

@ -13,13 +13,14 @@ from module.base.utils import get_color, image_size, limit_in, save_image
from module.device.method.adb import Adb
from module.device.method.ascreencap import AScreenCap
from module.device.method.droidcast import DroidCast
from module.device.method.nemu_ipc import NemuIpc
from module.device.method.scrcpy import Scrcpy
from module.device.method.wsa import WSA
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy, NemuIpc):
_screen_size_checked = False
_screen_black_checked = False
_minicap_uninstalled = False
@ -38,6 +39,7 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
'DroidCast': self.screenshot_droidcast,
'DroidCast_raw': self.screenshot_droidcast_raw,
'scrcpy': self.screenshot_scrcpy,
'nemu_ipc': self.screenshot_nemu_ipc,
}
def screenshot(self):
@ -70,6 +72,10 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
return self.image
@property
def has_cached_image(self):
return hasattr(self, 'image') and self.image is not None
def _handle_orientated_image(self, image):
"""
Args:
@ -159,6 +165,9 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
if interval != origin:
logger.warning(f'Optimization.ScreenshotInterval {origin} is revised to {interval}')
self.config.Optimization_ScreenshotInterval = interval
# Allow nemu_ipc to have a lower default
if self.config.Emulator_ScreenshotMethod == 'nemu_ipc':
interval = limit_in(origin, 0.1, 0.2)
elif interval == 'combat':
origin = self.config.Optimization_CombatScreenshotInterval
interval = limit_in(origin, 0.3, 1.0)

View File

@ -160,6 +160,7 @@ class DraggableList:
logger.info(f'Insight row: {row}, index={row_index}')
last_buttons: set[OcrResultButton] = None
bottom_check = Timer(3, count=5).start()
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
@ -183,8 +184,11 @@ class DraggableList:
0, count=0), timeout=Timer(1.5, count=5))
skip_first_screenshot = True
if self.cur_buttons and last_buttons == set(self.cur_buttons):
logger.warning(f'No more rows in {self}')
return False
if bottom_check.reached():
logger.warning(f'No more rows in {self}')
return False
else:
bottom_check.reset()
last_buttons = set(self.cur_buttons)
return True

View File

@ -1,3 +1,4 @@
from module.logger import logger
from tasks.map.control.waypoint import Waypoint
from tasks.map.keywords.plane import Jarilo_CorridorofFadingEchoes
from tasks.rogue.route.base import RouteBase
@ -167,12 +168,23 @@ class Route(RouteBase):
enemy2right.straight_run(),
enemy2left.straight_run().set_threshold(5),
)
self.clear_enemy(
enemy2left.set_threshold(5),
node3.straight_run(),
node4.set_threshold(3).straight_run(),
enemy4.straight_run(),
)
if self.minimap.is_position_near(enemy2left.position, threshold=30):
logger.info('Near enemy2right')
self.clear_enemy(
enemy2left.set_threshold(5),
node3.straight_run(),
node4.set_threshold(3).straight_run(),
enemy4.straight_run(),
)
else:
logger.info('Not near enemy2right')
self.clear_enemy(
enemy2right.set_threshold(5),
enemy2left.set_threshold(5),
node3.straight_run(),
node4.set_threshold(3).straight_run(),
enemy4.straight_run(),
)
def Jarilo_CorridorofFadingEchoes_F1_X437Y122(self):
"""

View File

@ -386,7 +386,7 @@ class MapControl(Combat, AimDetectorMixin):
end_point = waypoints[-1]
end_point.expected_end.append('item')
self.goto(*waypoints)
return self.goto(*waypoints)
def clear_enemy(self, *waypoints):
"""
@ -403,7 +403,7 @@ class MapControl(Combat, AimDetectorMixin):
end_point = waypoints[-1]
end_point.expected_end.append('enemy')
self.goto(*waypoints)
return self.goto(*waypoints)
if __name__ == '__main__':

View File

@ -249,7 +249,7 @@ class RogueEntry(RouteBase, RogueRewardHandler, RoguePathHandler, DungeonUI):
self.device.click(WORLD_ENTER)
self.interval_reset(REWARD_ENTER, interval=2)
continue
if self.appear(LEVEL_CONFIRM, interval=2):
if self.match_template_color(LEVEL_CONFIRM, interval=2):
self.dungeon_update_stamina()
self.check_stop_condition()
self.device.click(LEVEL_CONFIRM)
@ -332,7 +332,8 @@ class RogueEntry(RouteBase, RogueRewardHandler, RoguePathHandler, DungeonUI):
"""
logger.info(f'RogueWorld_UseImmersifier={self.config.RogueWorld_UseImmersifier}, '
f'RogueWorld_UseStamina={self.config.RogueWorld_UseStamina}, '
f'RogueWorld_DoubleEvent={self.config.RogueWorld_DoubleEvent}'
f'RogueWorld_DoubleEvent={self.config.RogueWorld_DoubleEvent}, '
f'RogueWorld_WeeklyFarming={self.config.RogueWorld_WeeklyFarming}, '
f'RogueDebug_DebugMode={self.config.RogueDebug_DebugMode}')
# This shouldn't happen
if self.config.RogueWorld_UseStamina and not self.config.RogueWorld_UseImmersifier:
@ -345,13 +346,23 @@ class RogueEntry(RouteBase, RogueRewardHandler, RoguePathHandler, DungeonUI):
if self.config.RogueDebug_DebugMode:
# Always run
return
if self.config.stored.SimulatedUniverseElite.is_expired():
# Expired, reset farming counter
self.config.stored.SimulatedUniverseElite.farm_reset()
if self.config.stored.SimulatedUniverse.is_expired():
# Expired, do rogue
pass
elif self.config.stored.SimulatedUniverse.is_full():
if self.config.RogueWorld_UseImmersifier and self.config.stored.Immersifier.value > 0:
logger.info('Reached weekly point limit but still have immersifiers left, continue to use them')
logger.info(
'Reached weekly point limit but still have immersifiers left, continue to use them')
elif self.config.RogueWorld_WeeklyFarming and self.config.stored.SimulatedUniverseElite.farm_not_complete():
logger.info(
'Reached weekly point limit but still continue to farm materials')
logger.attr(
"Farming Counter", self.config.stored.SimulatedUniverseElite.farm_get_remain())
else:
raise RogueReachedWeeklyPointLimit
else:

View File

@ -205,7 +205,7 @@ class RoguePathHandler(RogueUI):
logger.info('rogue_path_select ended at page_main')
break
if self.appear(ROGUE_LAUNCH, interval=2):
if self.match_template_color(ROGUE_LAUNCH, interval=2):
if not self._is_team_prepared():
raise RogueTeamNotPrepared
self.device.click(ROGUE_LAUNCH)

View File

@ -170,6 +170,14 @@ class RouteBase(RouteBase_, RogueExit, RogueEvent, RogueReward):
pass
result = super().clear_enemy(*waypoints)
# logger.attr("result",result)
if 'enemy' in result:
# runs when one elite battle finishes, and decreases rogue farming count by 1
if self.config.RogueWorld_WeeklyFarming and self.config.stored.SimulatedUniverseElite.farm_not_complete():
self.config.stored.SimulatedUniverseElite.farm_dec()
logger.info(
f"Cleared elite boss, decreasing farming count by 1, now {self.config.stored.SimulatedUniverseElite.farm_get_remain()}")
return result
def _domain_event_expected_end(self):