PagerMaid-Modify/pagermaid/bots/external.py

179 lines
6.2 KiB
Python
Raw Normal View History

""" PagerMaid features that uses external HTTP APIs other than Telegram. """
from pygoogletranslation import Translator
from os import remove
from magic_google import MagicGoogle
from gtts import gTTS
from gtts.tts import gTTSError
from re import compile as regex_compile
from pagermaid import log
from pagermaid.listener import listener, config
from pagermaid.utils import clear_emojis, attach_log, fetch_youtube_audio, lang, alias_command
@listener(is_plugin=False, incoming=True, command=alias_command('translate'),
description=lang('translate_des'),
parameters=lang('translate_parameters'))
async def translate(context):
""" PagerMaid universal translator. """
translator = Translator()
reply = await context.get_reply_message()
message = context.arguments
ap_lang = config['application_language']
if message:
pass
elif reply:
message = reply.text
else:
await context.reply(lang('arg_error'))
return
msg = await context.reply(lang('translate_processing'))
try:
try:
result = translator.translate(clear_emojis(message), dest=ap_lang)
except:
from translate import Translator as trans
result = trans(to_lang=ap_lang.replace('zh-cn', 'zh')).translate(clear_emojis(message))
except ValueError:
await msg.edit(lang('translate_ValueError'))
return
try:
source_lang = result.src
source_text = result.origin
trans_lang = result.dest
except AttributeError:
await msg.edit(lang('google_connection_error'))
return
result = f"**{lang('translate_hits')}**\n{lang('translate_original_lang')}: {source_lang}\n{source_text} -> {result.text}"
if len(result) > 4096:
await msg.edit(lang('translate_tg_limit_uploading_file'))
await attach_log(result, context.chat_id, "translation.txt", context.id)
return
await msg.edit(result)
if len(result) <= 4096:
await log(f"{lang('translate_get')}: `{source_text}` \n{lang('translate_from')} {source_lang} {lang('translate_to')} {trans_lang}")
else:
await log(f"{lang('translate_get')}{translate('translate_from')} {source_lang} {lang('translate_to')} {trans_lang}.")
@listener(is_plugin=False, incoming=True, command=alias_command('tts'),
description=lang('tts_des'),
parameters="<string>")
async def tts(context):
""" Send TTS stuff as voice message. """
reply = await context.get_reply_message()
message = context.arguments
ap_lang = config['application_tts']
if message:
pass
elif reply:
message = reply.text
else:
await context.reply(lang('arg_error'))
return
try:
gTTS(message, lang=ap_lang)
msg = await context.reply(lang('tts_processing'))
except AssertionError:
await context.reply(lang('tts_AssertionError'))
return
except ValueError:
await context.reply(lang('tts_ValueError'))
return
except RuntimeError:
await context.reply(lang('tts_RuntimeError'))
return
google_tts = gTTS(message, lang=ap_lang)
try:
google_tts.save("vocals.mp3")
except AssertionError:
await msg.edit(lang('tts_AssertionError'))
return
except ConnectionError:
await msg.edit(lang('tts_RuntimeError'))
return
except gTTSError:
await msg.edit(lang('tts_RuntimeError'))
return
with open("vocals.mp3", "rb") as audio:
line_list = list(audio)
line_count = len(line_list)
if line_count == 1:
google_tts = gTTS(message, lang=ap_lang)
google_tts.save("vocals.mp3")
with open("vocals.mp3", "r"):
try:
await context.client.send_file(context.chat_id, "vocals.mp3", voice_note=True)
remove("vocals.mp3")
except:
pass
if len(message) <= 4096:
await log(f"{lang('tts_success')}: `{message}`.")
else:
await log(lang('tts_success'))
await msg.delete()
@listener(is_plugin=False, incoming=True, command=alias_command('google'),
description=lang('google_des'),
parameters="<query>")
async def googletest(context):
""" Searches Google for a string. """
mg = MagicGoogle()
reply = await context.get_reply_message()
query = context.arguments
if query:
pass
elif reply:
query = reply.text
else:
await context.reply(lang('arg_error'))
return
query = query.replace(' ', '+')
msg = await context.reply(lang('google_processing'))
results = ""
for i in mg.search(query=query, num=int(config['result_length'])):
try:
title = i['text'][0:30] + '...'
link = i['url']
results += f"\n[{title}]({link}) \n"
except:
await msg.edit(lang('google_connection_error'))
return
await msg.edit(f"**Google** |`{query}`| 🎙 🔍 \n"
f"{results}",
link_preview=False)
await log(f"{lang('google_success')} `{query}`")
@listener(is_plugin=False, incoming=True, command=alias_command('fetchaudio'),
description=lang('fetchaudio_des'),
parameters="<url>,<string>")
async def fetchaudio(context):
if context.arguments:
if ',' in context.arguments:
url, string_2 = context.arguments.split(',', 1)
else:
url = context.arguments
string_2 = "#audio "
else:
await context.reply(lang('fetchaudio_error_grammer'))
return
""" Fetches audio from provided URL. """
reply = await context.get_reply_message()
reply_id = None
msg = await context.reply(lang('fetchaudio_processing'))
if reply:
reply_id = reply.id
if url is None:
await msg.edit(lang('arg_error'))
return
youtube_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?youtu(be|.be)?(\.com)?/.+")
if youtube_pattern.match(url):
if not await fetch_youtube_audio(url, context.chat_id, reply_id, string_2):
await msg.edit(lang('fetchaudio_error_downloading'))
await log(f"{lang('fetchaudio_success')}, {lang('fetchaudio_link')}: {url}.")
await msg.delete()