import os from functools import wraps import lz4.block from adbutils.errors import AdbError from module.base.utils import * from module.device.connection import Connection from module.device.method.utils import (RETRY_TRIES, retry_sleep, handle_adb_error, ImageTruncated) from module.exception import RequestHumanTakeover, ScriptError from module.logger import logger class AscreencapError(Exception): pass def retry(func): @wraps(func) def retry_wrapper(self, *args, **kwargs): """ Args: self (AScreenCap): """ init = None for _ in range(RETRY_TRIES): try: if callable(init): retry_sleep(_) init() return func(self, *args, **kwargs) # Can't handle except RequestHumanTakeover: break # When adb server was killed except ConnectionResetError as e: logger.error(e) def init(): self.adb_reconnect() # When ascreencap is not installed except AscreencapError as e: logger.error(e) def init(): self.ascreencap_init() # AdbError except AdbError as e: if handle_adb_error(e): def init(): self.adb_reconnect() else: break # ImageTruncated except ImageTruncated as e: logger.error(e) def init(): pass # Unknown except Exception as e: logger.exception(e) def init(): pass logger.critical(f'Retry {func.__name__}() failed') raise RequestHumanTakeover return retry_wrapper class AScreenCap(Connection): __screenshot_method = [0, 1, 2] __screenshot_method_fixed = [0, 1, 2] __bytepointer = 0 ascreencap_available = True def ascreencap_init(self): logger.hr('aScreenCap init') self.__bytepointer = 0 self.ascreencap_available = True arc = self.cpu_abi sdk = self.sdk_ver logger.info(f'cpu_arc: {arc}, sdk_ver: {sdk}') if sdk in range(21, 26): ver = "Android_5.x-7.x" elif sdk in range(26, 28): ver = "Android_8.x" elif sdk == 28: ver = "Android_9.x" else: ver = "0" filepath = os.path.join(self.config.ASCREENCAP_FILEPATH_LOCAL, ver, arc, 'ascreencap') if not os.path.exists(filepath): self.ascreencap_available = False logger.error('No suitable version of aScreenCap lib available for this device, ' 'please use other screenshot methods instead') raise RequestHumanTakeover logger.info(f'pushing {filepath}') self.adb_push(filepath, self.config.ASCREENCAP_FILEPATH_REMOTE) logger.info(f'chmod 0777 {self.config.ASCREENCAP_FILEPATH_REMOTE}') self.adb_shell(['chmod', '0777', self.config.ASCREENCAP_FILEPATH_REMOTE]) def uninstall_ascreencap(self): logger.info('Removing ascreencap') self.adb_shell(['rm', self.config.ASCREENCAP_FILEPATH_REMOTE]) def _ascreencap_reposition_byte_pointer(self, byte_array): """Method to return the sanitized version of ascreencap stdout for devices that suffers from linker warnings. The correct pointer location will be saved for subsequent screen refreshes """ while byte_array[self.__bytepointer:self.__bytepointer + 4] != b'BMZ1': self.__bytepointer += 1 if self.__bytepointer >= len(byte_array): text = 'Repositioning byte pointer failed, corrupted aScreenCap data received' logger.warning(text) if len(byte_array) < 500: logger.warning(f'Unexpected screenshot: {byte_array}') raise AscreencapError(text) return byte_array[self.__bytepointer:] def __load_screenshot(self, screenshot, method): if method == 0: return screenshot elif method == 1: return screenshot.replace(b'\r\n', b'\n') elif method == 2: return screenshot.replace(b'\r\r\n', b'\n') else: raise ScriptError(f'Unknown method to load screenshots: {method}') def __uncompress(self, screenshot): raw_compressed_data = self._ascreencap_reposition_byte_pointer(screenshot) # See headers in: # https://github.com/ClnViewer/Android-fast-screen-capture#streamimage-compressed---header-format-using compressed_data_header = np.frombuffer(raw_compressed_data[0:20], dtype=np.uint32) if compressed_data_header[0] != 828001602: compressed_data_header = compressed_data_header.byteswap() if compressed_data_header[0] != 828001602: text = f'aScreenCap header verification failure, corrupted image received. ' \ f'HEADER IN HEX = {compressed_data_header.tobytes().hex()}' logger.warning(text) raise AscreencapError(text) _, uncompressed_size, _, width, height = compressed_data_header channel = 3 data = lz4.block.decompress(raw_compressed_data[20:], uncompressed_size=uncompressed_size) image = np.frombuffer(data, dtype=np.uint8) if image is None: raise ImageTruncated('Empty image after reading from buffer') # Equivalent to cv2.imdecode() try: image = image[-int(width * height * channel):].reshape(height, width, channel) except ValueError as e: # ValueError: cannot reshape array of size 0 into shape (720,1280,4) raise ImageTruncated(str(e)) image = cv2.flip(image, 0) if image is None: raise ImageTruncated('Empty image after cv2.flip') image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if image is None: raise ImageTruncated('Empty image after cv2.cvtColor') return image def __process_screenshot(self, screenshot): for method in self.__screenshot_method_fixed: try: result = self.__load_screenshot(screenshot, method=method) result = self.__uncompress(result) self.__screenshot_method_fixed = [method] + self.__screenshot_method return result except lz4.block.LZ4BlockError: self.__bytepointer = 0 continue self.__screenshot_method_fixed = self.__screenshot_method if len(screenshot) < 500: logger.warning(f'Unexpected screenshot: {screenshot}') raise OSError(f'cannot load screenshot') @retry def screenshot_ascreencap(self): content = self.adb_shell([self.config.ASCREENCAP_FILEPATH_REMOTE, '--pack', '2', '--stdout'], stream=True) return self.__process_screenshot(content) @retry def screenshot_ascreencap_nc(self): data = self.adb_shell_nc([self.config.ASCREENCAP_FILEPATH_REMOTE, '--pack', '2', '--stdout']) if len(data) < 500: logger.warning(f'Unexpected screenshot: {data}') return self.__uncompress(data)