🎉 Begin a project

This commit is contained in:
xtaodada 2022-03-25 00:08:26 +08:00
parent 7b3755ae9e
commit d89de2e1af
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
23 changed files with 928 additions and 0 deletions

121
.gitignore vendored Normal file
View File

@ -0,0 +1,121 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
config.ini
*session*
data/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pycharm
.idea/

54
ci.py Normal file
View File

@ -0,0 +1,54 @@
import json
from configparser import RawConfigParser
from os import sep, mkdir
from os.path import exists
import pyromod.listen
from pyrogram import Client
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from httpx import AsyncClient, get
from sqlitedict import SqliteDict
if not exists("data"):
mkdir("data")
sqlite = SqliteDict(f"data{sep}data.sqlite", encode=json.dumps, decode=json.loads, autocommit=True)
# data.sqlite 结构如下:
# {
# "room_id": {
# "msg_link": str,
# "subscribes": List[订阅id: int],
# },
# "update_time": str,
# }
# 读取配置文件
config = RawConfigParser()
config.read("config.ini")
bot_token: str = ""
admin_id: int = 0
channel_id: int = 0
bot_token = config.get("basic", "bot_token", fallback=bot_token)
admin_id = config.getint("basic", "admin", fallback=admin_id)
channel_id = config.getint("basic", "channel_id", fallback=channel_id)
""" Init httpx client """
# 使用自定义 UA
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
}
client = AsyncClient(timeout=10.0, headers=headers, follow_redirects=True)
# 自定义类型
class Bot:
def __init__(self, data: dict):
self.uid = data["id"]
self.username = data["username"]
self.name = data["first_name"]
me = Bot(get(f"https://api.telegram.org/bot{bot_token}/getme").json()["result"])
# 初始化客户端
scheduler = AsyncIOScheduler()
if not scheduler.running:
scheduler.configure(timezone="Asia/ShangHai")
scheduler.start()
app = Client("bot", bot_token=bot_token)

16
config.ini.example Normal file
View File

@ -0,0 +1,16 @@
[pyrogram]
api_id = 12345
api_hash = 0123456789abc0123456789abc
[basic]
bot_token = 111:abc
admin = 777000
channel_id = 0
[plugins]
root = plugins
[proxy]
enabled = False
hostname = 127.0.0.1
port = 1080

0
defs/decorators.py Normal file
View File

19
defs/format_time.py Normal file
View File

@ -0,0 +1,19 @@
import pytz
from datetime import datetime, timedelta
pytz.timezone("Asia/Shanghai")
date_format = "%Y/%m/%d %H:%M:%S"
def strf_time(data: int) -> str:
# data = 1648111686000
ts = datetime.fromtimestamp(data/1000)
return ts.strftime(date_format)
def now_time() -> str:
# UTC
ts = datetime.utcnow()
# UTC+8
ts = ts + timedelta(hours=8)
return ts.strftime(date_format)

48
defs/msg.py Normal file
View File

@ -0,0 +1,48 @@
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from defs.utils import Vtuber, TrackMessage
from defs.thumbnail import thumb
from ci import me
template = """
{}
<b>{}</b> 正在直播
<b>标题</b><code>{}</code>
<b>人气值</b><code>{}</code>
<b>开播时间</b><code>{}</code>
@DD_YTbs_Live_Tracker | @DD_YTbs_Bot
"""
def gen_button(data: Vtuber) -> InlineKeyboardMarkup:
data_ = [[InlineKeyboardButton("🔗️ 观看", url=data.room_link)],
[InlineKeyboardButton("主页", url=data.space_link),
InlineKeyboardButton(
"订阅",
url=f"https://t.me/{me.username}?start={data.mid}"), ]
]
return InlineKeyboardMarkup(data_)
def format_text(text: str) -> str:
text = text.strip()
for i in ["/", " ", "-", "@", "", ]:
text = text.replace(i, "_")
for i in ["", "", "[", "]", "", "(", ")", "`", "!", ]:
text = text.replace(i, "")
return text.strip()
def gen_tags(data: Vtuber) -> str:
return f"#id{data.mid} #{format_text(data.name.split()[0])} "
async def gen_update_msg(data: Vtuber) -> TrackMessage:
text = template.format(gen_tags(data), data.name, data.title, data.online,
data.liveStartTimeStr,)
button = gen_button(data)
img = await thumb(data.face, data.title, data.name)
return TrackMessage(text, button, img)

