PagerMaid-Modify/pagermaid/modules/sticker.py
Xtao_dada dc70679fe8
🐛📈 Support sticker set search, fix a analytic bug. (#109)
 支持关键词搜索贴纸包。
🐛📈 修复用户体验计划无法关闭的问题。
2021-07-18 13:52:17 +08:00

545 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

""" PagerMaid module to handle sticker collection. """
import certifi
import ssl
import requests
from bs4 import BeautifulSoup
from asyncio import sleep
from os import remove
from urllib import request
from io import BytesIO
from telethon.tl.types import DocumentAttributeFilename, MessageMediaPhoto
from telethon.tl.functions.contacts import UnblockRequest
from telethon.errors.common import AlreadyInConversationError
from PIL import Image, ImageOps
from math import floor
from pagermaid import bot, redis, redis_status
from pagermaid.listener import listener
from pagermaid.utils import lang, alias_command
from pagermaid import log
@listener(is_plugin=False, outgoing=True, command=alias_command("s"),
description=lang('sticker_des'),
parameters="<emoji>")
async def sticker(context):
""" Fetches images/stickers and add them to your pack. """
# 首先解封 sticker Bot
try:
await context.client(UnblockRequest(id=429000))
except:
pass
pic_round = False
is_batch = False
package_name = ""
if redis_status():
if redis.get("sticker.round"):
pic_round = True
if len(context.parameter) >= 1:
# s merge
await log(f"{context.parameter}")
if context.parameter[0] == "merge" or context.parameter[0] == "m":
is_batch = True
# s merge png <package_name> <number>
try:
if context.parameter[3].isnumeric():
if "png" in context.parameter[1]:
pic_round = False
else:
pic_round = True
package_name = context.parameter[2]
except:
# 异常,多半是数组越界,不处理,继续参数校验
pass
try:
# s merge <package_name> <number>
if context.parameter[2].isnumeric():
if "png" in context.parameter[1]:
pic_round = False
package_name = context.parameter[2]
else:
package_name = context.parameter[1]
# s merge png <package_name>
elif "png" in context.parameter[1]:
pic_round = False
package_name = context.parameter[2]
# s merge <package_name> <number>
else:
package_name = context.parameter[1]
except:
# 异常,多半是数组越界
# s merge <package_name>
try:
if "png" in context.parameter[1]:
raise Exception()
package_name = context.parameter[1]
except:
# 命令错误
try:
await context.edit(lang('merge_command_error'))
except:
pass
return
# s <png | number> | error
else:
if context.parameter[0] == "set_round":
if pic_round:
redis.delete("sticker.round")
try:
await context.edit(lang('us_change_rounding_false'))
except:
pass
else:
redis.set("sticker.round", "true")
try:
await context.edit(lang('us_change_rounding_true'))
except:
pass
return
elif "png" in context.parameter[0]:
pic_round = False
# s <number>
elif context.parameter[0].isnumeric():
pass
elif isEmoji(context.parameter[0]) or len(context.parameter[0]) == 1:
await log(f"emoji{context.parameter[0]}")
pass
else:
try:
await context.edit(lang('merge_command_error'))
except:
pass
return
user = await bot.get_me()
if not user.username:
user.username = user.first_name
custom_emoji = False
animated = False
emoji = ""
if is_batch:
# 多张
""" merge every single sticker after the message you replied to. """
if not context.reply_to_msg_id:
await context.edit(lang('not_reply'))
return
input_chat = await context.get_input_chat()
count = 0
scount = 0
result = ""
if context.parameter[0] == "m":
message = await context.get_reply_message()
await single_sticker(animated, context, custom_emoji, emoji, message, pic_round, user,
package_name)
else:
async for message in context.client.iter_messages(input_chat, min_id=context.reply_to_msg_id):
count += 1
if message and message.media:
scount += 1
try:
await log(f"{lang('merge_processing_left')}{count}{lang('merge_processing_right')}")
await context.edit(f"{lang('merge_processing_left')}{count}{lang('merge_processing_right')}")
except:
pass
result = await single_sticker(animated, context, custom_emoji, emoji, message, pic_round, user,
package_name)
await sleep(.5)
try:
await context.edit(f"{result}\n"
f"{lang('merge_total_processed_left')}{scount}{lang('merge_total_processed_right')}", parse_mode='md')
except:
pass
await sleep(5)
try:
await context.delete()
except:
pass
else:
# 单张收集图片
message = await context.get_reply_message()
await single_sticker(animated, context, custom_emoji, emoji, message, pic_round, user, "")
async def single_sticker(animated, context, custom_emoji, emoji, message, pic_round, user, package_name):
try:
await context.edit(lang('sticker_processing'))
except:
pass
if message and message.media:
try:
temp = message.media.document.mime_type.split('/')
except AttributeError:
try:
await context.edit(lang('sticker_type_not_support'))
except:
pass
if isinstance(message.media, MessageMediaPhoto):
photo = BytesIO()
photo = await bot.download_media(message.photo, photo)
elif "image" in message.media.document.mime_type.split('/'):
photo = BytesIO()
try:
await context.edit(lang('sticker_downloading'))
except:
pass
await bot.download_file(message.media.document, photo)
if (DocumentAttributeFilename(file_name='sticker.webp') in
message.media.document.attributes):
emoji = message.media.document.attributes[1].alt
custom_emoji = True
elif (DocumentAttributeFilename(file_name='AnimatedSticker.tgs') in
message.media.document.attributes):
photo = BytesIO()
await bot.download_file(message.media.document, "AnimatedSticker.tgs")
for index in range(len(message.media.document.attributes)):
try:
emoji = message.media.document.attributes[index].alt
break
except:
pass
custom_emoji = True
animated = True
photo = 1
else:
try:
await context.edit(lang('sticker_type_not_support'))
except:
pass
return
else:
try:
await context.edit(lang('sticker_reply_not_sticker'))
except:
pass
return
if photo:
split_strings = context.text.split()
if not custom_emoji:
emoji = "👀"
pack = 1
sticker_already = False
if package_name:
# 批量处理贴纸无法指定emoji只获取第几个pack
# s merge png <package_name> <number>
if len(split_strings) == 5:
pack = split_strings[4]
# s merge <package_name> <number>
elif len(split_strings) == 4:
pack = split_strings[3]
else:
if len(split_strings) == 3:
# s png <number|emoji>
pack = split_strings[2]
if split_strings[1].replace("png", "") != "":
emoji = split_strings[1].replace("png", "")
elif len(split_strings) == 2:
# s <number|emoji>
if split_strings[1].isnumeric():
pack = int(split_strings[1])
else:
if split_strings[1].replace("png", "") != "":
emoji = split_strings[1].replace("png", "")
if not isinstance(pack, int):
pack = 1
if package_name:
# merge指定package_name
pack_name = f"{user.username}_{package_name}_{pack}"
pack_title = f"@{user.username} {lang('sticker_pack_title')} ({package_name}) ({pack})"
else:
pack_name = f"{user.username}_{pack}"
pack_title = f"@{user.username} {lang('sticker_pack_title')} ({pack})"
command = '/newpack'
file = BytesIO()
if not animated:
try:
await context.edit(lang('sticker_resizing'))
except:
pass
image = await resize_image(photo)
if pic_round:
try:
await context.edit(lang('us_static_rounding'))
except:
pass
image = await rounded_image(image)
file.name = "sticker.png"
image.save(file, "PNG")
else:
pack_name += "_animated"
pack_title += " (animated)"
command = '/newanimated'
try:
response = request.urlopen(
request.Request(f'http://t.me/addstickers/{pack_name}'),
context=ssl.create_default_context(cafile=certifi.where()))
except UnicodeEncodeError:
pack_name = 's' + hex(context.sender_id)[2:]
if animated:
pack_name = 's' + hex(context.sender_id)[2:] + '_animated'
response = request.urlopen(
request.Request(f'http://t.me/addstickers/{pack_name}'),
context=ssl.create_default_context(cafile=certifi.where()))
if not response.status == 200:
try:
await context.edit(lang('sticker_telegram_server_error'))
except:
pass
return
http_response = response.read().decode("utf8").split('\n')
if " A <strong>Telegram</strong> user has created the <strong>Sticker&nbsp;Set</strong>." not in \
http_response:
for _ in range(20): # 最多重试20次
try:
async with bot.conversation('Stickers') as conversation:
await conversation.send_message('/addsticker')
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.send_message(pack_name)
chat_response = await conversation.get_response()
while chat_response.text == "Whoa! That's probably enough stickers for one pack, give it a break. \
A pack can't have more than 120 stickers at the moment.":
pack += 1
if package_name:
# merge指定package_name
pack_name = f"{user.username}_{package_name}_{pack}"
pack_title = f"@{user.username} {lang('sticker_pack_title')} ({package_name}) ({pack})"
else:
pack_name = f"{user.username}_{pack}"
pack_title = f"@{user.username} {lang('sticker_pack_title')} ({pack})"
try:
if package_name:
await context.edit(
lang('sticker_change_pack_to') + str(package_name) + str(pack) + lang(
'sticker_last_is_full'))
else:
await context.edit(
lang('sticker_change_pack_to') + str(pack) + lang('sticker_last_is_full'))
except:
pass
await conversation.send_message(pack_name)
chat_response = await conversation.get_response()
if chat_response.text == "Invalid pack selected.":
await add_sticker(conversation, command, pack_title, pack_name, animated, message,
context, file, emoji)
try:
await context.edit(
f"{lang('sticker_has_been_added')} [{lang('sticker_this')}](t.me/addstickers/{pack_name}) {lang('sticker_pack')}",
parse_mode='md')
except:
pass
return
await upload_sticker(animated, message, context, file, conversation)
await conversation.get_response()
await conversation.send_message(emoji)
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.get_response()
await conversation.send_message('/done')
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
break
except AlreadyInConversationError:
if not sticker_already:
try:
await context.edit(lang('sticker_another_running'))
except:
pass
sticker_already = True
else:
pass
await sleep(.5)
except Exception:
raise
else:
try:
await context.edit(lang('sticker_no_pack_exist_creating'))
except:
pass
async with bot.conversation('Stickers') as conversation:
await add_sticker(conversation, command, pack_title, pack_name, animated, message,
context, file, emoji)
try:
await context.edit(
f"{lang('sticker_has_been_added')} [{lang('sticker_this')}](t.me/addstickers/{pack_name}) {lang('sticker_pack')}",
parse_mode='md')
except:
pass
if package_name:
return f"{lang('sticker_has_been_added')} [{lang('sticker_this')}](t.me/addstickers/{pack_name}) {lang('sticker_pack')}"
else:
await sleep(5)
try:
await context.delete()
except:
pass
async def add_sticker(conversation, command, pack_title, pack_name, animated, message, context, file, emoji):
await conversation.send_message(command)
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.send_message(pack_title)
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
await upload_sticker(animated, message, context, file, conversation)
await conversation.get_response()
await conversation.send_message(emoji)
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.get_response()
await conversation.send_message("/publish")
if animated:
await conversation.get_response()
await conversation.send_message(f"<{pack_title}>")
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.send_message("/skip")
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.get_response()
await conversation.send_message(pack_name)
await bot.send_read_acknowledge(conversation.chat_id)
await conversation.get_response()
await bot.send_read_acknowledge(conversation.chat_id)
async def upload_sticker(animated, message, context, file, conversation):
if animated:
try:
await context.edit(lang('us_animated_uploading'))
except:
pass
await conversation.send_file("AnimatedSticker.tgs", force_document=True)
remove("AnimatedSticker.tgs")
else:
file.seek(0)
try:
await context.edit(lang('us_static_uploading'))
except:
pass
await conversation.send_file(file, force_document=True)
async def resize_image(photo):
image = Image.open(photo)
maxsize = (512, 512)
if (image.width and image.height) < 512:
size1 = image.width
size2 = image.height
if image.width > image.height:
scale = 512 / size1
size1new = 512
size2new = size2 * scale
else:
scale = 512 / size2
size1new = size1 * scale
size2new = 512
size1new = floor(size1new)
size2new = floor(size2new)
size_new = (size1new, size2new)
image = image.resize(size_new)
else:
image.thumbnail(maxsize)
return image
async def rounded_image(image):
w = image.width
h = image.height
resize_size = 0
# 比较长宽
if w > h:
resize_size = h
else:
resize_size = w
half_size = floor(resize_size / 2)
# 获取圆角模版切割成4个角
tl = (0, 0, 256, 256)
tr = (256, 0, 512, 256)
bl = (0, 256, 256, 512)
br = (256, 256, 512, 512)
border = Image.open('pagermaid/static/images/rounded.png').convert('L')
tlp = border.crop(tl)
trp = border.crop(tr)
blp = border.crop(bl)
brp = border.crop(br)
# 缩放四个圆角
tlp = tlp.resize((half_size, half_size))
trp = trp.resize((half_size, half_size))
blp = blp.resize((half_size, half_size))
brp = brp.resize((half_size, half_size))
# 扩展四个角大小到目标图大小
# tlp = ImageOps.expand(tlp, (0, 0, w - tlp.width, h - tlp.height))
# trp = ImageOps.expand(trp, (w - trp.width, 0, 0, h - trp.height))
# blp = ImageOps.expand(blp, (0, h - blp.height, w - blp.width, 0))
# brp = ImageOps.expand(brp, (w - brp.width, h - brp.height, 0, 0))
# 四个角合并到一张新图上
ni = Image.new('RGB', (w, h), (0, 0, 0)).convert('L')
ni.paste(tlp, (0, 0))
ni.paste(trp, (w - trp.width, 0))
ni.paste(blp, (0, h - blp.height))
ni.paste(brp, (w - brp.width, h - brp.height))
# 合并圆角和原图
image.putalpha(ImageOps.invert(ni))
return image
def isEmoji(content):
if not content:
return False
if u"\U0001F600" <= content <= u"\U0001F64F":
return True
elif u"\U0001F300" <= content <= u"\U0001F5FF":
return True
elif u"\U0001F680" <= content <= u"\U0001F6FF":
return True
elif u"\U0001F1E0" <= content <= u"\U0001F1FF":
return True
else:
return False
@listener(is_plugin=False, outgoing=True, command=alias_command("sticker"),
description=lang('sticker_search_des'),
parameters="<query>")
async def sticker_search(context):
if len(context.parameter) == 0:
await context.edit(lang('arg_error'))
return
await context.edit(lang('google_processing'))
query = context.parameter[0]
try:
html = requests.get("https://combot.org/telegram/stickers?q=" + query).text
except:
return await context.edit(lang('sticker_telegram_server_error'))
xml = BeautifulSoup(html, "lxml")
titles = xml.find_all("div", "sticker-pack__title")
links = xml.find_all("a", target="_blank")
if not links:
return await context.edit(lang('sticker_search_no'))
text = f"{lang('sticker_search_result')}\n"
for title, link in zip(titles, links):
temp = title.get_text()
temp1 = link["href"]
temp2 = f"\n[{temp}]({temp1})"
if temp2 not in text:
text += temp2
await context.edit(text)