# !/usr/bin/env python3 import asyncio import json import threading import configparser import logging, subprocess from time import time, sleep from challenge import Challenge from pyrogram import (Client, Filters, Message, User, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery, ChatPermissions) from pyrogram.errors import ChatAdminRequired, ChannelPrivate, ChannelInvalid from Timer import Timer _app: Client = None _channel: str = None _start_message: str = None # _challenge_scheduler = sched.scheduler(time, sleep) _current_challenges = dict() _cch_lock = threading.Lock() _config = dict() logging.basicConfig(level=logging.INFO) # 设置一下日志记录,能够在诸如 systemctl status captchabot 这样的地方获得详细输出。 conf = configparser.ConfigParser() def load_config(): global _config with open("config.json", encoding="utf-8") as f: _config = json.load(f) def save_config(): with open("config.json", "w") as f: json.dump(_config, f, indent=4) def _update(app): @app.on_message(Filters.command("ping") & Filters.private) async def ping_command(client: Client, message: Message): await message.reply("poi~poi~poi~") @app.on_message(Filters.command("upload") & Filters.group) async def upload_command(client: Client, message: Message): if message.chat.id == -1001413542074: conf.read("tk.json") qustion_all = conf.get("ans", "all") qustion_a = conf.get("ans", "A") qustion_b = conf.get("ans", "B") qustion_c = conf.get("ans", "C") qustion_d = conf.get("ans", "D") if message.reply_to_message.document.mime_type == "image/jpeg": await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".jpg") elif message.reply_to_message.document.mime_type == "image/png": await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".png") if message.text.split()[-1] == "A": conf.set("ans", "A", qustion_a + " " + str(int(qustion_all) + 1)) conf.set("ans", "all", str(int(qustion_all) + 1)) conf.write(open("tk.json", "w")) if message.text.split()[-1] == "B": conf.set("ans", "B", qustion_b + " " + str(int(qustion_all) + 1)) conf.set("ans", "all", str(int(qustion_all) + 1)) conf.write(open("tk.json", "w")) if message.text.split()[-1] == "C": conf.set("ans", "C", qustion_c + " " + str(int(qustion_all) + 1)) conf.set("ans", "all", str(int(qustion_all) + 1)) conf.write(open("tk.json", "w")) if message.text.split()[-1] == "D": conf.set("ans", "D", qustion_d + " " + str(int(qustion_all) + 1)) conf.set("ans", "all", str(int(qustion_all) + 1)) conf.write(open("tk.json", "w")) await message.reply("done") @app.on_message(Filters.command("start") & Filters.private) async def start_command(client: Client, message: Message): await message.reply("" + message.from_user.first_name + " " + _start_message) @app.on_message(Filters.command("answer")) async def answer_command(client: Client, message: Message): if message.from_user.id == int(347437156) or message.from_user.id == int( 616760897) or message.from_user.id == int(441229454): qustion_id = message.text.split()[-1] conf.read("tk.json") qustion_a = conf.get("ans", "A").split() qustion_b = conf.get("ans", "B").split() qustion_c = conf.get("ans", "C").split() qustion_d = conf.get("ans", "D").split() if qustion_id in qustion_a: await message.reply("" + message.from_user.first_name + " 此题的正确答案为:`A`") elif qustion_id in qustion_b: await message.reply("" + message.from_user.first_name + " 此题的正确答案为:`B`") elif qustion_id in qustion_c: await message.reply("" + message.from_user.first_name + " 此题的正确答案为:`C`") elif qustion_id in qustion_d: await message.reply("" + message.from_user.first_name + " 此题的正确答案为:`D`") @app.on_message(Filters.command("leave") & Filters.private) async def leave_command(client: Client, message: Message): chat_id = message.text.split()[-1] if message.from_user.id == _config["manage_user"]: try: await client.send_message(int(chat_id), _config["msg_leave_msg"]) await client.leave_chat(int(chat_id), True) except: await message.reply("指令出错了!可能是bot不在参数所在群里。") else: await message.reply("已离开群组: `" + chat_id + "`", parse_mode="Markdown") _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_leave_group"].format( botid=str(_me.id), groupid=chat_id, ), parse_mode="Markdown") except Exception as e: logging.error(str(e)) else: pass @app.on_callback_query() async def challenge_callback(client: Client, callback_query: CallbackQuery): query_data = str(callback_query.data) query_id = callback_query.id chat_id = callback_query.message.chat.id user_id = callback_query.from_user.id msg_id = callback_query.message.message_id chat_title = callback_query.message.chat.title user_name = callback_query.from_user.first_name group_config = _config.get(str(chat_id), _config["*"]) if query_data in ["+", "-"]: admins = await client.get_chat_members(chat_id, filter="administrators") if not any([ admin.user.id == user_id and (admin.status == "creator" or admin.can_restrict_members) for admin in admins ]): await client.answer_callback_query( query_id, group_config["msg_permission_denied"]) return ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id) _cch_lock.acquire() # target: int = None challenge, target, timeout_event = _current_challenges.get( ch_id, (None, None, None)) if ch_id in _current_challenges: del _current_challenges[ch_id] _cch_lock.release() timeout_event.stop() if query_data == "+": try: await client.restrict_chat_member( chat_id, target, permissions=ChatPermissions( can_send_other_messages=True, can_send_messages=True, can_send_media_messages=True, can_add_web_page_previews=True, )) except ChatAdminRequired: await client.answer_callback_query( query_id, group_config["msg_bot_no_permission"]) return await client.edit_message_caption( chat_id, msg_id, group_config["msg_approved"].format(user=user_name), reply_markup=None, ) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_passed_admin"].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) else: try: await client.kick_chat_member(chat_id, target) except ChatAdminRequired: await client.answer_callback_query( query_id, group_config["msg_bot_no_permission"]) return await client.edit_message_caption( chat_id, msg_id, group_config["msg_refused"].format(user=user_name), reply_markup=None, ) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_failed_admin"].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) await client.answer_callback_query(query_id) return ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id) _cch_lock.acquire() challenge, target, timeout_event = _current_challenges.get( ch_id, (None, None, None)) _cch_lock.release() if user_id != target: await client.answer_callback_query( query_id, group_config["msg_challenge_not_for_you"]) return None timeout_event.stop() try: await client.restrict_chat_member( chat_id, target, permissions=ChatPermissions(can_send_other_messages=True, can_send_messages=True, can_send_media_messages=True, can_add_web_page_previews=True, can_send_polls=True)) except ChatAdminRequired: pass correct = str(challenge.ans()) == query_data if correct: await client.edit_message_caption( chat_id, msg_id, group_config["msg_challenge_passed"], reply_markup=None) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_passed_answer"].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) else: if not group_config["use_strict_mode"]: await client.edit_message_caption( chat_id, msg_id, group_config["msg_challenge_mercy_passed"], reply_markup=None, ) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_passed_mercy"].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) else: try: await client.edit_message_caption( chat_id, msg_id, group_config["msg_challenge_failed"].format(challengeans=challenge.ans()), reply_markup=None, ) # await client.restrict_chat_member(chat_id, target) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_failed_answer"].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) except ChatAdminRequired: return if group_config["challenge_timeout_action"] == "ban": await client.kick_chat_member(chat_id, user_id) elif group_config["challenge_timeout_action"] == "kick": await client.kick_chat_member(chat_id, user_id) await client.unban_chat_member(chat_id, user_id) elif group_config["challenge_timeout_action"] == "mute": await client.restrict_chat_member( chat_id=chat_id, user_id=user_id, permissions=ChatPermissions(can_send_other_messages=False, can_send_messages=False, can_send_media_messages=False, can_add_web_page_previews=False, can_send_polls=False)) else: pass if group_config["delete_failed_challenge"]: Timer( client.delete_messages(chat_id, msg_id), group_config["delete_failed_challenge_interval"], ) if group_config["delete_passed_challenge"]: Timer( client.delete_messages(chat_id, msg_id), group_config["delete_passed_challenge_interval"], ) @app.on_message(Filters.new_chat_members) async def challenge_user(client: Client, message: Message): target = message.new_chat_members[0] if message.from_user.id != target.id: if target.is_self: group_config = _config.get(str(message.chat.id), _config["*"]) try: await client.send_message( message.chat.id, group_config["msg_self_introduction"]) _me: User = await client.get_me() try: await client.send_message( int(_channel), _config["msg_into_group"].format( botid=str(_me.id), groupid=str(message.chat.id), grouptitle=str(message.chat.title), ), parse_mode="Markdown", ) except Exception as e: logging.error(str(e)) except ChannelPrivate: return return try: await client.restrict_chat_member( chat_id=message.chat.id, user_id=target.id, permissions=ChatPermissions(can_send_other_messages=False, can_send_messages=False, can_send_media_messages=False, can_add_web_page_previews=False, can_send_polls=False)) except ChatAdminRequired: return group_config = _config.get(str(message.chat.id), _config["*"]) challenge = Challenge() def generate_challenge_button(e): choices = [] answers = [] for c in e.choices(): answers.append( InlineKeyboardButton(str(c), callback_data=bytes( str(c), encoding="utf-8"))) choices.append(answers) return choices + [[ InlineKeyboardButton(group_config["msg_approve_manually"], callback_data=b"+"), InlineKeyboardButton(group_config["msg_refuse_manually"], callback_data=b"-"), ]] timeout = group_config["challenge_timeout"] reply_message = await client.send_photo( message.chat.id, photo = challenge.pu(), caption = group_config["msg_challenge"].format(target=target.first_name, target_id=target.id, timeout=timeout, challenge=challenge.qus()), reply_to_message_id=message.message_id, reply_markup=InlineKeyboardMarkup( generate_challenge_button(challenge)), ) _me: User = await client.get_me() chat_id = message.chat.id chat_title = message.chat.title target = message.from_user.id await client.send_message( int(_channel), _config['msg_attempt_try'].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), )) timeout_event = Timer( challenge_timeout(client, message.chat.id, message.from_user.id, message.chat.title, reply_message.message_id), timeout=group_config["challenge_timeout"], ) _cch_lock.acquire() _current_challenges["{chat}|{msg}".format( chat=message.chat.id, msg=reply_message.message_id)] = (challenge, message.from_user.id, timeout_event) _cch_lock.release() async def challenge_timeout(client: Client, chat_id, from_id, chat_title, reply_id): global _current_challenges _me: User = await client.get_me() group_config = _config.get(str(chat_id), _config["*"]) _cch_lock.acquire() del _current_challenges["{chat}|{msg}".format(chat=chat_id, msg=reply_id)] _cch_lock.release() # TODO try catch await client.edit_message_caption( chat_id=chat_id, message_id=reply_id, caption=group_config["msg_challenge_timeout"], reply_markup=None, ) await client.send_message(chat_id=_config["channel"], text=_config["msg_failed_timeout"].format( botid=str(_me.id), targetuser=str(from_id), grouptitle=str(chat_title), groupid=str(chat_id))) if group_config["challenge_timeout_action"] == "ban": await client.kick_chat_member(chat_id, from_id) elif group_config["challenge_timeout_action"] == "kick": await client.kick_chat_member(chat_id, from_id) await client.unban_chat_member(chat_id, from_id) elif group_config["challenge_timeout_action"] == "mute": await client.restrict_chat_member( chat_id=chat_id, user_id=user_id, permissions=ChatPermissions(can_send_other_messages=False, can_send_messages=False, can_send_media_messages=False, can_add_web_page_previews=False, can_send_polls=False)) else: pass if group_config["delete_failed_challenge"]: Timer( client.delete_messages(chat_id, reply_id), group_config["delete_failed_challenge_interval"], ) @app.on_message(Filters.command("challenge")) async def challenge_user(client: Client, message: Message): admins = await client.get_chat_members(message.chat.id, filter="administrators") if not any([ admin.user.id == message.from_user.id and (admin.status == "creator" or admin.can_restrict_members) for admin in admins ]): target = message.from_user group_config = _config.get(str(message.chat.id), _config["*"]) challenge = Challenge() def generate_challenge_button(e): choices = [] answers = [] for c in e.choices(): answers.append( InlineKeyboardButton(str(c), callback_data=bytes( str(c), encoding="utf-8"))) choices.append(answers) return choices timeout = group_config["challenge_timeout"] reply_message = await client.send_photo( message.chat.id, photo = challenge.pu(), caption = group_config["msg_challeng"].format(target=target.first_name, target_id=target.id, timeout=timeout, challenge=challenge.qus()), reply_to_message_id=message.message_id, reply_markup=InlineKeyboardMarkup( generate_challenge_button(challenge)), ) _me: User = await client.get_me() chat_id = message.chat.id chat_title = message.chat.title target = message.from_user.id await client.send_message( int(_channel), _config['msg_challenge_try'].format( botid=str(_me.id), targetuser=str(target), groupid=str(chat_id), grouptitle=str(chat_title), )) timeout_event = Timer( challeng_timeout(client, message.chat.id, message.from_user.id, message.chat.title, reply_message.message_id), timeout=group_config["challenge_timeout"], ) _cch_lock.acquire() _current_challenges["{chat}|{msg}".format( chat=message.chat.id, msg=reply_message.message_id)] = (challenge, message.from_user.id, timeout_event) _cch_lock.release() async def challeng_timeout(client: Client, chat_id, from_id, chat_title, reply_id): global _current_challenges _me: User = await client.get_me() group_config = _config.get(str(chat_id), _config["*"]) _cch_lock.acquire() del _current_challenges["{chat}|{msg}".format(chat=chat_id, msg=reply_id)] _cch_lock.release() # TODO try catch await client.edit_message_caption( chat_id=chat_id, message_id=reply_id, caption=group_config["msg_challeng_timeout"], reply_markup=None, ) await client.send_message(chat_id=_config["channel"], text=_config["msg_challenge_time"].format( botid=str(_me.id), targetuser=str(from_id), grouptitle=str(chat_title), groupid=str(chat_id))) if group_config["delete_failed_challenge"]: Timer( client.delete_messages(chat_id, reply_id), group_config["delete_failed_challenge_interval"], ) def _main(): global _app, _channel, _start_message, _config load_config() _api_id = _config["api_id"] _api_hash = _config["api_hash"] _token = _config["token"] _channel = _config["channel"] _start_message = _config["msg_start_message"] _proxy_ip = _config["proxy_addr"].strip() _proxy_port = _config["proxy_port"].strip() if _proxy_ip and _proxy_port: _app = Client("bot", bot_token=_token, api_id=_api_id, api_hash=_api_hash, proxy=dict(hostname=_proxy_ip, port=int(_proxy_port))) else: _app = Client("bot", bot_token=_token, api_id=_api_id, api_hash=_api_hash) try: _update(_app) _app.run() except KeyboardInterrupt: quit() except Exception as e: logging.error(e) _main() if __name__ == "__main__": _main()