601 lines
26 KiB
Python
601 lines
26 KiB
Python
|
# !/usr/bin/env python3
|
|||
|
import asyncio
|
|||
|
import json
|
|||
|
import threading
|
|||
|
import configparser
|
|||
|
import logging, subprocess
|
|||
|
from time import time, sleep
|
|||
|
from challenge import Challenge
|
|||
|
from pyrogram import (Client, Filters, Message, User, InlineKeyboardButton,
|
|||
|
InlineKeyboardMarkup, CallbackQuery, ChatPermissions)
|
|||
|
from pyrogram.errors import ChatAdminRequired, ChannelPrivate, ChannelInvalid
|
|||
|
from Timer import Timer
|
|||
|
|
|||
|
_app: Client = None
|
|||
|
_channel: str = None
|
|||
|
_start_message: str = None
|
|||
|
# _challenge_scheduler = sched.scheduler(time, sleep)
|
|||
|
_current_challenges = dict()
|
|||
|
_cch_lock = threading.Lock()
|
|||
|
_config = dict()
|
|||
|
logging.basicConfig(level=logging.INFO)
|
|||
|
# 设置一下日志记录,能够在诸如 systemctl status captchabot 这样的地方获得详细输出。
|
|||
|
conf = configparser.ConfigParser()
|
|||
|
|
|||
|
def load_config():
|
|||
|
global _config
|
|||
|
with open("config.json", encoding="utf-8") as f:
|
|||
|
_config = json.load(f)
|
|||
|
|
|||
|
|
|||
|
def save_config():
|
|||
|
with open("config.json", "w") as f:
|
|||
|
json.dump(_config, f, indent=4)
|
|||
|
|
|||
|
def _update(app):
|
|||
|
@app.on_message(Filters.command("ping") & Filters.private)
|
|||
|
async def ping_command(client: Client, message: Message):
|
|||
|
await message.reply("poi~poi~poi~")
|
|||
|
|
|||
|
@app.on_message(Filters.command("upload") & Filters.group)
|
|||
|
async def upload_command(client: Client, message: Message):
|
|||
|
if message.chat.id == -1001413542074:
|
|||
|
conf.read("tk.json")
|
|||
|
qustion_all = conf.get("ans", "all")
|
|||
|
qustion_a = conf.get("ans", "A")
|
|||
|
qustion_b = conf.get("ans", "B")
|
|||
|
qustion_c = conf.get("ans", "C")
|
|||
|
qustion_d = conf.get("ans", "D")
|
|||
|
if message.reply_to_message.document.mime_type == "image/jpeg":
|
|||
|
await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".jpg")
|
|||
|
elif message.reply_to_message.document.mime_type == "image/png":
|
|||
|
await client.download_media(message.reply_to_message, file_name="pic/" + str(int(qustion_all) + 1) + ".png")
|
|||
|
if message.text.split()[-1] == "A":
|
|||
|
conf.set("ans", "A", qustion_a + " " + str(int(qustion_all) + 1))
|
|||
|
conf.set("ans", "all", str(int(qustion_all) + 1))
|
|||
|
conf.write(open("tk.json", "w"))
|
|||
|
if message.text.split()[-1] == "B":
|
|||
|
conf.set("ans", "B", qustion_b + " " + str(int(qustion_all) + 1))
|
|||
|
conf.set("ans", "all", str(int(qustion_all) + 1))
|
|||
|
conf.write(open("tk.json", "w"))
|
|||
|
if message.text.split()[-1] == "C":
|
|||
|
conf.set("ans", "C", qustion_c + " " + str(int(qustion_all) + 1))
|
|||
|
conf.set("ans", "all", str(int(qustion_all) + 1))
|
|||
|
conf.write(open("tk.json", "w"))
|
|||
|
if message.text.split()[-1] == "D":
|
|||
|
conf.set("ans", "D", qustion_d + " " + str(int(qustion_all) + 1))
|
|||
|
conf.set("ans", "all", str(int(qustion_all) + 1))
|
|||
|
conf.write(open("tk.json", "w"))
|
|||
|
await message.reply("done")
|
|||
|
|
|||
|
@app.on_message(Filters.command("start") & Filters.private)
|
|||
|
async def start_command(client: Client, message: Message):
|
|||
|
await message.reply("<a href='tg://user?id=" + str(message.from_user.id) + "'>" + message.from_user.first_name + "</a> " + _start_message)
|
|||
|
|
|||
|
@app.on_message(Filters.command("answer"))
|
|||
|
async def answer_command(client: Client, message: Message):
|
|||
|
if message.from_user.id == int(347437156) or message.from_user.id == int(
|
|||
|
616760897) or message.from_user.id == int(441229454):
|
|||
|
qustion_id = message.text.split()[-1]
|
|||
|
conf.read("tk.json")
|
|||
|
qustion_a = conf.get("ans", "A").split()
|
|||
|
qustion_b = conf.get("ans", "B").split()
|
|||
|
qustion_c = conf.get("ans", "C").split()
|
|||
|
qustion_d = conf.get("ans", "D").split()
|
|||
|
if qustion_id in qustion_a:
|
|||
|
await message.reply("<a href='tg://user?id=" + str(message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`A`")
|
|||
|
elif qustion_id in qustion_b:
|
|||
|
await message.reply("<a href='tg://user?id=" + str(
|
|||
|
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`B`")
|
|||
|
elif qustion_id in qustion_c:
|
|||
|
await message.reply("<a href='tg://user?id=" + str(
|
|||
|
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`C`")
|
|||
|
elif qustion_id in qustion_d:
|
|||
|
await message.reply("<a href='tg://user?id=" + str(
|
|||
|
message.from_user.id) + "'>" + message.from_user.first_name + "</a> 此题的正确答案为:`D`")
|
|||
|
|
|||
|
@app.on_message(Filters.command("leave") & Filters.private)
|
|||
|
async def leave_command(client: Client, message: Message):
|
|||
|
chat_id = message.text.split()[-1]
|
|||
|
if message.from_user.id == _config["manage_user"]:
|
|||
|
try:
|
|||
|
await client.send_message(int(chat_id),
|
|||
|
_config["msg_leave_msg"])
|
|||
|
await client.leave_chat(int(chat_id), True)
|
|||
|
except:
|
|||
|
await message.reply("指令出错了!可能是bot不在参数所在群里。")
|
|||
|
else:
|
|||
|
await message.reply("已离开群组: `" + chat_id + "`",
|
|||
|
parse_mode="Markdown")
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_leave_group"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
groupid=chat_id,
|
|||
|
),
|
|||
|
parse_mode="Markdown")
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
else:
|
|||
|
pass
|
|||
|
|
|||
|
@app.on_callback_query()
|
|||
|
async def challenge_callback(client: Client,
|
|||
|
callback_query: CallbackQuery):
|
|||
|
query_data = str(callback_query.data)
|
|||
|
query_id = callback_query.id
|
|||
|
chat_id = callback_query.message.chat.id
|
|||
|
user_id = callback_query.from_user.id
|
|||
|
msg_id = callback_query.message.message_id
|
|||
|
chat_title = callback_query.message.chat.title
|
|||
|
user_name = callback_query.from_user.first_name
|
|||
|
group_config = _config.get(str(chat_id), _config["*"])
|
|||
|
if query_data in ["+", "-"]:
|
|||
|
admins = await client.get_chat_members(chat_id,
|
|||
|
filter="administrators")
|
|||
|
if not any([
|
|||
|
admin.user.id == user_id and
|
|||
|
(admin.status == "creator" or admin.can_restrict_members)
|
|||
|
for admin in admins
|
|||
|
]):
|
|||
|
await client.answer_callback_query(
|
|||
|
query_id, group_config["msg_permission_denied"])
|
|||
|
return
|
|||
|
|
|||
|
ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id)
|
|||
|
_cch_lock.acquire()
|
|||
|
# target: int = None
|
|||
|
challenge, target, timeout_event = _current_challenges.get(
|
|||
|
ch_id, (None, None, None))
|
|||
|
if ch_id in _current_challenges:
|
|||
|
del _current_challenges[ch_id]
|
|||
|
_cch_lock.release()
|
|||
|
timeout_event.stop()
|
|||
|
|
|||
|
if query_data == "+":
|
|||
|
try:
|
|||
|
await client.restrict_chat_member(
|
|||
|
chat_id,
|
|||
|
target,
|
|||
|
permissions=ChatPermissions(
|
|||
|
can_send_other_messages=True,
|
|||
|
can_send_messages=True,
|
|||
|
can_send_media_messages=True,
|
|||
|
can_add_web_page_previews=True,
|
|||
|
))
|
|||
|
except ChatAdminRequired:
|
|||
|
await client.answer_callback_query(
|
|||
|
query_id, group_config["msg_bot_no_permission"])
|
|||
|
return
|
|||
|
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id,
|
|||
|
msg_id,
|
|||
|
group_config["msg_approved"].format(user=user_name),
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_passed_admin"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
else:
|
|||
|
try:
|
|||
|
await client.kick_chat_member(chat_id, target)
|
|||
|
except ChatAdminRequired:
|
|||
|
await client.answer_callback_query(
|
|||
|
query_id, group_config["msg_bot_no_permission"])
|
|||
|
return
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id,
|
|||
|
msg_id,
|
|||
|
group_config["msg_refused"].format(user=user_name),
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_failed_admin"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
await client.answer_callback_query(query_id)
|
|||
|
return
|
|||
|
|
|||
|
ch_id = "{chat}|{msg}".format(chat=chat_id, msg=msg_id)
|
|||
|
_cch_lock.acquire()
|
|||
|
challenge, target, timeout_event = _current_challenges.get(
|
|||
|
ch_id, (None, None, None))
|
|||
|
_cch_lock.release()
|
|||
|
if user_id != target:
|
|||
|
await client.answer_callback_query(
|
|||
|
query_id, group_config["msg_challenge_not_for_you"])
|
|||
|
return None
|
|||
|
timeout_event.stop()
|
|||
|
try:
|
|||
|
await client.restrict_chat_member(
|
|||
|
chat_id,
|
|||
|
target,
|
|||
|
permissions=ChatPermissions(can_send_other_messages=True,
|
|||
|
can_send_messages=True,
|
|||
|
can_send_media_messages=True,
|
|||
|
can_add_web_page_previews=True,
|
|||
|
can_send_polls=True))
|
|||
|
except ChatAdminRequired:
|
|||
|
pass
|
|||
|
|
|||
|
correct = str(challenge.ans()) == query_data
|
|||
|
if correct:
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id,
|
|||
|
msg_id,
|
|||
|
group_config["msg_challenge_passed"],
|
|||
|
reply_markup=None)
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_passed_answer"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
else:
|
|||
|
if not group_config["use_strict_mode"]:
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id,
|
|||
|
msg_id,
|
|||
|
group_config["msg_challenge_mercy_passed"],
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_passed_mercy"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
else:
|
|||
|
try:
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id,
|
|||
|
msg_id,
|
|||
|
group_config["msg_challenge_failed"].format(challengeans=challenge.ans()),
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
# await client.restrict_chat_member(chat_id, target)
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_failed_answer"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
except ChatAdminRequired:
|
|||
|
return
|
|||
|
|
|||
|
if group_config["challenge_timeout_action"] == "ban":
|
|||
|
await client.kick_chat_member(chat_id, user_id)
|
|||
|
elif group_config["challenge_timeout_action"] == "kick":
|
|||
|
await client.kick_chat_member(chat_id, user_id)
|
|||
|
await client.unban_chat_member(chat_id, user_id)
|
|||
|
elif group_config["challenge_timeout_action"] == "mute":
|
|||
|
await client.restrict_chat_member(
|
|||
|
chat_id=chat_id,
|
|||
|
user_id=user_id,
|
|||
|
permissions=ChatPermissions(can_send_other_messages=False,
|
|||
|
can_send_messages=False,
|
|||
|
can_send_media_messages=False,
|
|||
|
can_add_web_page_previews=False,
|
|||
|
can_send_polls=False))
|
|||
|
else:
|
|||
|
pass
|
|||
|
|
|||
|
if group_config["delete_failed_challenge"]:
|
|||
|
Timer(
|
|||
|
client.delete_messages(chat_id, msg_id),
|
|||
|
group_config["delete_failed_challenge_interval"],
|
|||
|
)
|
|||
|
if group_config["delete_passed_challenge"]:
|
|||
|
Timer(
|
|||
|
client.delete_messages(chat_id, msg_id),
|
|||
|
group_config["delete_passed_challenge_interval"],
|
|||
|
)
|
|||
|
|
|||
|
@app.on_message(Filters.new_chat_members)
|
|||
|
async def challenge_user(client: Client, message: Message):
|
|||
|
target = message.new_chat_members[0]
|
|||
|
if message.from_user.id != target.id:
|
|||
|
if target.is_self:
|
|||
|
group_config = _config.get(str(message.chat.id), _config["*"])
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
message.chat.id, group_config["msg_self_introduction"])
|
|||
|
_me: User = await client.get_me()
|
|||
|
try:
|
|||
|
await client.send_message(
|
|||
|
int(_channel),
|
|||
|
_config["msg_into_group"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
groupid=str(message.chat.id),
|
|||
|
grouptitle=str(message.chat.title),
|
|||
|
),
|
|||
|
parse_mode="Markdown",
|
|||
|
)
|
|||
|
except Exception as e:
|
|||
|
logging.error(str(e))
|
|||
|
except ChannelPrivate:
|
|||
|
return
|
|||
|
return
|
|||
|
try:
|
|||
|
await client.restrict_chat_member(
|
|||
|
chat_id=message.chat.id,
|
|||
|
user_id=target.id,
|
|||
|
permissions=ChatPermissions(can_send_other_messages=False,
|
|||
|
can_send_messages=False,
|
|||
|
can_send_media_messages=False,
|
|||
|
can_add_web_page_previews=False,
|
|||
|
can_send_polls=False))
|
|||
|
except ChatAdminRequired:
|
|||
|
return
|
|||
|
group_config = _config.get(str(message.chat.id), _config["*"])
|
|||
|
challenge = Challenge()
|
|||
|
|
|||
|
def generate_challenge_button(e):
|
|||
|
choices = []
|
|||
|
answers = []
|
|||
|
for c in e.choices():
|
|||
|
answers.append(
|
|||
|
InlineKeyboardButton(str(c),
|
|||
|
callback_data=bytes(
|
|||
|
str(c), encoding="utf-8")))
|
|||
|
choices.append(answers)
|
|||
|
return choices + [[
|
|||
|
InlineKeyboardButton(group_config["msg_approve_manually"],
|
|||
|
callback_data=b"+"),
|
|||
|
InlineKeyboardButton(group_config["msg_refuse_manually"],
|
|||
|
callback_data=b"-"),
|
|||
|
]]
|
|||
|
|
|||
|
timeout = group_config["challenge_timeout"]
|
|||
|
reply_message = await client.send_photo(
|
|||
|
message.chat.id,
|
|||
|
photo = challenge.pu(),
|
|||
|
caption = group_config["msg_challenge"].format(target=target.first_name,
|
|||
|
target_id=target.id,
|
|||
|
timeout=timeout,
|
|||
|
challenge=challenge.qus()),
|
|||
|
reply_to_message_id=message.message_id,
|
|||
|
reply_markup=InlineKeyboardMarkup(
|
|||
|
generate_challenge_button(challenge)),
|
|||
|
)
|
|||
|
_me: User = await client.get_me()
|
|||
|
chat_id = message.chat.id
|
|||
|
chat_title = message.chat.title
|
|||
|
target = message.from_user.id
|
|||
|
await client.send_message(
|
|||
|
int(_channel), _config['msg_attempt_try'].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
))
|
|||
|
timeout_event = Timer(
|
|||
|
challenge_timeout(client, message.chat.id, message.from_user.id, message.chat.title,
|
|||
|
reply_message.message_id),
|
|||
|
timeout=group_config["challenge_timeout"],
|
|||
|
)
|
|||
|
_cch_lock.acquire()
|
|||
|
_current_challenges["{chat}|{msg}".format(
|
|||
|
chat=message.chat.id,
|
|||
|
msg=reply_message.message_id)] = (challenge, message.from_user.id,
|
|||
|
timeout_event)
|
|||
|
_cch_lock.release()
|
|||
|
|
|||
|
async def challenge_timeout(client: Client, chat_id, from_id, chat_title, reply_id):
|
|||
|
global _current_challenges
|
|||
|
_me: User = await client.get_me()
|
|||
|
group_config = _config.get(str(chat_id), _config["*"])
|
|||
|
|
|||
|
_cch_lock.acquire()
|
|||
|
del _current_challenges["{chat}|{msg}".format(chat=chat_id,
|
|||
|
msg=reply_id)]
|
|||
|
_cch_lock.release()
|
|||
|
|
|||
|
# TODO try catch
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id=chat_id,
|
|||
|
message_id=reply_id,
|
|||
|
caption=group_config["msg_challenge_timeout"],
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
await client.send_message(chat_id=_config["channel"],
|
|||
|
text=_config["msg_failed_timeout"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(from_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
groupid=str(chat_id)))
|
|||
|
if group_config["challenge_timeout_action"] == "ban":
|
|||
|
await client.kick_chat_member(chat_id, from_id)
|
|||
|
elif group_config["challenge_timeout_action"] == "kick":
|
|||
|
await client.kick_chat_member(chat_id, from_id)
|
|||
|
await client.unban_chat_member(chat_id, from_id)
|
|||
|
elif group_config["challenge_timeout_action"] == "mute":
|
|||
|
await client.restrict_chat_member(
|
|||
|
chat_id=chat_id,
|
|||
|
user_id=user_id,
|
|||
|
permissions=ChatPermissions(can_send_other_messages=False,
|
|||
|
can_send_messages=False,
|
|||
|
can_send_media_messages=False,
|
|||
|
can_add_web_page_previews=False,
|
|||
|
can_send_polls=False))
|
|||
|
else:
|
|||
|
pass
|
|||
|
|
|||
|
if group_config["delete_failed_challenge"]:
|
|||
|
Timer(
|
|||
|
client.delete_messages(chat_id, reply_id),
|
|||
|
group_config["delete_failed_challenge_interval"],
|
|||
|
)
|
|||
|
|
|||
|
@app.on_message(Filters.command("challenge"))
|
|||
|
async def challenge_user(client: Client, message: Message):
|
|||
|
admins = await client.get_chat_members(message.chat.id,
|
|||
|
filter="administrators")
|
|||
|
if not any([
|
|||
|
admin.user.id == message.from_user.id and
|
|||
|
(admin.status == "creator" or admin.can_restrict_members)
|
|||
|
for admin in admins
|
|||
|
]):
|
|||
|
target = message.from_user
|
|||
|
group_config = _config.get(str(message.chat.id), _config["*"])
|
|||
|
challenge = Challenge()
|
|||
|
|
|||
|
def generate_challenge_button(e):
|
|||
|
choices = []
|
|||
|
answers = []
|
|||
|
for c in e.choices():
|
|||
|
answers.append(
|
|||
|
InlineKeyboardButton(str(c),
|
|||
|
callback_data=bytes(
|
|||
|
str(c), encoding="utf-8")))
|
|||
|
choices.append(answers)
|
|||
|
return choices
|
|||
|
|
|||
|
timeout = group_config["challenge_timeout"]
|
|||
|
reply_message = await client.send_photo(
|
|||
|
message.chat.id,
|
|||
|
photo = challenge.pu(),
|
|||
|
caption = group_config["msg_challeng"].format(target=target.first_name,
|
|||
|
target_id=target.id,
|
|||
|
timeout=timeout,
|
|||
|
challenge=challenge.qus()),
|
|||
|
reply_to_message_id=message.message_id,
|
|||
|
reply_markup=InlineKeyboardMarkup(
|
|||
|
generate_challenge_button(challenge)),
|
|||
|
)
|
|||
|
_me: User = await client.get_me()
|
|||
|
chat_id = message.chat.id
|
|||
|
chat_title = message.chat.title
|
|||
|
target = message.from_user.id
|
|||
|
await client.send_message(
|
|||
|
int(_channel), _config['msg_challenge_try'].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(target),
|
|||
|
groupid=str(chat_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
))
|
|||
|
timeout_event = Timer(
|
|||
|
challeng_timeout(client, message.chat.id, message.from_user.id, message.chat.title,
|
|||
|
reply_message.message_id),
|
|||
|
timeout=group_config["challenge_timeout"],
|
|||
|
)
|
|||
|
_cch_lock.acquire()
|
|||
|
_current_challenges["{chat}|{msg}".format(
|
|||
|
chat=message.chat.id,
|
|||
|
msg=reply_message.message_id)] = (challenge, message.from_user.id,
|
|||
|
timeout_event)
|
|||
|
_cch_lock.release()
|
|||
|
|
|||
|
async def challeng_timeout(client: Client, chat_id, from_id, chat_title, reply_id):
|
|||
|
global _current_challenges
|
|||
|
_me: User = await client.get_me()
|
|||
|
group_config = _config.get(str(chat_id), _config["*"])
|
|||
|
|
|||
|
_cch_lock.acquire()
|
|||
|
del _current_challenges["{chat}|{msg}".format(chat=chat_id,
|
|||
|
msg=reply_id)]
|
|||
|
_cch_lock.release()
|
|||
|
|
|||
|
# TODO try catch
|
|||
|
await client.edit_message_caption(
|
|||
|
chat_id=chat_id,
|
|||
|
message_id=reply_id,
|
|||
|
caption=group_config["msg_challeng_timeout"],
|
|||
|
reply_markup=None,
|
|||
|
)
|
|||
|
await client.send_message(chat_id=_config["channel"],
|
|||
|
text=_config["msg_challenge_time"].format(
|
|||
|
botid=str(_me.id),
|
|||
|
targetuser=str(from_id),
|
|||
|
grouptitle=str(chat_title),
|
|||
|
groupid=str(chat_id)))
|
|||
|
|
|||
|
if group_config["delete_failed_challenge"]:
|
|||
|
Timer(
|
|||
|
client.delete_messages(chat_id, reply_id),
|
|||
|
group_config["delete_failed_challenge_interval"],
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
def _main():
|
|||
|
global _app, _channel, _start_message, _config
|
|||
|
load_config()
|
|||
|
_api_id = _config["api_id"]
|
|||
|
_api_hash = _config["api_hash"]
|
|||
|
_token = _config["token"]
|
|||
|
_channel = _config["channel"]
|
|||
|
_start_message = _config["msg_start_message"]
|
|||
|
_proxy_ip = _config["proxy_addr"].strip()
|
|||
|
_proxy_port = _config["proxy_port"].strip()
|
|||
|
if _proxy_ip and _proxy_port:
|
|||
|
_app = Client("bot",
|
|||
|
bot_token=_token,
|
|||
|
api_id=_api_id,
|
|||
|
api_hash=_api_hash,
|
|||
|
proxy=dict(hostname=_proxy_ip, port=int(_proxy_port)))
|
|||
|
else:
|
|||
|
_app = Client("bot",
|
|||
|
bot_token=_token,
|
|||
|
api_id=_api_id,
|
|||
|
api_hash=_api_hash)
|
|||
|
try:
|
|||
|
_update(_app)
|
|||
|
_app.run()
|
|||
|
except KeyboardInterrupt:
|
|||
|
quit()
|
|||
|
except Exception as e:
|
|||
|
logging.error(e)
|
|||
|
_main()
|
|||
|
|
|||
|
|
|||
|
if __name__ == "__main__":
|
|||
|
_main()
|