mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2025-01-07 15:02:26 +00:00
343 lines
10 KiB
Python
343 lines
10 KiB
Python
import ctypes
|
|
import os
|
|
import subprocess
|
|
import typing as t
|
|
from dataclasses import dataclass
|
|
from functools import wraps
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from module.base.decorator import cached_property
|
|
from module.device.method.utils import RETRY_TRIES, get_serial_pair, retry_sleep
|
|
from module.device.platform import Platform
|
|
from module.exception import RequestHumanTakeover
|
|
from module.logger import logger
|
|
|
|
|
|
class LDOpenGLIncompatible(Exception):
|
|
pass
|
|
|
|
|
|
class LDOpenGLError(Exception):
|
|
pass
|
|
|
|
|
|
def bytes_to_str(b: bytes) -> str:
|
|
for encoding in ['utf-8', 'gbk']:
|
|
try:
|
|
return b.decode(encoding)
|
|
except UnicodeDecodeError:
|
|
pass
|
|
return str(b)
|
|
|
|
|
|
@dataclass
|
|
class DataLDPlayerInfo:
|
|
# Emulator instance index, starting from 0
|
|
index: int
|
|
# Instance name
|
|
name: str
|
|
# Handle of top window
|
|
topWnd: int
|
|
# Handle of bind window
|
|
bndWnd: int
|
|
# If instance is running, 1 for True, 0 for False
|
|
sysboot: int
|
|
# PID of the instance process, or -1 if instance is not running
|
|
playerpid: int
|
|
# PID of the vbox process, or -1 if instance is not running
|
|
vboxpid: int
|
|
# Resolution
|
|
width: int
|
|
height: int
|
|
dpi: int
|
|
|
|
def __post_init__(self):
|
|
self.index = int(self.index)
|
|
self.name = bytes_to_str(self.name)
|
|
self.topWnd = int(self.topWnd)
|
|
self.bndWnd = int(self.bndWnd)
|
|
self.sysboot = int(self.sysboot)
|
|
self.playerpid = int(self.playerpid)
|
|
self.vboxpid = int(self.vboxpid)
|
|
self.width = int(self.width)
|
|
self.height = int(self.height)
|
|
self.dpi = int(self.dpi)
|
|
|
|
|
|
class LDConsole:
|
|
def __init__(self, ld_folder: str):
|
|
"""
|
|
Args:
|
|
ld_folder: Installation path of MuMu12, e.g. E:/ProgramFiles/LDPlayer9
|
|
which should have a `ldconsole.exe` in it.
|
|
"""
|
|
self.ld_console = os.path.abspath(os.path.join(ld_folder, './ldconsole.exe'))
|
|
|
|
def subprocess_run(self, cmd, timeout=10):
|
|
"""
|
|
Args:
|
|
cmd (list):
|
|
timeout (int):
|
|
|
|
Returns:
|
|
bytes:
|
|
"""
|
|
cmd = [self.ld_console] + cmd
|
|
logger.info(f'Execute: {cmd}')
|
|
|
|
try:
|
|
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=False)
|
|
except FileNotFoundError as e:
|
|
logger.warning(f'warning when calling {cmd}, {str(e)}')
|
|
raise LDOpenGLIncompatible(f'ld_folder does not have ldconsole.exe')
|
|
try:
|
|
stdout, stderr = process.communicate(timeout=timeout)
|
|
except subprocess.TimeoutExpired:
|
|
process.kill()
|
|
stdout, stderr = process.communicate()
|
|
logger.warning(f'TimeoutExpired when calling {cmd}, stdout={stdout}, stderr={stderr}')
|
|
return stdout
|
|
|
|
def list2(self) -> t.List[DataLDPlayerInfo]:
|
|
"""
|
|
> ldconsole.exe list2
|
|
0,雷电模拟器,28053900,42935798,1,59776,36816,1280,720,240
|
|
1,雷电模拟器-1,0,0,0,-1,-1,1280,720,240
|
|
"""
|
|
out = []
|
|
data = self.subprocess_run(['list2'])
|
|
for row in data.strip().split(b'\n'):
|
|
info = row.strip().split(b',')
|
|
info = DataLDPlayerInfo(*info)
|
|
out.append(info)
|
|
return out
|
|
|
|
|
|
class IScreenShotClass:
|
|
def __init__(self, ptr):
|
|
self.ptr = ptr
|
|
|
|
# Define in class since ctypes.WINFUNCTYPE is windows only
|
|
cap_type = ctypes.WINFUNCTYPE(ctypes.c_void_p)
|
|
release_type = ctypes.WINFUNCTYPE(None)
|
|
self.class_cap = cap_type(1, "IScreenShotClass_Cap")
|
|
# Keep reference count
|
|
# so __del__ won't have an empty IScreenShotClass_Cap
|
|
self.class_release = release_type(2, "IScreenShotClass_Release")
|
|
|
|
def cap(self):
|
|
return self.class_cap(self.ptr)
|
|
|
|
def __del__(self):
|
|
self.class_release(self.ptr)
|
|
|
|
|
|
def retry(func):
|
|
@wraps(func)
|
|
def retry_wrapper(self, *args, **kwargs):
|
|
"""
|
|
Args:
|
|
self (NemuIpcImpl):
|
|
"""
|
|
init = None
|
|
for _ in range(RETRY_TRIES):
|
|
try:
|
|
if callable(init):
|
|
retry_sleep(_)
|
|
init()
|
|
return func(self, *args, **kwargs)
|
|
# Can't handle
|
|
except RequestHumanTakeover:
|
|
break
|
|
# Can't handle
|
|
except LDOpenGLIncompatible as e:
|
|
logger.error(e)
|
|
break
|
|
# NemuIpcError
|
|
except LDOpenGLError as e:
|
|
logger.error(e)
|
|
|
|
def init():
|
|
pass
|
|
# Unknown, probably a trucked image
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
|
|
def init():
|
|
pass
|
|
|
|
logger.critical(f'Retry {func.__name__}() failed')
|
|
raise RequestHumanTakeover
|
|
|
|
return retry_wrapper
|
|
|
|
|
|
class LDOpenGLImpl:
|
|
def __init__(self, ld_folder: str, instance_id: int):
|
|
"""
|
|
Args:
|
|
ld_folder: Installation path of MuMu12, e.g. E:/ProgramFiles/LDPlayer9
|
|
instance_id: Emulator instance ID, starting from 0
|
|
"""
|
|
ldopengl_dll = os.path.abspath(os.path.join(ld_folder, './ldopengl64.dll'))
|
|
logger.info(
|
|
f'LDOpenGL init, '
|
|
f'ld_folder={ld_folder}, '
|
|
f'ldopengl_dll={ldopengl_dll}, '
|
|
f'instance_id={instance_id}'
|
|
)
|
|
# Load dll
|
|
try:
|
|
self.lib = ctypes.WinDLL(ldopengl_dll)
|
|
except OSError as e:
|
|
logger.error(e)
|
|
if not os.path.exists(ldopengl_dll):
|
|
raise LDOpenGLIncompatible(
|
|
f'ldopengl_dll={ldopengl_dll} does not exist, '
|
|
f'ldopengl requires LDPlayer >= 9.0.78, please check your version'
|
|
)
|
|
else:
|
|
raise LDOpenGLIncompatible(
|
|
f'ldopengl_dll={ldopengl_dll} exist, '
|
|
f'but cannot be loaded'
|
|
)
|
|
# Get info after loading DLL, so DLL existence can act as a version check
|
|
self.console = LDConsole(ld_folder)
|
|
self.info = self.get_player_info_by_index(instance_id)
|
|
|
|
self.lib.CreateScreenShotInstance.restype = ctypes.c_void_p
|
|
|
|
# Get screenshot instance
|
|
instance_ptr = ctypes.c_void_p(self.lib.CreateScreenShotInstance(instance_id, self.info.playerpid))
|
|
self.screenshot_instance = IScreenShotClass(instance_ptr)
|
|
|
|
def get_player_info_by_index(self, instance_id: int):
|
|
"""
|
|
Args:
|
|
instance_id:
|
|
|
|
Returns:
|
|
DataLDPlayerInfo:
|
|
|
|
Raises:
|
|
LDOpenGLError:
|
|
"""
|
|
for info in self.console.list2():
|
|
if info.index == instance_id:
|
|
logger.info(f'Match LDPlayer instance: {info}')
|
|
if not info.sysboot:
|
|
raise LDOpenGLError('Trying to connect LDPlayer instance but emulator is not running')
|
|
return info
|
|
raise LDOpenGLError(f'No LDPlayer instance with index {instance_id}')
|
|
|
|
@retry
|
|
def screenshot(self):
|
|
"""
|
|
Returns:
|
|
np.ndarray: Image array in BGR color space
|
|
Note that image is upside down
|
|
"""
|
|
width, height = self.info.width, self.info.height
|
|
|
|
img_ptr = self.screenshot_instance.cap()
|
|
# ValueError: NULL pointer access
|
|
if img_ptr is None:
|
|
raise LDOpenGLError('Empty image pointer')
|
|
|
|
img = ctypes.cast(img_ptr, ctypes.POINTER(ctypes.c_ubyte * (height * width * 3))).contents
|
|
|
|
image = np.ctypeslib.as_array(img).reshape((height, width, 3))
|
|
return image
|
|
|
|
@staticmethod
|
|
def serial_to_id(serial: str):
|
|
"""
|
|
Predict instance ID from serial
|
|
E.g.
|
|
"127.0.0.1:5555" -> 0
|
|
"127.0.0.1:5557" -> 1
|
|
"emulator-5554" -> 0
|
|
|
|
Returns:
|
|
int: instance_id, or None if failed to predict
|
|
"""
|
|
serial, _ = get_serial_pair(serial)
|
|
if serial is None:
|
|
return None
|
|
try:
|
|
port = int(serial.split(':')[1])
|
|
except (IndexError, ValueError):
|
|
return None
|
|
if 5555 <= port <= 5555 + 32:
|
|
return int((port - 5555) // 2)
|
|
return None
|
|
|
|
|
|
class LDOpenGL(Platform):
|
|
@cached_property
|
|
def ldopengl(self):
|
|
"""
|
|
Initialize a ldopengl implementation
|
|
"""
|
|
# Try existing settings first
|
|
if self.config.EmulatorInfo_path:
|
|
folder = os.path.abspath(os.path.join(self.config.EmulatorInfo_path, '../'))
|
|
index = LDOpenGLImpl.serial_to_id(self.serial)
|
|
if index is not None:
|
|
try:
|
|
return LDOpenGLImpl(
|
|
ld_folder=folder,
|
|
instance_id=index,
|
|
)
|
|
except (LDOpenGLIncompatible, LDOpenGLError) as e:
|
|
logger.error(e)
|
|
logger.error('Emulator info incorrect')
|
|
|
|
# Search emulator instance
|
|
# with E:/ProgramFiles/LDPlayer9/dnplayer.exe
|
|
# installation path is E:/ProgramFiles/LDPlayer9
|
|
if self.emulator_instance is None:
|
|
logger.error('Unable to use LDOpenGL because emulator instance not found')
|
|
raise RequestHumanTakeover
|
|
try:
|
|
return LDOpenGLImpl(
|
|
ld_folder=self.emulator_instance.emulator.abspath('./'),
|
|
instance_id=self.emulator_instance.LDPlayer_id,
|
|
)
|
|
except (LDOpenGLIncompatible, LDOpenGLError) as e:
|
|
logger.error(e)
|
|
logger.error('Unable to initialize LDOpenGL')
|
|
raise RequestHumanTakeover
|
|
|
|
def ldopengl_available(self) -> bool:
|
|
if not self.is_ldplayer_bluestacks_family:
|
|
return False
|
|
logger.attr('EmulatorInfo_Emulator', self.config.EmulatorInfo_Emulator)
|
|
if self.config.EmulatorInfo_Emulator not in ['LDPlayer9']:
|
|
return False
|
|
|
|
try:
|
|
_ = self.ldopengl
|
|
except RequestHumanTakeover:
|
|
return False
|
|
return True
|
|
|
|
def screenshot_ldopengl(self):
|
|
image = self.ldopengl.screenshot()
|
|
|
|
image = cv2.flip(image, 0)
|
|
cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
|
|
return image
|
|
|
|
|
|
if __name__ == '__main__':
|
|
ld = LDOpenGLImpl('E:/ProgramFiles/LDPlayer9', instance_id=1)
|
|
for _ in range(5):
|
|
import time
|
|
|
|
start = time.time()
|
|
ld.screenshot()
|
|
print(time.time() - start)
|