84
defs/source.py Normal file
View File

@ -0,0 +1,84 @@
from os import sep
from os.path import exists
from shutil import copyfile
from typing import List, Optional
from ci import client, sqlite
from json import load
from defs.format_time import now_time
from defs.utils import Vtuber
vtubers_info: dict[int:Vtuber] = {}
new_vtubers: List[int] = []
old_vtubers: List[int] = []
if exists(f"data{sep}info.json"):
with open(f"data{sep}info.json", "r", encoding="utf-8") as file:
temp_data = load(file)
for temp in temp_data:
temp_data_ = Vtuber(temp)
vtubers_info[temp_data_.room_id] = temp_data_
if exists(f"data{sep}vtubers.json"):
with open(f"data{sep}vtubers.json", "r", encoding="utf-8") as file:
temp_data = load(file)
new_vtubers = temp_data
if exists(f"data{sep}old_vtubers.json"):
with open(f"data{sep}old_vtubers.json", "r", encoding="utf-8") as file:
temp_data = load(file)
old_vtubers = temp_data
async def update_data() -> None:
global new_vtubers, old_vtubers
if exists(f"data{sep}vtubers.json"):
copyfile(f"data{sep}vtubers.json", f"data{sep}old_vtubers.json")
data = await client.get("https://api.tokyo.vtbs.moe/v1/living")
with open(f"data{sep}vtubers.json", "w", encoding="utf-8") as f:
f.write(data.text)
data = data.json()
old_vtubers = new_vtubers
new_vtubers = data
sqlite["update_time"] = now_time()
async def update_info() -> None:
global vtubers_info
data = await client.get("https://api.tokyo.vtbs.moe/v1/fullInfo")
with open(f"data{sep}info.json", "w", encoding="utf-8") as f:
f.write(data.text)
data = data.json()
for i in data:
data_ = Vtuber(i)
vtubers_info[data_.room_id] = data_
def compare() -> List[Vtuber]:
data = []
for i in new_vtubers:
if i not in old_vtubers:
data.append(vtubers_info[i])
return data
def from_name_to_v(name: str) -> Optional[Vtuber]:
try:
data = int(name)
except ValueError:
return None
return vtubers_info.get(data, None)
def from_list_to_name(data: List) -> str:
data_ = ""
for i in data:
v = vtubers_info.get(int(i), None)
if isinstance(v, Vtuber):
data_ += f"\n{v.name}"
return data_
def from_keyword_to_v(keyword: str) -> Optional[Vtuber]:
for value in vtubers_info.values():
data = str(value.mid) + value.name + str(value.room_id)
if keyword in data:
return value
return None

100
defs/subs.py Normal file
View File

@ -0,0 +1,100 @@
import traceback
from asyncio import sleep
from random import uniform
from pyrogram.errors import FloodWait, ButtonUrlInvalid, BadRequest
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from ci import app, sqlite, me
from defs.source import from_list_to_name
from defs.utils import Vtuber
subs_msg = """
<b>{} 正在直播</b>
<b>标题</b><code>{}</code>
<b>人气值</b><code>{}</code>
<b>开播时间</b><code>{}</code>
"""
subs_list_msg = """
<b>您订阅了</b>{}
"""
subs_list_no_msg = """
<b>您订阅了个寂寞</b>
"""
def gen_subs_button(data: Vtuber, link: str) -> InlineKeyboardMarkup:
data_ = [[InlineKeyboardButton("详情", url=link),
InlineKeyboardButton("退订",
url=f"https://t.me/{me.username}?start=un-{data.mid}"), ]]
return InlineKeyboardMarkup(data_)
def gen_back_button() -> InlineKeyboardMarkup:
return InlineKeyboardMarkup([[InlineKeyboardButton("返回", callback_data="help"), ]])
def gen_subs_msg(cid: int) -> str:
data_ = []
for key, value in sqlite.items():
if key == "update_time":
continue
data = value.get("subscribes", [])
if cid in data:
data_.append(key)
if data_:
text = subs_list_msg.format(from_list_to_name(data_))
else:
text = subs_list_no_msg
return text
async def send_subs_msg(cid: int, data: Vtuber, link: str):
return await app.send_message(cid,
subs_msg.format(data.name, data.title,
data.online, data.liveStartTimeStr),
reply_markup=gen_subs_button(data, link))
async def send_to_subscribes(data: Vtuber):
users = sqlite.get(str(data.room_id), {}).get("subscribes", [])
link = sqlite.get(str(data.room_id), {}).get("msg_link", "https://t.me/DD_YTbs_Live_Tracker")
for i in users:
try:
await send_subs_msg(i, data, link)
except FloodWait as e:
print(f"Send subscribes msg flood - Sleep for {e.x} second(s)")
await sleep(uniform(0.5, 1.0))
await send_subs_msg(i, data, link)
except ButtonUrlInvalid:
print(f"Send button error")
await app.send_message(i, subs_msg.format(data.name, data.title,
data.online, data.liveStartTimeStr), )
except BadRequest:
users.remove(i)
except Exception as e:
traceback.print_exc()
sqlite[str(data.room_id)]["subscribes"] = users
def add_to_subs(cid: int, data: Vtuber) -> bool:
users = sqlite.get(str(data.room_id), {}).get("subscribes", [])
if cid not in users:
users.append(cid)
data_ = sqlite.get(str(data.room_id), {"subscribes": []})
data_["subscribes"] = users
sqlite[str(data.room_id)] = data_
return True
return False
def remove_from_subs(cid: int, data: Vtuber) -> bool:
users = sqlite.get(str(data.room_id), {}).get("subscribes", [])
if cid in users:
users.remove(cid)
data_ = sqlite.get(str(data.room_id), {"subscribes": []})
data_["subscribes"] = users
sqlite[str(data.room_id)] = data_
return True
return False

57
defs/thumbnail.py Normal file
View File

@ -0,0 +1,57 @@
import os
from io import BytesIO
from ci import client
from PIL import (
Image,
ImageDraw,
ImageFont,
)
def changeImageSize(maxWidth, maxHeight, image):
if image.size[0] == image.size[1]:
# Does not change the scale of the orientation image and displays it centered.
# It may look even better
newImage = image.resize((maxHeight, maxHeight))
img = Image.new("RGBA", (maxWidth, maxHeight))
img.paste(newImage, (int((maxWidth - maxHeight) / 2), 0))
return img
else:
widthRatio = maxWidth / image.size[0]
heightRatio = maxHeight / image.size[1]
newWidth = int(widthRatio * image.size[0])
newHeight = int(heightRatio * image.size[1])
newImage = image.resize((newWidth, newHeight))
return newImage
async def thumb(thumbnail, title, ctitle):
resp = await client.get(thumbnail)
if resp.status_code == 200:
image1 = Image.open(BytesIO(resp.content))
else:
return None
image2 = Image.open(f"source{os.sep}LightGreen.png")
image3 = changeImageSize(1280, 720, image1)
image4 = changeImageSize(1280, 720, image2)
image5 = image3.convert("RGBA")
image6 = image4.convert("RGBA")
Image.alpha_composite(image5, image6).save(f"data{os.sep}temp.png")
img = Image.open(f"data{os.sep}temp.png")
draw = ImageDraw.Draw(img)
font = ImageFont.truetype(f"source{os.sep}SourceHanSansCN-Regular-2.otf", 50)
font2 = ImageFont.truetype(f"source{os.sep}SourceHanSansCN-Medium-2.otf", 72)
draw.text(
(25, 615),
f"{title[:20]}...",
fill="black",
font=font2,
)
draw.text(
(27, 543),
f"{ctitle[:12]} 正在直播",
fill="black",
font=font,
)
img.save(f"data{os.sep}final.png")
return f"data{os.sep}final.png"

28
defs/utils.py Normal file
View File

@ -0,0 +1,28 @@
from typing import Optional
from defs.format_time import strf_time
class Vtuber:
def __init__(self, data: dict):
self.name: str = data["uname"]
self.mid: int = data["mid"]
self.space_link: str = f"https://space.bilibili.com/{self.mid}"
self.title: str = data["title"]
self.room_id: str = data["roomid"]
self.room_link: str = f"https://live.bilibili.com/{self.room_id}"
self.face: str = data["face"]
self.follower: int = data["follower"]
self.liveStatus: bool = data["liveStatus"]
self.online: Optional[int, bool] = data["online"]
self.notice: str = data["notice"].replace(r"\n", "\n")
self.time: int = data["time"]
self.timeStr: str = strf_time(self.time)
self.liveStartTime: int = data["liveStartTime"]
self.liveStartTimeStr: str = strf_time(self.liveStartTime)
class TrackMessage:
def __init__(self, text, button, img=None):
self.text = text
self.button = button
self.img = img

6
main.py Normal file
View File

@ -0,0 +1,6 @@
import logging
from ci import app
# 日志记录
logging.basicConfig(level=logging.INFO)
app.run()

26
plugins/callback.py Normal file
View File

@ -0,0 +1,26 @@
from pyrogram import Client, filters
from pyrogram.types import CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from defs.subs import gen_subs_msg, gen_back_button
from plugins.help import help_msg
@Client.on_callback_query(filters.regex("help"))
async def help_set(_, query: CallbackQuery):
await query.edit_message_text(
help_msg,
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("订阅", callback_data="subs")]]
),
disable_web_page_preview=True,
)
@Client.on_callback_query(filters.regex("subs"))
async def subs_set(_, query: CallbackQuery):
text = gen_subs_msg(query.from_user.id)
await query.edit_message_text(
text,
reply_markup=gen_back_button(),
disable_web_page_preview=True,
)

30
plugins/help.py Normal file
View File

@ -0,0 +1,30 @@
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
help_msg = """
下面是我学会了的指令列表
👩🏻💼 » /subscribe <code>space_id|昵称|room_id</code> - 订阅直播间
<code>/subscribe 5659864</code>
<code>/subscribe 鹿野灸</code>
<code>/subscribe 2064239</code>
👩🏻💼 » /unsubscribe <code>space_id|昵称|room_id</code> - 取消订阅直播间
👩🏻💼 » /subscription - 列出您当前的订阅
👩🏻💼 » /info <code>space_id|昵称|room_id</code> - 查询主播信息
"""
@Client.on_message(filters.incoming & filters.private &
filters.command(["help"]))
async def help_command(_: Client, message: Message):
await message.reply(
help_msg,
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("订阅", callback_data="subs")]]
),
disable_web_page_preview=True,
quote=True,
)

56
plugins/info.py Normal file
View File

@ -0,0 +1,56 @@
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from ci import sqlite, me
from defs.source import from_keyword_to_v
from defs.utils import Vtuber
from plugins.start import not_found_msg
info_help_msg = """
👩🏻💼 » /info <code>space_id|昵称|room_id</code> - 查询模块信息
<code>/info 5659864</code>
<code>/info 鹿野灸</code>
<code>/info 2064239</code>
"""
vtuber_msg = """
<b>{}</b>
<b>粉丝数</b><code>{}</code>
<b>更新时间</b><code>{}</code>
<b>通知</b>
<code>{}</code>
@DD_YTbs_Live_Tracker | @DD_YTbs_Bot
"""
def gen_info_button(data: Vtuber) -> InlineKeyboardMarkup:
msg_link = sqlite.get(str(data.room_id), {}).get("msg_link", "https://t.me/DD_YTbs_Live_Tracker")
data_ = [[InlineKeyboardButton("详情", url=msg_link),
InlineKeyboardButton("订阅",
url=f"https://t.me/{me.username}?start={data.mid}"), ]]
return InlineKeyboardMarkup(data_)
@Client.on_message(filters.incoming & filters.private &
filters.command(["info"]))
async def info_command(_: Client, message: Message):
if len(message.command) == 1:
await message.reply(info_help_msg, quote=True)
else:
data = " ".join(message.command[1:])
v = from_keyword_to_v(data)
if v:
await message.reply(
vtuber_msg.format(
v.name,
v.follower,
v.timeStr,
v.notice,
),
reply_markup=gen_info_button(v),
quote=True,
)
else:
await message.reply(not_found_msg.format(data), quote=True)

57
plugins/inline.py Normal file
View File

@ -0,0 +1,57 @@
from pyrogram import Client, emoji
from pyrogram.types import InlineQuery, InputTextMessageContent, InlineQueryResultArticle
from defs.source import vtubers_info
from plugins.info import vtuber_msg, gen_info_button
@Client.on_inline_query()
async def inline_process(_: Client, query: InlineQuery):
data = []
text = query.query.split()
nums = 0
if not vtubers_info:
return
data_ = vtubers_info
for key, v in data_.items():
if len(text) == 0:
data.append(InlineQueryResultArticle(
v.name,
InputTextMessageContent(vtuber_msg.format(
v.name,
v.follower,
v.timeStr,
v.notice,
)),
reply_markup=gen_info_button(v),
))
nums += 1
else:
name = str(v.mid) + v.name + str(v.room_id)
skip = False
for i in text:
if i not in name:
skip = True
if not skip:
data.append(InlineQueryResultArticle(
v.name,
InputTextMessageContent(vtuber_msg.format(
v.name,
v.follower,
v.timeStr,
v.notice,
)),
reply_markup=gen_info_button(v),
))
nums += 1
if nums >= 50:
break
if nums == 0:
return await query.answer(
results=[],
switch_pm_text=f'{emoji.CROSS_MARK} 字符串 "{" ".join(text)}" 没有搜索到任何结果',
switch_pm_parameter="help",
)
await query.answer(data,
switch_pm_text=f'{emoji.KEY} 搜索了 {len(vtubers_info.values())} 个 Vtuber',
switch_pm_parameter="help", )

8
plugins/ping.py Normal file
View File

@ -0,0 +1,8 @@
from pyrogram import Client, filters
from pyrogram.types import Message
@Client.on_message(filters.incoming & filters.private &
filters.command(["ping", ]))
async def ping_check(_: Client, message: Message):
await message.reply("poi ~", quote=True)

76
plugins/start.py Normal file
View File

@ -0,0 +1,76 @@
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from ci import me
from defs.source import from_name_to_v
from defs.subs import add_to_subs, remove_from_subs
des = """
你好{} 我是 [{}]({})一个为 BiliBili Vtuber 用户打造的一体化机器人
我可以帮助你获取 BiliBili Vtuber 的开播提醒和信息查询
点击下面的帮助按钮来查看使用方法
加入 [我的频道](https://t.me/DD_YTbs_Live_Tracker) 获取关于 BiliBili Vtuber 的所有开播提醒和公告
"""
unsub_msg = """
<b>成功退订了</b> <code>{}</code> <b>的开播提醒</b>
"""
not_sub_msg = """
<b>你好像没有订阅</b> <code>{}</code> <b>的开播提醒</b>
"""
sub_msg = """
<b>成功订阅了</b> <code>{}</code> <b>的开播提醒</b>
"""
already_sub_msg = """
<b>已经订阅过</b> <code>{}</code> <b>的开播提醒</b>
"""
not_found_msg = """
<b>没有找到名为</b> <code>{}</code> <b> Vtuber</b>
"""
def gen_help_button() -> InlineKeyboardMarkup:
data_ = [[InlineKeyboardButton("📢 官方频道", url="https://t.me/DD_YTbs_Live_Tracker"),
InlineKeyboardButton("💬 官方群组", url="https://t.me/Invite_Challenge_Bot?start=1"), ],
[InlineKeyboardButton("❓ 阅读帮助", callback_data="help")],
]
return InlineKeyboardMarkup(data_)
@Client.on_message(filters.incoming & filters.private &
filters.command(["start"]))
async def start_command(_: Client, message: Message):
"""
回应消息
"""
if len(message.command) == 1:
await message.reply(des.format(message.from_user.mention(),
me.name,
f"https://t.me/{me.username}"),
reply_markup=gen_help_button(),
quote=True, )
else:
data = message.command[1]
if data.startswith("un-"):
# 退订
name = data[3:]
data = from_name_to_v(name)
if data:
success = remove_from_subs(message.from_user.id, data)
if success:
await message.reply(unsub_msg.format(data.name), quote=True)
else:
await message.reply(not_sub_msg.format(data.name), quote=True)
else:
await message.reply(not_found_msg.format(name), quote=True)
else:
# 订阅
name = data
data = from_name_to_v(data)
if data:
success = add_to_subs(message.from_user.id, data)
if success:
await message.reply(sub_msg.format(data.name), quote=True)
else:
await message.reply(already_sub_msg.format(data.name), quote=True)
else:
await message.reply(not_found_msg.format(name), quote=True)

62
plugins/subs.py Normal file
View File

@ -0,0 +1,62 @@
from pyrogram import Client, filters
from pyrogram.types import Message
from defs.source import from_keyword_to_v
from defs.subs import gen_subs_msg, gen_back_button, add_to_subs, remove_from_subs
from plugins.start import sub_msg, not_found_msg, already_sub_msg, unsub_msg, not_sub_msg
sub_help_msg = """
👩🏻💼 » /subscribe <code>space_id|昵称|room_id</code> - 订阅直播间
<code>/subscribe 5659864</code>
<code>/subscribe 鹿野灸</code>
<code>/subscribe 2064239</code>
"""
unsub_help_msg = """
👩🏻💼 » /unsubscribe <code>space_id|昵称|room_id</code> - 取消订阅直播间
<code>/unsubscribe 5659864</code>
<code>/unsubscribe 鹿野灸</code>
<code>/unsubscribe 2064239</code>
"""
@Client.on_message(filters.incoming & filters.private &
filters.command(["subscription"]))
async def subscription_command(_: Client, message: Message):
text = gen_subs_msg(message.from_user.id)
await message.reply(text, reply_markup=gen_back_button(), quote=True, )
@Client.on_message(filters.incoming & filters.private &
filters.command(["subscribe"]))
async def sub_command(_: Client, message: Message):
if len(message.command) == 1:
await message.reply(sub_help_msg, reply_markup=gen_back_button(), quote=True)
else:
data = " ".join(message.command[1:])
module = from_keyword_to_v(data)
if module:
success = add_to_subs(message.from_user.id, module)
if success:
await message.reply(sub_msg.format(module.name), quote=True)
else:
await message.reply(already_sub_msg.format(module.name), quote=True)
else:
await message.reply(not_found_msg.format(data), quote=True)
@Client.on_message(filters.incoming & filters.private &
filters.command(["unsubscribe"]))
async def un_sub_command(_: Client, message: Message):
if len(message.command) == 1:
await message.reply(unsub_help_msg, reply_markup=gen_back_button(), quote=True)
else:
data = " ".join(message.command[1:])
module = from_keyword_to_v(data)
if module:
success = remove_from_subs(message.from_user.id, module)
if success:
await message.reply(unsub_msg.format(module.name), quote=True)
else:
await message.reply(not_sub_msg.format(module.name), quote=True)
else:
await message.reply(not_found_msg.format(data), quote=True)

72
plugins/track.py Normal file
View File

@ -0,0 +1,72 @@
import traceback
from asyncio import sleep
from random import uniform
from pyrogram.errors import FloodWait, ButtonUrlInvalid
from pyrogram.types import Message
from ci import app, scheduler, channel_id, admin_id, sqlite, client
from pyrogram import Client, filters
from defs.format_time import strf_time, now_time
from defs.msg import gen_update_msg
from defs.source import update_data, compare, update_info
from defs.subs import send_to_subscribes
async def send_track_msg(track_msg, no_button=False) -> Message:
button = None if no_button else track_msg.button
if track_msg.img:
return await app.send_photo(channel_id, track_msg.img, caption=track_msg.text,
parse_mode="html",
reply_markup=button)
return await app.send_message(channel_id, track_msg.text,
parse_mode="html",
reply_markup=button)
@scheduler.scheduled_job("cron", minute="*/10", id="0")
async def run_every_10_minute():
await update_data()
need_update = compare()
for i in need_update:
data = (await client.get(f"https://api.tokyo.vtbs.moe/v1/room/{i.room_id}")).json()
i.liveStartTime = data["live_time"]
if i.liveStartTime == 0:
i.liveStartTimeStr = now_time()
else:
i.liveStartTimeStr = strf_time(i.liveStartTime)
track_msg = await gen_update_msg(i)
msg = None
try:
msg = await send_track_msg(track_msg)
except FloodWait as e:
print(f"Send document flood - Sleep for {e.x} second(s)")
await sleep(e.x + uniform(0.5, 1.0))
msg = await send_track_msg(track_msg)
except ButtonUrlInvalid:
print(f"Send button error")
msg = await send_track_msg(track_msg, no_button=True)
except Exception as e:
traceback.print_exc()
await sleep(uniform(0.5, 2.0))
data_ = sqlite.get(str(i.room_id), {"msg_link": ""})
if msg:
data_["msg_link"] = msg.link
else:
data_["msg_link"] = "https://t.me/DD_YTbs_Live_Tracker"
sqlite[str(i.room_id)] = data_
await send_to_subscribes(i)
await sleep(uniform(0.5, 2.0))
@scheduler.scheduled_job("cron", hour="*/12", id="0")
async def run_every_12_hour():
await update_info()
@Client.on_message(filters.incoming & filters.private & filters.chat(admin_id) &
filters.command(["force_update", ]))
async def force_update(_: Client, __: Message):
await run_every_12_hour()
await run_every_10_minute()

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
Pyrogram>=1.4.9
Tgcrypto>=1.2.3
pyromod
httpx>=0.22.0
apscheduler>=3.8.1
sqlitedict>=2.0.0
pytz
pillow

BIN
source/LightGreen.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 350 KiB

Binary file not shown.

Binary file not shown.