601 lines
26 KiB
Python
601 lines
26 KiB
Python
# !/usr/bin/env python3
|
||
import asyncio
|
||
import json
|
||
import threading
|
||
import configparser
|
||
import logging, subprocess
|
||
from time import time, sleep
|
||
from challenge import Challenge
|
||
from pyrogram import (Client, Filters, Message, User, InlineKeyboardButton,
|
||
InlineKeyboardMarkup, CallbackQuery, ChatPermissions)
|
||
from pyrogram.errors import ChatAdminRequired, ChannelPrivate, ChannelInvalid
|
||
from Timer import Timer
|
||
|
||
_app: Client = None
|
||
_channel: str = None
|
||
_start_message: str = None
|
||
# _challenge_scheduler = sched.scheduler(time, sleep)
|
||
_current_challenges = dict()
|
||
_cch_lock = threading.Lock()
|
||
_config = dict()
|
||
logging.basicConfig(level=logging.INFO)
|
||
# 设置一下日志记录,能够在诸如 systemctl status captchabot 这样的地方获得详细输出。
|
||
conf = configparser.ConfigParser()
|
||
|
||
def load_config():
|
||
global _config
|
||
with open("config.json", encoding="utf-8") as f:
|
||
_config = json.load(f)
|
||
|
||
|
||
def save_config():
|
||
with open("config.json", "w") as f:
|
||
json.dump(_config, f, indent=4)
|
||
|
||
def _update(app):
|
||
@app.on_message(Filters.command("ping") & Filters.private)
|
||
async def ping_command(client: Client, message: Message):
|
||
await message.reply("poi~poi~poi~")
|
||
|
||
@app.on_message(Filters.command("upload") & Filters.group)
|
||
async def upload_command(client: Client, message: Message):
|
||
if message.chat.id == -1001413542074:
|
||
conf.read("tk.json")
|
||
qustion_all = conf.get("ans", "all")
|
||
qustion_a = conf.get("ans", "A")
|
||
qustion_b = conf.get("ans", "B")
|
||
qustion_c = conf.get("ans", "C")
|
||
qustion_d = conf.get("ans", "D")
|
||
if message.reply_to_message.document.mime_type == "image/jpeg":
|
||
await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".jpg")
|
||
elif message.reply_to_message.document.mime_type == "image/png":
|
||
await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".png")
|
||
if message.text.split()[-1] == "A":
|
||
conf.set("ans", "A", qustion_a + " " + str(int(qustion_all) + 1))
|
||
conf.set("ans", "all", str(int(qustion_all) + 1))
|
||
conf.write(open("tk.json", "w"))
|
||
if message.text.split()[-1] == "B":
|
||
conf.set("ans", "B", qustion_b + " " + str(int(qustion_all) + 1))
|
||
conf.set("ans", "all", str(int(qustion_all) + 1))
|
||
conf.write(open("tk.json", "w"))
|
||
if message.text.split()[-1] == "C":
|
||
conf.set("ans", "C", qustion_c + " " + str(int(qustion_all) + 1))
|
||
conf.set("ans", "all", str(int(qustion_all) + 1))
|
||
conf.write(open("tk.json", "w"))
|
||
if message.text.split()[-1] == "D":
|
||
conf.set("ans", "D", qustion_d + " " + str(int(qustion_all) + 1))
|
||
conf.set("ans", "all", str(int(qustion_all) + 1))
|
||
conf.write(open("tk.json", "w"))
|
||
await message.reply("done")
|
||
|
||
@app.on_message(Filters.command("start") & Filters.private)
|
||
async def start_command(client: Client, message: Message):
|
||
await message.reply("<a href='tg://user?id=" + str(message.from_user.id) + "'>" + message.from_user.first_name + "</a> " + _start_message)
|
||
|
||
@app.on_message(Filters.command("answer"))
|
||
async def answer_command(client: Client, message: Message):
|
||
if message.from_user.id == int(347437156) or message.from_user.id == int(
|
||
616760897) or message.from_user.id == int(441229454):
|
||
qustion_id = message.text.split()[-1]
|
||
conf.read("tk.json")
|
||
qustion_a = conf.get("ans", "A").split()
|
||
qustion_b = conf.get("ans", "B").split()
|
||
qustion_c = conf.get("ans", "C").split()
|
||
qustion_d = conf.get("ans", "D").split()
|
||
if qustion_id in qustion_a:
|
||
await message.reply("<a href='tg://user?id=" + str(message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`A`")
|
||
elif qustion_id in qustion_b:
|
||
await message.reply("<a href='tg://user?id=" + str(
|
||
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`B`")
|
||
elif qustion_id in qustion_c:
|
||
await message.reply("<a href='tg://user?id=" + str(
|
||
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`C`")
|
||
elif qustion_id in qustion_d:
|
||
await message.reply("<a href='tg://user?id=" + str(
|
||
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`D`")
|
||
|
||
@app.on_message(Filters.command("leave") & Filters.private)
|
||
async def leave_command(client: Client, message: Message):
|
||
chat_id = message.text.split()[-1]
|
||
if message.from_user.id == _config["manage_user"]:
|
||
try:
|
||
await client.send_message(int(chat_id),
|
||
_config["msg_leave_msg"])
|
||
await client.leave_chat(int(chat_id), True)
|
||
except:
|
||
await message.reply("指令出错了!可能是bot不在参数所在群里。")
|
||
else:
|
||
await message.reply("已离开群组: `" + chat_id + "`",
|
||
parse_mode="Markdown")
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_leave_group"].format(
|
||
botid=str(_me.id),
|
||
groupid=chat_id,
|
||
),
|
||
parse_mode="Markdown")
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
else:
|
||
pass
|
||
|
||
@app.on_callback_query()
|
||
async def challenge_callback(client: Client,
|
||
callback_query: CallbackQuery):
|
||
query_data = str(callback_query.data)
|
||
query_id = callback_query.id
|
||
chat_id = callback_query.message.chat.id
|
||
user_id = callback_query.from_user.id
|
||
msg_id = callback_query.message.message_id
|
||
chat_title = callback_query.message.chat.title
|
||
user_name = callback_query.from_user.first_name
|
||
group_config = _config.get(str(chat_id), _config["*"])
|
||
if query_data in ["+", "-"]:
|
||
admins = await client.get_chat_members(chat_id,
|
||
filter="administrators")
|
||
if not any([
|
||
admin.user.id == user_id and
|
||
(admin.status == "creator" or admin.can_restrict_members)
|
||
for admin in admins
|
||
]):
|
||
await client.answer_callback_query(
|
||
query_id, group_config["msg_permission_denied"])
|
||
return
|
||
|
||
ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id)
|
||
_cch_lock.acquire()
|
||
# target: int = None
|
||
challenge, target, timeout_event = _current_challenges.get(
|
||
ch_id, (None, None, None))
|
||
if ch_id in _current_challenges:
|
||
del _current_challenges[ch_id]
|
||
_cch_lock.release()
|
||
timeout_event.stop()
|
||
|
||
if query_data == "+":
|
||
try:
|
||
await client.restrict_chat_member(
|
||
chat_id,
|
||
target,
|
||
permissions=ChatPermissions(
|
||
can_send_other_messages=True,
|
||
can_send_messages=True,
|
||
can_send_media_messages=True,
|
||
can_add_web_page_previews=True,
|
||
))
|
||
except ChatAdminRequired:
|
||
await client.answer_callback_query(
|
||
query_id, group_config["msg_bot_no_permission"])
|
||
return
|
||
|
||
await client.edit_message_caption(
|
||
chat_id,
|
||
msg_id,
|
||
group_config["msg_approved"].format(user=user_name),
|
||
reply_markup=None,
|
||
)
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_passed_admin"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
else:
|
||
try:
|
||
await client.kick_chat_member(chat_id, target)
|
||
except ChatAdminRequired:
|
||
await client.answer_callback_query(
|
||
query_id, group_config["msg_bot_no_permission"])
|
||
return
|
||
await client.edit_message_caption(
|
||
chat_id,
|
||
msg_id,
|
||
group_config["msg_refused"].format(user=user_name),
|
||
reply_markup=None,
|
||
)
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_failed_admin"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
await client.answer_callback_query(query_id)
|
||
return
|
||
|
||
ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id)
|
||
_cch_lock.acquire()
|
||
challenge, target, timeout_event = _current_challenges.get(
|
||
ch_id, (None, None, None))
|
||
_cch_lock.release()
|
||
if user_id != target:
|
||
await client.answer_callback_query(
|
||
query_id, group_config["msg_challenge_not_for_you"])
|
||
return None
|
||
timeout_event.stop()
|
||
try:
|
||
await client.restrict_chat_member(
|
||
chat_id,
|
||
target,
|
||
permissions=ChatPermissions(can_send_other_messages=True,
|
||
can_send_messages=True,
|
||
can_send_media_messages=True,
|
||
can_add_web_page_previews=True,
|
||
can_send_polls=True))
|
||
except ChatAdminRequired:
|
||
pass
|
||
|
||
correct = str(challenge.ans()) == query_data
|
||
if correct:
|
||
await client.edit_message_caption(
|
||
chat_id,
|
||
msg_id,
|
||
group_config["msg_challenge_passed"],
|
||
reply_markup=None)
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_passed_answer"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
else:
|
||
if not group_config["use_strict_mode"]:
|
||
await client.edit_message_caption(
|
||
chat_id,
|
||
msg_id,
|
||
group_config["msg_challenge_mercy_passed"],
|
||
reply_markup=None,
|
||
)
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_passed_mercy"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
else:
|
||
try:
|
||
await client.edit_message_caption(
|
||
chat_id,
|
||
msg_id,
|
||
group_config["msg_challenge_failed"].format(challengeans=challenge.ans()),
|
||
reply_markup=None,
|
||
)
|
||
# await client.restrict_chat_member(chat_id, target)
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_failed_answer"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
except ChatAdminRequired:
|
||
return
|
||
|
||
if group_config["challenge_timeout_action"] == "ban":
|
||
await client.kick_chat_member(chat_id, user_id)
|
||
elif group_config["challenge_timeout_action"] == "kick":
|
||
await client.kick_chat_member(chat_id, user_id)
|
||
await client.unban_chat_member(chat_id, user_id)
|
||
elif group_config["challenge_timeout_action"] == "mute":
|
||
await client.restrict_chat_member(
|
||
chat_id=chat_id,
|
||
user_id=user_id,
|
||
permissions=ChatPermissions(can_send_other_messages=False,
|
||
can_send_messages=False,
|
||
can_send_media_messages=False,
|
||
can_add_web_page_previews=False,
|
||
can_send_polls=False))
|
||
else:
|
||
pass
|
||
|
||
if group_config["delete_failed_challenge"]:
|
||
Timer(
|
||
client.delete_messages(chat_id, msg_id),
|
||
group_config["delete_failed_challenge_interval"],
|
||
)
|
||
if group_config["delete_passed_challenge"]:
|
||
Timer(
|
||
client.delete_messages(chat_id, msg_id),
|
||
group_config["delete_passed_challenge_interval"],
|
||
)
|
||
|
||
@app.on_message(Filters.new_chat_members)
|
||
async def challenge_user(client: Client, message: Message):
|
||
target = message.new_chat_members[0]
|
||
if message.from_user.id != target.id:
|
||
if target.is_self:
|
||
group_config = _config.get(str(message.chat.id), _config["*"])
|
||
try:
|
||
await client.send_message(
|
||
message.chat.id, group_config["msg_self_introduction"])
|
||
_me: User = await client.get_me()
|
||
try:
|
||
await client.send_message(
|
||
int(_channel),
|
||
_config["msg_into_group"].format(
|
||
botid=str(_me.id),
|
||
groupid=str(message.chat.id),
|
||
grouptitle=str(message.chat.title),
|
||
),
|
||
parse_mode="Markdown",
|
||
)
|
||
except Exception as e:
|
||
logging.error(str(e))
|
||
except ChannelPrivate:
|
||
return
|
||
return
|
||
try:
|
||
await client.restrict_chat_member(
|
||
chat_id=message.chat.id,
|
||
user_id=target.id,
|
||
permissions=ChatPermissions(can_send_other_messages=False,
|
||
can_send_messages=False,
|
||
can_send_media_messages=False,
|
||
can_add_web_page_previews=False,
|
||
can_send_polls=False))
|
||
except ChatAdminRequired:
|
||
return
|
||
group_config = _config.get(str(message.chat.id), _config["*"])
|
||
challenge = Challenge()
|
||
|
||
def generate_challenge_button(e):
|
||
choices = []
|
||
answers = []
|
||
for c in e.choices():
|
||
answers.append(
|
||
InlineKeyboardButton(str(c),
|
||
callback_data=bytes(
|
||
str(c), encoding="utf-8")))
|
||
choices.append(answers)
|
||
return choices + [[
|
||
InlineKeyboardButton(group_config["msg_approve_manually"],
|
||
callback_data=b"+"),
|
||
InlineKeyboardButton(group_config["msg_refuse_manually"],
|
||
callback_data=b"-"),
|
||
]]
|
||
|
||
timeout = group_config["challenge_timeout"]
|
||
reply_message = await client.send_photo(
|
||
message.chat.id,
|
||
photo = challenge.pu(),
|
||
caption = group_config["msg_challenge"].format(target=target.first_name,
|
||
target_id=target.id,
|
||
timeout=timeout,
|
||
challenge=challenge.qus()),
|
||
reply_to_message_id=message.message_id,
|
||
reply_markup=InlineKeyboardMarkup(
|
||
generate_challenge_button(challenge)),
|
||
)
|
||
_me: User = await client.get_me()
|
||
chat_id = message.chat.id
|
||
chat_title = message.chat.title
|
||
target = message.from_user.id
|
||
await client.send_message(
|
||
int(_channel), _config['msg_attempt_try'].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
))
|
||
timeout_event = Timer(
|
||
challenge_timeout(client, message.chat.id, message.from_user.id, message.chat.title,
|
||
reply_message.message_id),
|
||
timeout=group_config["challenge_timeout"],
|
||
)
|
||
_cch_lock.acquire()
|
||
_current_challenges["{chat}|{msg}".format(
|
||
chat=message.chat.id,
|
||
msg=reply_message.message_id)] = (challenge, message.from_user.id,
|
||
timeout_event)
|
||
_cch_lock.release()
|
||
|
||
async def challenge_timeout(client: Client, chat_id, from_id, chat_title, reply_id):
|
||
global _current_challenges
|
||
_me: User = await client.get_me()
|
||
group_config = _config.get(str(chat_id), _config["*"])
|
||
|
||
_cch_lock.acquire()
|
||
del _current_challenges["{chat}|{msg}".format(chat=chat_id,
|
||
msg=reply_id)]
|
||
_cch_lock.release()
|
||
|
||
# TODO try catch
|
||
await client.edit_message_caption(
|
||
chat_id=chat_id,
|
||
message_id=reply_id,
|
||
caption=group_config["msg_challenge_timeout"],
|
||
reply_markup=None,
|
||
)
|
||
await client.send_message(chat_id=_config["channel"],
|
||
text=_config["msg_failed_timeout"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(from_id),
|
||
grouptitle=str(chat_title),
|
||
groupid=str(chat_id)))
|
||
if group_config["challenge_timeout_action"] == "ban":
|
||
await client.kick_chat_member(chat_id, from_id)
|
||
elif group_config["challenge_timeout_action"] == "kick":
|
||
await client.kick_chat_member(chat_id, from_id)
|
||
await client.unban_chat_member(chat_id, from_id)
|
||
elif group_config["challenge_timeout_action"] == "mute":
|
||
await client.restrict_chat_member(
|
||
chat_id=chat_id,
|
||
user_id=user_id,
|
||
permissions=ChatPermissions(can_send_other_messages=False,
|
||
can_send_messages=False,
|
||
can_send_media_messages=False,
|
||
can_add_web_page_previews=False,
|
||
can_send_polls=False))
|
||
else:
|
||
pass
|
||
|
||
if group_config["delete_failed_challenge"]:
|
||
Timer(
|
||
client.delete_messages(chat_id, reply_id),
|
||
group_config["delete_failed_challenge_interval"],
|
||
)
|
||
|
||
@app.on_message(Filters.command("challenge"))
|
||
async def challenge_user(client: Client, message: Message):
|
||
admins = await client.get_chat_members(message.chat.id,
|
||
filter="administrators")
|
||
if not any([
|
||
admin.user.id == message.from_user.id and
|
||
(admin.status == "creator" or admin.can_restrict_members)
|
||
for admin in admins
|
||
]):
|
||
target = message.from_user
|
||
group_config = _config.get(str(message.chat.id), _config["*"])
|
||
challenge = Challenge()
|
||
|
||
def generate_challenge_button(e):
|
||
choices = []
|
||
answers = []
|
||
for c in e.choices():
|
||
answers.append(
|
||
InlineKeyboardButton(str(c),
|
||
callback_data=bytes(
|
||
str(c), encoding="utf-8")))
|
||
choices.append(answers)
|
||
return choices
|
||
|
||
timeout = group_config["challenge_timeout"]
|
||
reply_message = await client.send_photo(
|
||
message.chat.id,
|
||
photo = challenge.pu(),
|
||
caption = group_config["msg_challeng"].format(target=target.first_name,
|
||
target_id=target.id,
|
||
timeout=timeout,
|
||
challenge=challenge.qus()),
|
||
reply_to_message_id=message.message_id,
|
||
reply_markup=InlineKeyboardMarkup(
|
||
generate_challenge_button(challenge)),
|
||
)
|
||
_me: User = await client.get_me()
|
||
chat_id = message.chat.id
|
||
chat_title = message.chat.title
|
||
target = message.from_user.id
|
||
await client.send_message(
|
||
int(_channel), _config['msg_challenge_try'].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(target),
|
||
groupid=str(chat_id),
|
||
grouptitle=str(chat_title),
|
||
))
|
||
timeout_event = Timer(
|
||
challeng_timeout(client, message.chat.id, message.from_user.id, message.chat.title,
|
||
reply_message.message_id),
|
||
timeout=group_config["challenge_timeout"],
|
||
)
|
||
_cch_lock.acquire()
|
||
_current_challenges["{chat}|{msg}".format(
|
||
chat=message.chat.id,
|
||
msg=reply_message.message_id)] = (challenge, message.from_user.id,
|
||
timeout_event)
|
||
_cch_lock.release()
|
||
|
||
async def challeng_timeout(client: Client, chat_id, from_id, chat_title, reply_id):
|
||
global _current_challenges
|
||
_me: User = await client.get_me()
|
||
group_config = _config.get(str(chat_id), _config["*"])
|
||
|
||
_cch_lock.acquire()
|
||
del _current_challenges["{chat}|{msg}".format(chat=chat_id,
|
||
msg=reply_id)]
|
||
_cch_lock.release()
|
||
|
||
# TODO try catch
|
||
await client.edit_message_caption(
|
||
chat_id=chat_id,
|
||
message_id=reply_id,
|
||
caption=group_config["msg_challeng_timeout"],
|
||
reply_markup=None,
|
||
)
|
||
await client.send_message(chat_id=_config["channel"],
|
||
text=_config["msg_challenge_time"].format(
|
||
botid=str(_me.id),
|
||
targetuser=str(from_id),
|
||
grouptitle=str(chat_title),
|
||
groupid=str(chat_id)))
|
||
|
||
if group_config["delete_failed_challenge"]:
|
||
Timer(
|
||
client.delete_messages(chat_id, reply_id),
|
||
group_config["delete_failed_challenge_interval"],
|
||
)
|
||
|
||
|
||
def _main():
|
||
global _app, _channel, _start_message, _config
|
||
load_config()
|
||
_api_id = _config["api_id"]
|
||
_api_hash = _config["api_hash"]
|
||
_token = _config["token"]
|
||
_channel = _config["channel"]
|
||
_start_message = _config["msg_start_message"]
|
||
_proxy_ip = _config["proxy_addr"].strip()
|
||
_proxy_port = _config["proxy_port"].strip()
|
||
if _proxy_ip and _proxy_port:
|
||
_app = Client("bot",
|
||
bot_token=_token,
|
||
api_id=_api_id,
|
||
api_hash=_api_hash,
|
||
proxy=dict(hostname=_proxy_ip, port=int(_proxy_port)))
|
||
else:
|
||
_app = Client("bot",
|
||
bot_token=_token,
|
||
api_id=_api_id,
|
||
api_hash=_api_hash)
|
||
try:
|
||
_update(_app)
|
||
_app.run()
|
||
except KeyboardInterrupt:
|
||
quit()
|
||
except Exception as e:
|
||
logging.error(e)
|
||
_main()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
_main()
|