BiliBili_YouTubers_Live_Tra.../ci.py

55 lines
1.6 KiB
Python
Raw Permalink Normal View History

2022-03-24 16:08:26 +00:00
import json
from configparser import RawConfigParser
from os import sep, mkdir
from os.path import exists
import pyromod.listen
from pyrogram import Client
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from httpx import AsyncClient, get
from sqlitedict import SqliteDict
# Make sure the data directory exists. Attempting the mkdir and tolerating
# FileExistsError avoids the check-then-create race of a separate exists() test.
try:
    mkdir("data")
except FileExistsError:
    pass

# Persistent key-value store backed by data/data.sqlite; values are encoded
# and decoded as JSON, and autocommit is enabled.
sqlite = SqliteDict(f"data{sep}data.sqlite", encode=json.dumps, decode=json.loads, autocommit=True)
# Layout of data.sqlite:
# {
#     "room_id": {
#         "msg_link": str,
#         "subscribes": List[subscriber id: int],
#     },
#     "update_time": str,
# }
# Load settings from config.ini. Every option lives in the [basic] section;
# a missing file, section or option falls back to an empty token / zero id.
config = RawConfigParser()
config.read("config.ini")
bot_token: str = config.get("basic", "bot_token", fallback="")
admin_id: int = config.getint("basic", "admin", fallback=0)
channel_id: int = config.getint("basic", "channel_id", fallback=0)
""" Init httpx client """
# Use a custom User-Agent header for requests made through the shared client.
headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
}
# Shared async HTTP client: 10-second timeout, redirects followed.
client = AsyncClient(headers=headers, follow_redirects=True, timeout=10.0)
# Custom types
class Bot:
    """Identity of the bot account, built from a user-info mapping.

    Attributes:
        uid: Value of the mapping's ``id`` field.
        username: Value of the mapping's ``username`` field.
        name: Value of the mapping's ``first_name`` field.
    """

    def __init__(self, data: dict) -> None:
        """Copy the identifying fields out of ``data``.

        Args:
            data: Mapping with ``id``, ``username`` and ``first_name`` keys.

        Raises:
            KeyError: If any of the three keys is missing.
        """
        self.uid = data["id"]
        self.username = data["username"]
        self.name = data["first_name"]

    def __repr__(self) -> str:
        """Return a debugging representation listing all three fields."""
        return (
            f"{type(self).__name__}(uid={self.uid!r}, "
            f"username={self.username!r}, name={self.name!r})"
        )
# Ask Telegram's getMe endpoint (one synchronous request at import time)
# for the bot's own account details.
_getme_reply = get(f"https://api.telegram.org/bot{bot_token}/getme").json()
if not _getme_reply.get("ok"):
    # Surface Telegram's own explanation (e.g. for an empty or invalid token)
    # instead of failing later with an opaque KeyError('result').
    raise RuntimeError(
        f"Telegram getMe failed: {_getme_reply.get('description', _getme_reply)}"
    )
me = Bot(_getme_reply["result"])
# Set up the job scheduler and the Telegram client.
scheduler = AsyncIOScheduler()
if not scheduler.running:
    # "Asia/Shanghai" is the canonical IANA zone name; the former spelling
    # "Asia/ShangHai" only resolves where zone lookup is case-insensitive.
    scheduler.configure(timezone="Asia/Shanghai")
    scheduler.start()
app = Client("bot", bot_token=bot_token)