diff --git a/list.json b/list.json
index 056c6c6..d3f5058 100644
--- a/list.json
+++ b/list.json
@@ -329,6 +329,16 @@
"supported": true,
"des-short": "仅联系人可以发送语音消息给我。",
"des": "仅联系人可以发送语音消息给我。"
+ },
+ {
+ "name": "session",
+ "version": "1.0",
+ "section": "profile",
+ "maintainer": "xtaodada",
+ "size": "4.36 kb",
+ "supported": true,
+ "des-short": "注销/查询已登录的会话。",
+ "des": "注销/查询已登录的会话。\n指令:,session"
}
]
}
diff --git a/session/main.py b/session/main.py
new file mode 100644
index 0000000..d23259f
--- /dev/null
+++ b/session/main.py
@@ -0,0 +1,104 @@
+import contextlib
+
+from typing import List, Optional
+
+from datetime import datetime
+
+from pyrogram.enums import ChatType
+from pyrogram.raw.functions.account import GetAuthorizations, ResetAuthorization
+from pyrogram.raw.types import Authorization
+
+from pagermaid.listener import listener
+from pagermaid.single_utils import Message
+from pagermaid.utils import alias_command
+from pagermaid import bot
+
+
+async def get_all_session() -> List[Authorization]:
+ data = await bot.invoke(GetAuthorizations())
+ return data.authorizations
+
+
+async def filter_session(hash_start: str) -> Optional[Authorization]:
+ try:
+ hash_start = int(hash_start)
+ if len(str(hash_start)) != 6 and hash_start != 0:
+ return None
+ except ValueError:
+ return None
+ return next((session for session in await get_all_session() if str(session.hash).startswith(str(hash_start))), None)
+
+
+async def kick_session(session: Authorization) -> bool:
+ if session.hash != 0:
+ with contextlib.suppress(Exception):
+ return await bot.invoke(ResetAuthorization(hash=session.hash))
+ return False
+
+
+def format_timestamp(timestamp: int) -> str:
+ datetime_obj = datetime.fromtimestamp(timestamp)
+ return datetime_obj.strftime("%Y-%m-%d %H:%M:%S")
+
+
+def format_session(session: Authorization, private: bool = True) -> str:
+ text = f"标识符:{str(session.hash)[:6]}
\n" \
+ f"设备型号:{session.device_model}
\n" \
+ f"设备平台:{session.platform}
\n" \
+ f"系统版本:{session.system_version}
\n" \
+ f"应用名称:{session.app_name}
\n" \
+ f"应用版本:{session.app_version}
\n" \
+ f"官方应用:{'是' if session.official_app else '否'}
\n" \
+ f"登录时间:{format_timestamp(session.date_created)}
\n" \
+ f"在线时间:{format_timestamp(session.date_active)}
"
+ if private:
+ text += f"\nIP:{session.ip}
\n" \
+ f"地理位置:{session.country}
"
+ if session.hash != 0:
+ text += f"\n\n使用命令 ,{alias_command('session')} 注销 {str(session.hash)[:6]}
可以注销此会话。"
+ return text
+
+
+async def count_platform(private: bool = True) -> str:
+ sessions = await get_all_session()
+ if not sessions:
+ return "无任何在线设备?"
+ platform_count = {}
+ text = f"共有 {len(sessions)} 台设备在线,分别是:\n\n"
+ for session in sessions:
+ if session.platform in platform_count:
+ platform_count[session.platform] += 1
+ else:
+ platform_count[session.platform] = 1
+ text += f"{str(session.hash)[:6]}
- {session.device_model}
"
+ if private:
+ text += f" - {session.app_name}
"
+ text += f"\n"
+ text += "\n"
+ text += "\n".join(f"{platform}:{count} 台" for platform, count in platform_count.items())
+ return text
+
+
+@listener(command="session",
+ need_admin=True,
+ parameters="注销/查询",
+ description="注销/查询已登录的会话")
+async def session_manage(message: Message):
+ if not message.arguments:
+ return await message.edit(await count_platform(private=message.chat.type in [ChatType.PRIVATE, ChatType.BOT]))
+ if len(message.parameter) != 2:
+ return await message.edit_text("请输入 `注销/查询 标识符` 来查询或注销会话")
+ if message.parameter[0] == "查询":
+ session = await filter_session(message.parameter[1])
+ if session:
+ return await message.edit(format_session(
+ session,
+ private=message.chat.type in [ChatType.PRIVATE, ChatType.BOT]))
+ return await message.edit_text("请输入正确的标识符!")
+ if message.parameter[0] == "注销":
+ session = await filter_session(message.parameter[1])
+ if session:
+ success = await kick_session(session)
+ return await message.edit("注销成功!" if success else "注销失败!")
+ return await message.edit_text("请输入正确的标识符!")
+ return await message.edit_text("请输入 `注销/查询 标识符` 来查询或注销会话")