import contextlib
from typing import List, Optional
from datetime import datetime
from pyrogram.enums import ChatType
from pyrogram.raw.functions.account import GetAuthorizations, ResetAuthorization
from pyrogram.raw.types import Authorization
from pagermaid.enums import Message
from pagermaid.listener import listener
from pagermaid.services import bot
from pagermaid.utils import alias_command
async def get_all_session() -> List[Authorization]:
data = await bot.invoke(GetAuthorizations())
return data.authorizations
async def filter_session(hash_start: str) -> Optional[Authorization]:
try:
hash_start = int(hash_start)
if len(str(hash_start)) != 6 and hash_start != 0:
return None
except ValueError:
return None
return next(
(
session
for session in await get_all_session()
if str(session.hash).startswith(str(hash_start))
),
None,
)
async def kick_session(session: Authorization) -> bool:
if session.hash != 0:
with contextlib.suppress(Exception):
return await bot.invoke(ResetAuthorization(hash=session.hash))
return False
def format_timestamp(timestamp: int) -> str:
datetime_obj = datetime.fromtimestamp(timestamp)
return datetime_obj.strftime("%Y-%m-%d %H:%M:%S")
def format_session(session: Authorization, private: bool = True) -> str:
text = (
f"标识符:{str(session.hash)[:6]}
\n"
f"设备型号:{session.device_model}
\n"
f"设备平台:{session.platform}
\n"
f"系统版本:{session.system_version}
\n"
f"应用名称:{session.app_name}
\n"
f"应用版本:{session.app_version}
\n"
f"官方应用:{'是' if session.official_app else '否'}
\n"
f"登录时间:{format_timestamp(session.date_created)}
\n"
f"在线时间:{format_timestamp(session.date_active)}
"
)
if private:
text += (
f"\nIP:{session.ip}
\n" f"地理位置:{session.country}
"
)
if session.hash != 0:
text += f"\n\n使用命令 ,{alias_command('session')} 注销 {str(session.hash)[:6]}
可以注销此会话。"
return text
async def count_platform(private: bool = True) -> str:
sessions = await get_all_session()
if not sessions:
return "无任何在线设备?"
platform_count = {}
text = f"共有 {len(sessions)} 台设备在线,分别是:\n\n"
for session in sessions:
if session.platform in platform_count:
platform_count[session.platform] += 1
else:
platform_count[session.platform] = 1
text += f"{str(session.hash)[:6]}
- {session.device_model}
"
if private:
text += f" - {session.app_name}
"
text += f"\n"
text += "\n"
text += "\n".join(
f"{platform}:{count} 台" for platform, count in platform_count.items()
)
return text
@listener(
command="session", need_admin=True, parameters="注销/查询", description="注销/查询已登录的会话"
)
async def session_manage(message: Message):
if not message.arguments:
return await message.edit(
await count_platform(
private=message.chat.type in [ChatType.PRIVATE, ChatType.BOT]
)
)
if len(message.parameter) != 2:
return await message.edit_text("请输入 `注销/查询 标识符` 来查询或注销会话")
if message.parameter[0] == "查询":
session = await filter_session(message.parameter[1])
if session:
return await message.edit(
format_session(
session,
private=message.chat.type in [ChatType.PRIVATE, ChatType.BOT],
)
)
return await message.edit_text("请输入正确的标识符!")
if message.parameter[0] == "注销":
session = await filter_session(message.parameter[1])
if session:
success = await kick_session(session)
return await message.edit("注销成功!" if success else "注销失败!")
return await message.edit_text("请输入正确的标识符!")
return await message.edit_text("请输入 `注销/查询 标识符` 来查询或注销会话")