PamGram/core/base/aiobrowser.py

42 lines
1.4 KiB
Python
Raw Normal View History

from typing import Optional
from playwright.async_api import Browser, Playwright, async_playwright
from core.service import Service
from utils.log import logger
class AioBrowser(Service):
    """Service that lazily starts Playwright and a Chromium browser.

    The Playwright driver and the browser are created on first use and
    cached on the instance. ``stop`` releases both and clears the cached
    references so a later ``start`` / ``get_browser`` launches fresh ones.
    """

    def __init__(self, loop=None):
        """Initialise the service without starting anything.

        Args:
            loop: Stored as ``_loop``; this class does not otherwise use it.
        """
        self.browser: Optional[Browser] = None
        self._playwright: Optional[Playwright] = None
        self._loop = loop

    async def start(self) -> Browser:
        """Start Playwright and Chromium if they are not already running.

        Returns:
            The running browser instance (also stored on ``self.browser``).

        Raises:
            playwright.async_api.TimeoutError: If Chromium fails to launch
                within the launch timeout. The failure is logged, then
                re-raised unchanged.
        """
        # Playwright raises its own TimeoutError, which is unrelated to the
        # builtin of the same name; import it under an alias so both can be
        # caught without shadowing the builtin.
        from playwright.async_api import TimeoutError as PlaywrightTimeoutError

        if self._playwright is None:
            logger.info("正在尝试启动 [blue]Playwright[/]", extra={'markup': True})
            self._playwright = await async_playwright().start()
            logger.success("[blue]Playwright[/] 启动成功", extra={'markup': True})
        if self.browser is None:
            logger.info("正在尝试启动 [blue]Browser[/]", extra={'markup': True})
            try:
                # Playwright's launch timeout is expressed in milliseconds.
                self.browser = await self._playwright.chromium.launch(timeout=5000)
            except (TimeoutError, PlaywrightTimeoutError):
                logger.warning("[blue]Browser[/] 启动失败", extra={'markup': True})
                # Bare raise keeps the original traceback intact.
                raise
            else:
                logger.success("[blue]Browser[/] 启动成功", extra={'markup': True})
        return self.browser

    async def stop(self) -> None:
        """Close the browser and stop Playwright, clearing cached references.

        Safe to call when nothing has been started. The cached references
        are cleared before shutdown begins, so the instance never keeps
        pointing at a closed browser or a stopped driver.
        """
        browser, self.browser = self.browser, None
        playwright, self._playwright = self._playwright, None
        try:
            if browser is not None:
                await browser.close()
        finally:
            # Stop the driver even if closing the browser raised.
            if playwright is not None:
                await playwright.stop()

    async def get_browser(self) -> Browser:
        """Return the running browser, starting it first if necessary."""
        if self.browser is None:
            await self.start()
        return self.browser