use request to download file

This commit is contained in:
XYenon 2022-05-28 16:05:26 +08:00
parent 265959ddda
commit b86e558312

View File

@ -1,13 +1,15 @@
import logging import logging
import tempfile import tempfile
import urllib.request
from typing import IO, Optional from typing import IO, Optional
from urllib.error import ContentTooShortError, HTTPError, URLError from urllib.error import ContentTooShortError, HTTPError, URLError
import pilk import pilk
import pydub import pydub
import requests
from ehforwarderbot import Message, coordinator from ehforwarderbot import Message, coordinator
logger = logging.getLogger(__name__)
# created by JogleLew and jqqqqqqqqqq, optimized based on Tim's emoji support, updated by xzsk2 to mobileqq v8.8.11 # created by JogleLew and jqqqqqqqqqq, optimized based on Tim's emoji support, updated by xzsk2 to mobileqq v8.8.11
qq_emoji_list = { qq_emoji_list = {
0: "😮", 0: "😮",
@ -641,14 +643,15 @@ qq_sface_list = {
def cq_get_image(image_link: str) -> Optional[IO]: # Download image from QQ def cq_get_image(image_link: str) -> Optional[IO]: # Download image from QQ
file = tempfile.NamedTemporaryFile()
try: try:
urllib.request.urlretrieve(image_link, file.name) resp = requests.get(image_link)
except (URLError, HTTPError, ContentTooShortError) as e: except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Image download failed.") logger.warning("Image download failed.")
logging.getLogger(__name__).warning(str(e)) logger.warning(str(e))
return None return None
else: else:
file = tempfile.NamedTemporaryFile()
file.write(resp.content)
if file.seek(0, 2) <= 0: if file.seek(0, 2) <= 0:
raise EOFError("File downloaded is Empty") raise EOFError("File downloaded is Empty")
file.seek(0) file.seek(0)
@ -699,15 +702,14 @@ def param_spliter(str_param):
def download_file(download_url): def download_file(download_url):
file = tempfile.NamedTemporaryFile()
try: try:
opener = urllib.request.build_opener() resp = requests.get(download_url)
urllib.request.install_opener(opener)
urllib.request.urlretrieve(download_url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e: except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) logger.warning("Error occurs when downloading files: " + str(e))
return ("Error occurs when downloading files: ") + str(e) return ("Error occurs when downloading files: ") + str(e)
else: else:
file = tempfile.NamedTemporaryFile()
file.write(resp.content)
if file.seek(0, 2) <= 0: if file.seek(0, 2) <= 0:
raise EOFError("File downloaded is Empty") raise EOFError("File downloaded is Empty")
file.seek(0) file.seek(0)
@ -715,66 +717,63 @@ def download_file(download_url):
def download_user_avatar(uid: str): def download_user_avatar(uid: str):
file = tempfile.NamedTemporaryFile()
url = "https://q1.qlogo.cn/g?b=qq&nk={}&s=0".format(uid) url = "https://q1.qlogo.cn/g?b=qq&nk={}&s=0".format(uid)
try: try:
opener = urllib.request.build_opener() resp = requests.get(url)
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e: except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) logger.warning("Error occurs when downloading files: " + str(e))
return ("Error occurs when downloading files: ") + str(e) return ("Error occurs when downloading files: ") + str(e)
if file.seek(0, 2) <= 0: else:
raise EOFError("File downloaded is Empty") file = tempfile.NamedTemporaryFile()
file.seek(0) file.write(resp.content)
return file if file.seek(0, 2) <= 0:
raise EOFError("File downloaded is Empty")
file.seek(0)
return file
def download_group_avatar(uid: str): def download_group_avatar(uid: str):
file = tempfile.NamedTemporaryFile()
url = "https://p.qlogo.cn/gh/{}/{}/".format(uid, uid) url = "https://p.qlogo.cn/gh/{}/{}/".format(uid, uid)
try: try:
opener = urllib.request.build_opener() resp = requests.get(url)
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e: except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) logger.warning("Error occurs when downloading files: " + str(e))
return ("Error occurs when downloading files: ") + str(e) return ("Error occurs when downloading files: ") + str(e)
if file.seek(0, 2) <= 0: else:
raise EOFError("File downloaded is Empty") file = tempfile.NamedTemporaryFile()
file.seek(0) file.write(resp.content)
return file if file.seek(0, 2) <= 0:
raise EOFError("File downloaded is Empty")
file.seek(0)
return file
def download_voice(voice_url: str): def download_voice(voice_url: str):
origin_file = tempfile.NamedTemporaryFile()
try: try:
opener = urllib.request.build_opener() resp = requests.get(voice_url)
urllib.request.install_opener(opener)
urllib.request.urlretrieve(voice_url, origin_file.name)
except Exception as e: except Exception as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e)) logger.warning("Error occurs when downloading files: " + str(e))
origin_file.close()
raise e raise e
finally:
opener.close()
if origin_file.seek(0, 2) <= 0:
origin_file.close()
raise EOFError("File downloaded is Empty")
origin_file.seek(0)
silk_header = origin_file.read(10)
origin_file.seek(0)
if b"#!SILK_V3" in silk_header:
with tempfile.NamedTemporaryFile() as pcm_file:
pilk.decode(origin_file.name, pcm_file.name)
origin_file.close()
audio_file = tempfile.NamedTemporaryFile()
pydub.AudioSegment.from_raw(file=pcm_file, sample_width=2, frame_rate=24000, channels=1).export(
audio_file, format="ogg", codec="libopus", parameters=["-vbr", "on"]
)
else: else:
audio_file = origin_file origin_file = tempfile.NamedTemporaryFile()
return audio_file origin_file.write(resp.content)
if origin_file.seek(0, 2) <= 0:
origin_file.close()
raise EOFError("File downloaded is Empty")
origin_file.seek(0)
silk_header = origin_file.read(10)
origin_file.seek(0)
if b"#!SILK_V3" in silk_header:
with tempfile.NamedTemporaryFile() as pcm_file:
pilk.decode(origin_file.name, pcm_file.name)
origin_file.close()
audio_file = tempfile.NamedTemporaryFile()
pydub.AudioSegment.from_raw(file=pcm_file, sample_width=2, frame_rate=24000, channels=1).export(
audio_file, format="ogg", codec="libopus", parameters=["-vbr", "on"]
)
else:
audio_file = origin_file
return audio_file
def strf_time(seconds: int) -> str: def strf_time(seconds: int) -> str: