🎉 Begin a project. (#1)

* 🎉 Begin
This commit is contained in:
Xtao_dada 2022-01-21 20:11:42 +08:00 committed by GitHub
parent 529d85e55e
commit a6b20a3186
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 418 additions and 0 deletions

7
.gitignore vendored
View File

@ -102,6 +102,7 @@ celerybeat.pid
*.sage.py
# Environments
.idea/
.env
.venv
env/
@ -127,3 +128,9 @@ dmypy.json
# Pyre type checker
.pyre/
# data
config.ini
bot.session
data.db
data.json

28
ci.py Normal file
View File

@ -0,0 +1,28 @@
import sqlite3
from os.path import exists
from configparser import RawConfigParser
from typing import Optional
from pyrogram import Client
# [basic]
BOT_TOKEN: Optional[str] = None
ADMINS = ""
try:
config = RawConfigParser()
config.read("config.ini")
# [basic]
BOT_TOKEN = config["basic"].get("bot_token")
ADMINS = config["basic"].get("admins", ADMINS).split(",")
except Exception as e:
raise RuntimeError(f"Read data from config.ini error: {e}")
# check data.db
if not exists("data.db"):
raise FileNotFoundError("data.db not found.")
# check data.json
if not exists("data.json"):
raise FileNotFoundError("data.json not found.")
app = Client("bot", bot_token=BOT_TOKEN)
with app:
me = app.get_me()

15
config.ini.example Normal file
View File

@ -0,0 +1,15 @@
[pyrogram]
api_id = 1111
api_hash = abcd
[plugins]
root = plugins
include =
handlers.command
handlers.invite
handlers.inline
[basic]
bot_token = 123:abc
# admins = 1234567,45634567
admins = 1234567

BIN
data.db.example Normal file

Binary file not shown.

1
data.json.example Normal file
View File

@ -0,0 +1 @@
{}

4
main.py Normal file
View File

@ -0,0 +1,4 @@
from ci import app, me
print(f"Bot @{me.username} 开始运行")
app.run()

207
plugins/defs.py Normal file
View File

@ -0,0 +1,207 @@
import sqlite3
import json
from pyrogram import Client
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, ChatMemberUpdated, ChatJoinRequest
from ci import me
def check_aff_id(aff: int):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from aff where id=?", (aff,))
data_ = cursor.fetchone()
conn.close()
return data_
def set_aff(uid: int, aff: int):
data_ = check_aff_id(aff)
if not data_:
return
with open("data.json", "r") as f:
data = json.load(f)
data[str(uid)] = aff
with open("data.json", "w") as f:
json.dump(data, f)
return data_[2]
def remove_aff(uid: int):
with open("data.json", "r") as f:
data = json.load(f)
try:
del data[str(uid)]
except KeyError:
return
with open("data.json", "w") as f:
json.dump(data, f)
def check_aff(uid: int):
with open("data.json", "r") as f:
data = json.load(f)
try:
return data[str(uid)]
except KeyError:
return False
def group_check(cid: int):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from link where cid=? and status=?", (cid, "active",))
data = cursor.fetchone()
conn.close()
if data:
return data
return False
async def get_aff(message: Message, uid: int, cid: int):
data = group_check(cid)
if not data:
return
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from aff where uid=? and cid=?", (uid, cid,))
data_ = cursor.fetchone()
conn.close()
if not data_:
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("INSERT INTO aff VALUES (NULL,?,?)", (uid, cid))
conn.commit()
cursor.execute("select * from aff where uid=? and cid=?", (uid, cid,))
data_ = cursor.fetchone()
conn.close()
aff = data_[0]
await message.reply(f"您在当前群组的专属邀请链接是https://t.me/{me.username}?start={aff}", quote=True,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("分享给好友", switch_inline_query=f"{aff}")]
])
)
return
async def send_invite(message: Message, cid: int):
data = group_check(cid)
if not data:
return
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from link where cid=? and status=?", (cid, "active"))
data_ = cursor.fetchone()
conn.close()
if not data_:
await message.reply("暂无可用的邀请链接。", quote=True)
else:
await message.reply("请点击下方按钮申请入群。", reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("点击入群", url=data_[1])]
]), quote=True)
async def gen_link(client: Client, update: ChatMemberUpdated):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from link where cid=?", (update.chat.id,))
data_ = cursor.fetchone()
conn.close()
if data_:
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("update link set status = 'active' where cid=?", (update.chat.id,))
conn.commit()
conn.close()
else:
data = await client.create_chat_invite_link(update.chat.id, name="Bot", creates_join_request=True)
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("INSERT INTO link VALUES (?,?,?)", (update.chat.id, data.invite_link, "active"))
conn.commit()
conn.close()
def invoke_link(cid: int):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from link where cid=?", (cid,))
data_ = cursor.fetchone()
conn.close()
if data_:
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("update link set status = 'stop' where cid=?", (cid,))
conn.commit()
conn.close()
async def invite_check(client: Client, request: ChatJoinRequest):
data = group_check(request.chat.id)
if not data:
return False
link = data[1] # noqa
if request.invite_link.invite_link != link:
return False
if not check_aff(request.from_user.id):
await client.decline_chat_join_request(request.chat.id, request.from_user.id)
return False
return True
def add_invite(cid: int, uid: int):
aff = check_aff(uid)
remove_aff(uid)
data = check_aff_id(aff)
uid = data[1]
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from count where uid=? and cid=?", (uid, cid))
data_ = cursor.fetchone()
conn.close()
if data_:
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("update count set count=? where uid=? and cid=?", (data_[2] + 1, uid, cid))
conn.commit()
conn.close()
else:
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("INSERT INTO count VALUES (?,?,?)", (uid, cid, 1))
conn.commit()
conn.close()
def get_list(cid: int):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from count where cid=?", (cid,))
data_ = cursor.fetchall()
conn.close()
data = []
if data_:
data = {}
for i in data_:
if i[2] > 0:
data[i[0]] = i[2]
# 排序
data = sorted(data.items(), key=lambda x: x[1], reverse=True)
return data
def get_count(cid: int, uid: int):
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
cursor.execute("select * from count where uid=? and cid=?", (uid, cid))
data_ = cursor.fetchone()
conn.close()
if data_:
return data_[2]
return 0

View File

@ -0,0 +1,61 @@
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from ci import me
from plugins.defs import set_aff, get_aff, send_invite, get_list, get_count
HELP_MSG = """<b>Invite Challenge Bot</b>
/start - 查看此帮助信息
/ping - 我还活着吗
此项目开源于https://github.com/Xtao-Labs/Invite_Challenge_Bot"""
AFF_MSG = """请点击下方按钮获取专属邀请链接。"""
@Client.on_message(filters.command("start") & filters.private)
async def start_command(client: Client, message: Message):
if len(message.command) == 1:
return await message.reply(HELP_MSG, quote=True)
if not message.command[1].isnumeric():
if "get" in message.command[1]:
try:
chat_id = int(message.command[1].replace("get", ""))
except ValueError:
return
await get_aff(message, message.from_user.id, chat_id)
else:
await message.reply(HELP_MSG, quote=True)
return
aff_num = int(message.command[1])
chat_id = set_aff(message.from_user.id, aff_num)
if chat_id:
await send_invite(message, chat_id)
@Client.on_message(filters.command("ping") & filters.private)
async def ping_command(client: Client, message: Message):
await message.reply("pong~", quote=True)
@Client.on_message(filters.command(["aff", f"aff@{me.username}"]) & filters.group)
async def aff_command(client: Client, message: Message):
await message.reply(AFF_MSG, reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("点击申请", url=f"https://t.me/{me.username}?start=get{message.chat.id}")]
]))
@Client.on_message(filters.command(["affs", f"affs@{me.username}"]) & filters.group)
async def aff_list_command(client: Client, message: Message):
if not message.from_user:
return await message.reply("请先解除匿名模式。")
data = get_list(message.chat.id)
count = get_count(message.chat.id, message.from_user.id)
if not data:
return await message.reply("没有任何人邀请过人。")
text = []
for i in range(min(5, len(data))):
text.append(f"{i + 1}. <code>{data[i][0]}</code> (<code>{data[i][1]}</code>人)")
await message.reply(f"[您](tg://user?id={message.from_user.id})的邀请数为:<spoiler>{count}</spoiler>\n\n"
f"本群 AFF 排行如下:\n\n" + "\n".join(text))

View File

@ -0,0 +1,43 @@
from pyrogram import Client, emoji
from pyrogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardMarkup, \
InlineKeyboardButton
from ci import me
from plugins.defs import check_aff_id
@Client.on_inline_query()
async def answer_inline(client: Client, query: InlineQuery):
aff = 0
try:
aff = int(query.query)
except ValueError:
await query.answer(
results=[],
cache_time=0,
switch_pm_text=f'{emoji.CROSS_MARK} No aff for "{query.query}"',
switch_pm_parameter="okay",
)
data = check_aff_id(aff)
if not data:
await query.answer(
results=[],
cache_time=0,
switch_pm_text=f'{emoji.CROSS_MARK} No aff for "{query.query}"',
switch_pm_parameter="okay",
)
await query.answer(results=[
InlineQueryResultArticle(
title="点击邀请 Ta",
input_message_content=InputTextMessageContent(
"点击下方按钮进群"
),
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton(
"点我点我",
url=f"https://t.me/{me.username}?start={aff}"
)]
]
)
)])

View File

@ -0,0 +1,50 @@
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.types import ChatMemberUpdated, ChatJoinRequest, InlineKeyboardMarkup, InlineKeyboardButton
from plugins.defs import group_check, gen_link, invoke_link, invite_check, add_invite
from ci import me
START_MSG = """感谢您邀请我加入群组,我是 Invite Challenge Bot ,能够帮助您统计群组邀请数,**请先赋予我邀请用户权限以继续。**"""
ADMIN_MSG = """恭喜!我已经可以开始统计邀请数了。请需要邀请用户的成员点击下方按钮获取专属邀请链接。
同样你也可以发送 /aff 来生成此消息"""
UNADMIN_MSG = """呜呜呜 我已被撤销邀请用户权限,真的不要我了吗?"""
PUBLIC_MSG = """呜呜呜 公开群暂不支持此机器人。"""
@Client.on_chat_member_updated()
async def admin_get(client: Client, update: ChatMemberUpdated):
if update.chat.username:
invoke_link(update.chat.id)
await client.send_message(update.chat.id, PUBLIC_MSG)
await client.leave_chat(update.chat.id)
return
if not update.new_chat_member:
if update.old_chat_member.user.id == me.id:
invoke_link(update.chat.id)
return
if not update.old_chat_member:
if update.new_chat_member.user.id == me.id:
await client.send_message(update.chat.id, START_MSG)
return
if update.new_chat_member.can_invite_users and (not update.old_chat_member.can_invite_users):
await gen_link(client, update)
await client.send_message(update.chat.id, ADMIN_MSG, reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("点击申请", url=f"https://t.me/{me.username}?start=get{update.chat.id}")]
]))
elif (not update.new_chat_member.can_invite_users) and update.old_chat_member.can_invite_users:
invoke_link(update.chat.id)
await client.send_message(update.chat.id, UNADMIN_MSG)
@Client.on_chat_join_request()
async def apply_aff(client: Client, request: ChatJoinRequest):
if not group_check(request.chat.id):
return
data = await invite_check(client, request)
if not data:
return
await client.approve_chat_join_request(request.chat.id, request.from_user.id)
add_invite(request.chat.id, request.from_user.id)

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
pyrogram==1.3.6
tgcrypto>=1.2.3