# Environments
# Pyre type checker
+# data
+import sqlite3
+from os.path import exists
+from configparser import RawConfigParser
+from typing import Optional
+from pyrogram import Client
+# [basic]
+BOT_TOKEN: Optional[str] = None
+ADMINS = ""
+ config = RawConfigParser()
+ config.read("config.ini")
+ # [basic]
+ BOT_TOKEN = config["basic"].get("bot_token")
+ ADMINS = config["basic"].get("admins", ADMINS).split(",")
+except Exception as e:
+ raise RuntimeError(f"Read data from config.ini error: {e}")
+# check data.db
+if not exists("data.db"):
+ raise FileNotFoundError("data.db not found.")
+# check data.json
+if not exists("data.json"):
+ raise FileNotFoundError("data.json not found.")
+app = Client("bot", bot_token=BOT_TOKEN)
+with app:
+ me = app.get_me()
+api_id = 1111
+api_hash = abcd
+root = plugins
+include =
+ handlers.command
+ handlers.invite
+ handlers.inline
+bot_token = 123:abc
+# admins = 1234567,45634567
+admins = 1234567
+from ci import app, me
+print(f"Bot @{me.username} 开始运行")
+import sqlite3
+import json
+from pyrogram import Client
+from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, ChatMemberUpdated, ChatJoinRequest
+from ci import me
+def check_aff_id(aff: int):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from aff where id=?", (aff,))
+ data_ = cursor.fetchone()
+ conn.close()
+ return data_
+def set_aff(uid: int, aff: int):
+ data_ = check_aff_id(aff)
+ if not data_:
+ return
+ with open("data.json", "r") as f:
+ data = json.load(f)
+ data[str(uid)] = aff
+ with open("data.json", "w") as f:
+ json.dump(data, f)
+ return data_[2]
+def remove_aff(uid: int):
+ with open("data.json", "r") as f:
+ data = json.load(f)
+ try:
+ del data[str(uid)]
+ except KeyError:
+ return
+ with open("data.json", "w") as f:
+ json.dump(data, f)
+def check_aff(uid: int):
+ with open("data.json", "r") as f:
+ data = json.load(f)
+ try:
+ return data[str(uid)]
+ except KeyError:
+ return False
+def group_check(cid: int):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from link where cid=? and status=?", (cid, "active",))
+ data = cursor.fetchone()
+ conn.close()
+ if data:
+ return data
+ return False
+async def get_aff(message: Message, uid: int, cid: int):
+ data = group_check(cid)
+ if not data:
+ return
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from aff where uid=? and cid=?", (uid, cid,))
+ data_ = cursor.fetchone()
+ conn.close()
+ if not data_:
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("INSERT INTO aff VALUES (NULL,?,?)", (uid, cid))
+ conn.commit()
+ cursor.execute("select * from aff where uid=? and cid=?", (uid, cid,))
+ data_ = cursor.fetchone()
+ conn.close()
+ aff = data_[0]
+ await message.reply(f"您在当前群组的专属邀请链接是:https://t.me/{me.username}?start={aff}", quote=True,
+ reply_markup=InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton("分享给好友", switch_inline_query=f"{aff}")]
+ ])
+ )
+ return
+async def send_invite(message: Message, cid: int):
+ data = group_check(cid)
+ if not data:
+ return
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from link where cid=? and status=?", (cid, "active"))
+ data_ = cursor.fetchone()
+ conn.close()
+ if not data_:
+ await message.reply("暂无可用的邀请链接。", quote=True)
+ else:
+ await message.reply("请点击下方按钮申请入群。", reply_markup=InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton("点击入群", url=data_[1])]
+ ]), quote=True)
+async def gen_link(client: Client, update: ChatMemberUpdated):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from link where cid=?", (update.chat.id,))
+ data_ = cursor.fetchone()
+ conn.close()
+ if data_:
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("update link set status = 'active' where cid=?", (update.chat.id,))
+ conn.commit()
+ conn.close()
+ else:
+ data = await client.create_chat_invite_link(update.chat.id, name="Bot", creates_join_request=True)
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("INSERT INTO link VALUES (?,?,?)", (update.chat.id, data.invite_link, "active"))
+ conn.commit()
+ conn.close()
+def invoke_link(cid: int):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from link where cid=?", (cid,))
+ data_ = cursor.fetchone()
+ conn.close()
+ if data_:
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("update link set status = 'stop' where cid=?", (cid,))
+ conn.commit()
+ conn.close()
+async def invite_check(client: Client, request: ChatJoinRequest):
+ data = group_check(request.chat.id)
+ if not data:
+ return False
+ link = data[1] # noqa
+ if request.invite_link.invite_link != link:
+ return False
+ if not check_aff(request.from_user.id):
+ await client.decline_chat_join_request(request.chat.id, request.from_user.id)
+ return False
+ return True
+def add_invite(cid: int, uid: int):
+ aff = check_aff(uid)
+ remove_aff(uid)
+ data = check_aff_id(aff)
+ uid = data[1]
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from count where uid=? and cid=?", (uid, cid))
+ data_ = cursor.fetchone()
+ conn.close()
+ if data_:
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("update count set count=? where uid=? and cid=?", (data_[2] + 1, uid, cid))
+ conn.commit()
+ conn.close()
+ else:
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("INSERT INTO count VALUES (?,?,?)", (uid, cid, 1))
+ conn.commit()
+ conn.close()
+def get_list(cid: int):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from count where cid=?", (cid,))
+ data_ = cursor.fetchall()
+ conn.close()
+ data = []
+ if data_:
+ data = {}
+ for i in data_:
+ if i[2] > 0:
+ data[i[0]] = i[2]
+ # 排序
+ data = sorted(data.items(), key=lambda x: x[1], reverse=True)
+ return data
+def get_count(cid: int, uid: int):
+ conn = sqlite3.connect("data.db")
+ cursor = conn.cursor()
+ cursor.execute("select * from count where uid=? and cid=?", (uid, cid))
+ data_ = cursor.fetchone()
+ conn.close()
+ if data_:
+ return data_[2]
+ return 0
+from pyrogram import Client, filters
+from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
+from ci import me
+from plugins.defs import set_aff, get_aff, send_invite, get_list, get_count
+HELP_MSG = """Invite Challenge Bot
+/start - 查看此帮助信息
+/ping - 我还活着吗?
+AFF_MSG = """请点击下方按钮获取专属邀请链接。"""
+@Client.on_message(filters.command("start") & filters.private)
+async def start_command(client: Client, message: Message):
+ if len(message.command) == 1:
+ return await message.reply(HELP_MSG, quote=True)
+ if not message.command[1].isnumeric():
+ if "get" in message.command[1]:
+ try:
+ chat_id = int(message.command[1].replace("get", ""))
+ except ValueError:
+ return
+ await get_aff(message, message.from_user.id, chat_id)
+ else:
+ await message.reply(HELP_MSG, quote=True)
+ return
+ aff_num = int(message.command[1])
+ chat_id = set_aff(message.from_user.id, aff_num)
+ if chat_id:
+ await send_invite(message, chat_id)
+@Client.on_message(filters.command("ping") & filters.private)
+async def ping_command(client: Client, message: Message):
+ await message.reply("pong~", quote=True)
+@Client.on_message(filters.command(["aff", f"aff@{me.username}"]) & filters.group)
+async def aff_command(client: Client, message: Message):
+ await message.reply(AFF_MSG, reply_markup=InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton("点击申请", url=f"https://t.me/{me.username}?start=get{message.chat.id}")]
+ ]))
+@Client.on_message(filters.command(["affs", f"affs@{me.username}"]) & filters.group)
+async def aff_list_command(client: Client, message: Message):
+ if not message.from_user:
+ return await message.reply("请先解除匿名模式。")
+ data = get_list(message.chat.id)
+ count = get_count(message.chat.id, message.from_user.id)
+ if not data:
+ return await message.reply("没有任何人邀请过人。")
+ text = []
+ for i in range(min(5, len(data))):
+ text.append(f"{i + 1}. {data[i][0]}
+ await message.reply(f"[您](tg://user?id={message.from_user.id})的邀请数为:{count}\n\n"
+ f"本群 AFF 排行如下:\n\n" + "\n".join(text))
+from pyrogram import Client, emoji
+from pyrogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardMarkup, \
+ InlineKeyboardButton
+from ci import me
+from plugins.defs import check_aff_id
+async def answer_inline(client: Client, query: InlineQuery):
+ aff = 0
+ try:
+ aff = int(query.query)
+ except ValueError:
+ await query.answer(
+ results=[],
+ cache_time=0,
+ switch_pm_text=f'{emoji.CROSS_MARK} No aff for "{query.query}"',
+ switch_pm_parameter="okay",
+ )
+ data = check_aff_id(aff)
+ if not data:
+ await query.answer(
+ results=[],
+ cache_time=0,
+ switch_pm_text=f'{emoji.CROSS_MARK} No aff for "{query.query}"',
+ switch_pm_parameter="okay",
+ )
+ await query.answer(results=[
+ InlineQueryResultArticle(
+ title="点击邀请 Ta",
+ input_message_content=InputTextMessageContent(
+ "点击下方按钮进群"
+ ),
+ reply_markup=InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton(
+ "点我点我",
+ url=f"https://t.me/{me.username}?start={aff}"
+ )]
+ ]
+ )
+ )])
+from pyrogram import Client, filters, ContinuePropagation
+from pyrogram.types import ChatMemberUpdated, ChatJoinRequest, InlineKeyboardMarkup, InlineKeyboardButton
+from plugins.defs import group_check, gen_link, invoke_link, invite_check, add_invite
+from ci import me
+START_MSG = """感谢您邀请我加入群组,我是 Invite Challenge Bot ,能够帮助您统计群组邀请数,**请先赋予我邀请用户权限以继续。**"""
+ADMIN_MSG = """恭喜!我已经可以开始统计邀请数了。请需要邀请用户的成员点击下方按钮获取专属邀请链接。
+同样你也可以发送 /aff 来生成此消息。"""
+UNADMIN_MSG = """呜呜呜 我已被撤销邀请用户权限,真的不要我了吗?"""
+PUBLIC_MSG = """呜呜呜 公开群暂不支持此机器人。"""
+async def admin_get(client: Client, update: ChatMemberUpdated):
+ if update.chat.username:
+ invoke_link(update.chat.id)
+ await client.send_message(update.chat.id, PUBLIC_MSG)
+ await client.leave_chat(update.chat.id)
+ return
+ if not update.new_chat_member:
+ if update.old_chat_member.user.id == me.id:
+ invoke_link(update.chat.id)
+ return
+ if not update.old_chat_member:
+ if update.new_chat_member.user.id == me.id:
+ await client.send_message(update.chat.id, START_MSG)
+ return
+ if update.new_chat_member.can_invite_users and (not update.old_chat_member.can_invite_users):
+ await gen_link(client, update)
+ await client.send_message(update.chat.id, ADMIN_MSG, reply_markup=InlineKeyboardMarkup(
+ [
+ [InlineKeyboardButton("点击申请", url=f"https://t.me/{me.username}?start=get{update.chat.id}")]
+ ]))
+ elif (not update.new_chat_member.can_invite_users) and update.old_chat_member.can_invite_users:
+ invoke_link(update.chat.id)
+ await client.send_message(update.chat.id, UNADMIN_MSG)
+async def apply_aff(client: Client, request: ChatJoinRequest):
+ if not group_check(request.chat.id):
+ return
+ data = await invite_check(client, request)
+ if not data:
+ return
+ await client.approve_chat_join_request(request.chat.id, request.from_user.id)
+ add_invite(request.chat.id, request.from_user.id)
\ No newline at end of file