PagerMaid-Modify/pagermaid/modules/account.py
Xtao_dada d2f224d9cc
🐛⬆️ Fix user doesn't exist and update sentry version. (#133)
🐛 修复用户不存在时程序报错。
⬆️ 升级 sentry 依赖包版本号。
2021-09-24 22:30:48 +08:00

358 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

""" This module contains utils to configure your account. """
from os import remove
from telethon.errors import ImageProcessFailedError, PhotoCropSizeSmallError
from telethon.errors.rpcerrorlist import PhotoExtInvalidError, UsernameOccupiedError, AboutTooLongError, \
FirstNameInvalidError, UsernameInvalidError, UsernameNotModifiedError
from telethon.tl.functions.account import UpdateProfileRequest, UpdateUsernameRequest
from telethon.tl.functions.photos import DeletePhotosRequest, GetUserPhotosRequest, UploadProfilePhotoRequest
from telethon.tl.functions.users import GetFullUserRequest
from telethon.tl.functions.contacts import BlockRequest, UnblockRequest
from telethon.tl.types import InputPhoto, MessageMediaPhoto, MessageEntityMentionName, MessageEntityPhone
from struct import error as StructError
from pagermaid import bot, log
from pagermaid.utils import lang, alias_command
from pagermaid.listener import listener
@listener(is_plugin=False, outgoing=True, command=alias_command('username'),
description=lang('username_des'),
parameters="<username>")
async def username(context):
""" Reconfigure your username. """
if len(context.parameter) > 1:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
if len(context.parameter) == 1:
result = context.parameter[0]
else:
result = ""
try:
await bot(UpdateUsernameRequest(result))
except UsernameOccupiedError:
await context.edit(f"{lang('error_prefix')}{lang('username_exist')}")
return
except UsernameInvalidError:
await context.edit(f"{lang('error_prefix')}{lang('username_vaild')}")
return
except UsernameNotModifiedError:
await context.edit(f"{lang('error_prefix')}{lang('username_exist')}")
return
await context.edit(lang('username_set'))
if result == "":
await log(lang('username_cancel'))
return
await log(f"{lang('username_whatset')}`{result}`")
@listener(is_plugin=False, outgoing=True, command=alias_command('name'),
description=lang('name_des'),
parameters="<first name> <last name>")
async def name(context):
""" Updates your display name. """
if len(context.parameter) == 2:
first_name = context.parameter[0]
last_name = context.parameter[1]
elif len(context.parameter) == 1:
first_name = context.parameter[0]
last_name = " "
else:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
try:
await bot(UpdateProfileRequest(
first_name=first_name,
last_name=last_name))
except FirstNameInvalidError:
await context.edit(f"{lang('error_prefix')}{lang('name_vaild')}")
return
await context.edit(lang('name_set'))
if last_name != " ":
await log(f"{lang('name_whatset')}`{first_name} {last_name}`.")
else:
await log(f"{lang('name_whatset')}`{first_name}`.")
@listener(is_plugin=False, outgoing=True, command=alias_command('pfp'),
description=lang('pfp_des'))
async def pfp(context):
""" Sets your profile picture. """
reply = await context.get_reply_message()
photo = None
await context.edit(lang('pfp_process'))
if reply:
if reply.media:
if isinstance(reply.media, MessageMediaPhoto):
photo = await bot.download_media(message=reply.photo)
elif "image" in reply.media.document.mime_type.split('/'):
photo = await bot.download_file(reply.media.document)
else:
await context.edit(f"{lang('error_prefix')}{lang('pfp_e_notp')}")
if photo:
try:
await bot(UploadProfilePhotoRequest(
await bot.upload_file(photo)
))
try:
remove(photo)
except:
pass
await context.edit("头像修改成功啦 ~")
return
except PhotoCropSizeSmallError:
await context.edit(f"{lang('error_prefix')}{lang('pfp_e_size')}")
except ImageProcessFailedError:
await context.edit(f"{lang('error_prefix')}{lang('pfp_e_img')}")
except PhotoExtInvalidError:
await context.edit(f"{lang('error_prefix')}{lang('pfp_e_notp')}")
await context.edit(f"{lang('error_prefix')}{lang('pfp_e_notp')}")
return
@listener(is_plugin=False, outgoing=True, command=alias_command('bio'),
description="设置咱的公开情报",
parameters="<string>")
async def bio(context):
""" Sets your bio. """
try:
await bot(UpdateProfileRequest(about=context.arguments))
except AboutTooLongError:
await context.edit(f"{lang('error_prefix')}{lang('bio_too_lang')}")
return
await context.edit(lang('bio_set'))
if context.arguments == "":
await log(lang('bio_cancel'))
return
await log(f"{lang('bio_whatset')}`{context.arguments}`.")
@listener(is_plugin=False, outgoing=True, command=alias_command('rmpfp'),
description=lang('rmpfp_des'),
parameters=f"<{lang('int')}>")
async def rmpfp(context):
""" Removes your profile picture. """
group = context.text[8:]
if group == 'all':
limit = 0
elif group.isdigit():
limit = int(group)
else:
limit = 1
pfp_list = await bot(GetUserPhotosRequest(
user_id=context.from_id,
offset=0,
max_id=0,
limit=limit))
input_photos = []
for sep in pfp_list.photos:
input_photos.append(
InputPhoto(
id=sep.id,
access_hash=sep.access_hash,
file_reference=sep.file_reference
)
)
await bot(DeletePhotosRequest(id=input_photos))
await context.edit(f"`{lang('rmpfp_p')}{len(input_photos)} {lang('rmpfp_l')}`")
@listener(is_plugin=False, outgoing=True, command=alias_command('profile'),
description=lang('profile_des'),
parameters="<username>")
async def profile(context):
""" Queries profile of a user. """
if len(context.parameter) > 1:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
await context.edit(lang('profile_process'))
if context.reply_to_msg_id:
reply_message = await context.get_reply_message()
user = reply_message.from_id
target_user = await context.client(GetFullUserRequest(user))
else:
if len(context.parameter) == 1:
user = context.parameter[0]
if user.isnumeric():
user = int(user)
else:
user_object = await context.client.get_me()
user = user_object.id
if context.message.entities is not None:
if isinstance(context.message.entities[0], MessageEntityMentionName):
user = context.message.entities[0].user_id
elif isinstance(context.message.entities[0], MessageEntityPhone):
user = int(context.parameter[0])
else:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
try:
user_object = await context.client.get_entity(user)
target_user = await context.client(GetFullUserRequest(user_object.id))
except (TypeError, ValueError, OverflowError, StructError) as exception:
if str(exception).startswith("Cannot find any entity corresponding to"):
await context.edit(f"{lang('error_prefix')}{lang('profile_e_no')}")
return
if str(exception).startswith("No user has"):
await context.edit(f"{lang('error_prefix')}{lang('profile_e_nou')}")
return
if str(exception).startswith("Could not find the input entity for") or isinstance(exception, StructError):
await context.edit(f"{lang('error_prefix')}{lang('profile_e_nof')}")
return
if isinstance(exception, OverflowError):
await context.edit(f"{lang('error_prefix')}{lang('profile_e_long')}")
return
raise exception
user_type = "Bot" if target_user.user.bot else lang('profile_user')
username_system = f"@{target_user.user.username}" if target_user.user.username is not None else (
lang('profile_noset'))
if not target_user.user.first_name:
await context.edit(f"{lang('error_prefix')}{lang('profile_e_no')}")
return
first_name = target_user.user.first_name.replace("\u2060", "")
last_name = target_user.user.last_name.replace("\u2060", "") if target_user.user.last_name is not None else (
lang('profile_noset')
)
biography = target_user.about if target_user.about is not None else lang('profile_nobio')
verified = lang('profile_yes') if target_user.user.verified else lang('profile_no')
restricted = lang('profile_yes') if target_user.user.restricted else lang('profile_no')
caption = f"**{lang('profile_name')}:** \n" \
f"{lang('profile_username')}: {username_system} \n" \
f"ID: {target_user.user.id} \n" \
f"{lang('profile_fname')}: {first_name} \n" \
f"{lang('profile_lname')}: {last_name} \n" \
f"{lang('profile_bio')}: {biography} \n" \
f"{lang('profile_gic')}: {target_user.common_chats_count} \n" \
f"{lang('profile_verified')}: {verified} \n" \
f"{lang('profile_restricted')}: {restricted} \n" \
f"{lang('profile_type')}: {user_type} \n" \
f"[{first_name}](tg://user?id={target_user.user.id})"
photo = await context.client.download_profile_photo(
target_user.user.id,
"./" + str(target_user.user.id) + ".jpg",
download_big=True
)
try:
reply_to = context.message.reply_to_msg_id
try:
await context.client.send_file(
context.chat_id,
photo,
caption=caption,
link_preview=False,
force_document=False,
reply_to=reply_to
)
if not photo.startswith("http"):
remove(photo)
await context.delete()
try:
remove(photo)
except:
pass
return
except TypeError:
await context.edit(caption)
except:
try:
await context.client.send_file(
context.chat_id,
photo,
caption=caption,
link_preview=False,
force_document=False
)
if not photo.startswith("http"):
remove(photo)
await context.delete()
try:
remove(photo)
except:
pass
return
except TypeError:
await context.edit(caption)
@listener(is_plugin=False, outgoing=True, command=alias_command('block'),
description=lang('block_des'),
parameters="<username/uid/reply>")
async def block_user(context):
""" Block an user. """
if len(context.parameter) > 1:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
await context.edit(lang('block_process'))
if context.reply_to_msg_id:
reply_message = await context.get_reply_message()
user = reply_message.sender_id
else:
if len(context.parameter) == 1:
user = context.parameter[0]
if user.isnumeric():
user = int(user)
else:
user_object = await context.client.get_me()
user = user_object.id
if context.message.entities is not None:
if isinstance(context.message.entities[0], MessageEntityMentionName):
user = context.message.entities[0].user_id
else:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
result = None
if not user:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
try:
result = await context.client(BlockRequest(id=user))
except Exception:
pass
if result:
await context.edit(f"{lang('block_success')} `{user}`")
else:
await context.edit(f"`{user}` {lang('block_exist')}")
@listener(is_plugin=False, outgoing=True, command=alias_command('unblock'),
description=lang('unblock_des'),
parameters="<username/uid/reply>")
async def unblock_user(context):
""" Unblock an user. """
if len(context.parameter) > 1:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
await context.edit(lang('unblock_process'))
if context.reply_to_msg_id:
reply_message = await context.get_reply_message()
user = reply_message.from_id.user_id
else:
if len(context.parameter) == 1:
user = context.parameter[0]
if user.isnumeric():
user = int(user)
else:
user_object = await context.client.get_me()
user = user_object.id
if context.message.entities is not None:
if isinstance(context.message.entities[0], MessageEntityMentionName):
user = context.message.entities[0].user_id
else:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
result = None
if not user:
await context.edit(f"{lang('error_prefix')}{lang('arg_error')}")
return
try:
result = await context.client(UnblockRequest(id=user))
except Exception:
pass
if result:
await context.edit(f"{lang('unblock_success')} `{user}`")
else:
await context.edit(f"`{user}` {lang('unblock_exist')}")