diff --git a/efb_qq_plugin_go_cqhttp/Utils.py b/efb_qq_plugin_go_cqhttp/Utils.py index cd4aaf7..1cf082e 100644 --- a/efb_qq_plugin_go_cqhttp/Utils.py +++ b/efb_qq_plugin_go_cqhttp/Utils.py @@ -1,13 +1,15 @@ import logging import tempfile -import urllib.request from typing import IO, Optional from urllib.error import ContentTooShortError, HTTPError, URLError import pilk import pydub +import requests from ehforwarderbot import Message, coordinator +logger = logging.getLogger(__name__) + # created by JogleLew and jqqqqqqqqqq, optimized based on Tim's emoji support, updated by xzsk2 to mobileqq v8.8.11 qq_emoji_list = { 0: "😮", @@ -641,14 +643,15 @@ qq_sface_list = { def cq_get_image(image_link: str) -> Optional[IO]: # Download image from QQ - file = tempfile.NamedTemporaryFile() try: - urllib.request.urlretrieve(image_link, file.name) + resp = requests.get(image_link) except (URLError, HTTPError, ContentTooShortError) as e: - logging.getLogger(__name__).warning("Image download failed.") - logging.getLogger(__name__).warning(str(e)) + logger.warning("Image download failed.") + logger.warning(str(e)) return None else: + file = tempfile.NamedTemporaryFile() + file.write(resp.content) if file.seek(0, 2) <= 0: raise EOFError("File downloaded is Empty") file.seek(0) @@ -699,15 +702,14 @@ def param_spliter(str_param): def download_file(download_url): - file = tempfile.NamedTemporaryFile() try: - opener = urllib.request.build_opener() - urllib.request.install_opener(opener) - urllib.request.urlretrieve(download_url, file.name) + resp = requests.get(download_url) except (URLError, HTTPError, ContentTooShortError) as e: - logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) + logger.warning("Error occurs when downloading files: " + str(e)) return ("Error occurs when downloading files: ") + str(e) else: + file = tempfile.NamedTemporaryFile() + file.write(resp.content) if file.seek(0, 2) <= 0: raise EOFError("File downloaded is Empty") file.seek(0) @@ -715,66 +717,63 @@ def download_file(download_url): def download_user_avatar(uid: str): - file = tempfile.NamedTemporaryFile() url = "https://q1.qlogo.cn/g?b=qq&nk={}&s=0".format(uid) try: - opener = urllib.request.build_opener() - urllib.request.install_opener(opener) - urllib.request.urlretrieve(url, file.name) + resp = requests.get(url) except (URLError, HTTPError, ContentTooShortError) as e: - logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) + logger.warning("Error occurs when downloading files: " + str(e)) return ("Error occurs when downloading files: ") + str(e) - if file.seek(0, 2) <= 0: - raise EOFError("File downloaded is Empty") - file.seek(0) - return file + else: + file = tempfile.NamedTemporaryFile() + file.write(resp.content) + if file.seek(0, 2) <= 0: + raise EOFError("File downloaded is Empty") + file.seek(0) + return file def download_group_avatar(uid: str): - file = tempfile.NamedTemporaryFile() url = "https://p.qlogo.cn/gh/{}/{}/".format(uid, uid) try: - opener = urllib.request.build_opener() - urllib.request.install_opener(opener) - urllib.request.urlretrieve(url, file.name) + resp = requests.get(url) except (URLError, HTTPError, ContentTooShortError) as e: - logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) + logger.warning("Error occurs when downloading files: " + str(e)) return ("Error occurs when downloading files: ") + str(e) - if file.seek(0, 2) <= 0: - raise EOFError("File downloaded is Empty") - file.seek(0) - return file + else: + file = tempfile.NamedTemporaryFile() + file.write(resp.content) + if file.seek(0, 2) <= 0: + raise EOFError("File downloaded is Empty") + file.seek(0) + return file def download_voice(voice_url: str): - origin_file = tempfile.NamedTemporaryFile() try: - opener = urllib.request.build_opener() - urllib.request.install_opener(opener) - urllib.request.urlretrieve(voice_url, origin_file.name) + resp = requests.get(voice_url) except Exception as e: - logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) - origin_file.close() + logger.warning("Error occurs when downloading files: " + str(e)) raise e - finally: - opener.close() - if origin_file.seek(0, 2) <= 0: - origin_file.close() - raise EOFError("File downloaded is Empty") - origin_file.seek(0) - silk_header = origin_file.read(10) - origin_file.seek(0) - if b"#!SILK_V3" in silk_header: - with tempfile.NamedTemporaryFile() as pcm_file: - pilk.decode(origin_file.name, pcm_file.name) - origin_file.close() - audio_file = tempfile.NamedTemporaryFile() - pydub.AudioSegment.from_raw(file=pcm_file, sample_width=2, frame_rate=24000, channels=1).export( - audio_file, format="ogg", codec="libopus", parameters=["-vbr", "on"] - ) else: - audio_file = origin_file - return audio_file + origin_file = tempfile.NamedTemporaryFile() + origin_file.write(resp.content) + if origin_file.seek(0, 2) <= 0: + origin_file.close() + raise EOFError("File downloaded is Empty") + origin_file.seek(0) + silk_header = origin_file.read(10) + origin_file.seek(0) + if b"#!SILK_V3" in silk_header: + with tempfile.NamedTemporaryFile() as pcm_file: + pilk.decode(origin_file.name, pcm_file.name) + origin_file.close() + audio_file = tempfile.NamedTemporaryFile() + pydub.AudioSegment.from_raw(file=pcm_file, sample_width=2, frame_rate=24000, channels=1).export( + audio_file, format="ogg", codec="libopus", parameters=["-vbr", "on"] + ) + else: + audio_file = origin_file + return audio_file def strf_time(seconds: int) -> str: