Formatted-RSS-to-Telegram/FR2T/fr2t.py
Apocalypsor ec83c15da3
Fix
2021-06-26 16:39:25 +08:00

244 lines
7.8 KiB
Python

import copy
import copyreg
import datetime
import hashlib
import os
import re
import ssl
import time
from multiprocessing import Pool
import yaml
from jinja2 import Template
from pymongo import MongoClient
from .parser import rssParser, objParser
from .sender import editToTelegram, sendToTelegram
from .utils import escapeAll, escapeText
class FR2T:
    """Load the FR2T configuration, dispatch feeds to workers and purge records.

    Two YAML files are read by ``loadConfig``: the file at ``rss_path`` is
    stored as ``self.config`` (its ``"rss"`` list is what ``run`` iterates),
    and the file at ``config_path`` supplies the database URL, the record
    expiry time, the HTTP user agent and the Telegram options.
    """

    # Multipliers that convert each day-based ``expire_time`` suffix to days.
    _EXPIRE_UNIT_DAYS = {"y": 365, "m": 30, "d": 1}

    def __init__(self, config_path, rss_path):
        """Remember both file paths and load the configuration immediately."""
        self.config_path = config_path
        self.rss_path = rss_path
        self.loadConfig()

    def loadConfig(self):
        """Read both YAML files and populate the instance settings.

        Environment variables take precedence over the settings file:
        ``DATABASE``, ``EXPIRE_TIME``, ``USER-AGENT`` and, for every key that
        already exists in the ``telegram`` section, ``TG_<KEY IN UPPER CASE>``.

        Raises:
            KeyError: if ``telegram`` is missing from the settings file, or
                ``database_url`` is missing and ``DATABASE`` is not set.
        """
        # NOTE: the feed list comes from ``rss_path`` and the general settings
        # from ``config_path``; the attribute name ``config`` is kept as-is
        # because other code may read it.
        with open(self.rss_path, "r", encoding="UTF-8") as feeds_file:
            self.config = yaml.safe_load(feeds_file)

        with open(self.config_path, "r", encoding="UTF-8") as settings_file:
            settings = yaml.safe_load(settings_file)

        self.database_url = os.getenv("DATABASE") or settings["database_url"]
        self.expire_time = (
            os.getenv("EXPIRE_TIME") or settings.get("expire_time") or "30d"
        )
        self.user_agent = (
            os.getenv("USER-AGENT")
            or settings.get("user-agent")
            or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
        )

        self.telegram = settings["telegram"]

        # Collect the overrides first so the dict is not modified while it is
        # being iterated; only keys already present can be overridden.
        overrides = {}
        for key in self.telegram:
            value = os.getenv("TG_" + key.upper())
            if value:
                overrides[key] = value
        self.telegram.update(overrides)

        # Missing or falsy options fall back to these defaults.
        defaults = {
            "disable_notification": "false",
            "disable_web_page_preview": "false",
            "parse_mode": "MarkdownV2",
        }
        for option, default in defaults.items():
            self.telegram[option] = self.telegram.get(option) or default

    def run(self):
        """Process every configured feed in a pool of 8 worker processes.

        A feed whose ``url`` is a list is expanded into one deep-copied feed
        per URL. Feeds whose ``url`` is neither a string nor a list are
        skipped.
        """

        def save_sslcontext(obj):
            # Reduce an SSLContext to "its class called with its protocol".
            return obj.__class__, (obj.protocol,)

        # Makes ssl.SSLContext picklable; presumably required because data
        # exchanged with the worker processes is pickled -- TODO confirm.
        copyreg.pickle(ssl.SSLContext, save_sslcontext)

        # An empty feeds file (or one without an "rss" key) means "no feeds"
        # rather than a crash.
        feeds = (self.config or {}).get("rss") or []

        expanded = []
        for feed in feeds:
            url = feed.get("url")
            if isinstance(url, str):
                expanded.append(feed)
            elif isinstance(url, list):
                for single_url in url:
                    feed_copy = copy.deepcopy(feed)
                    feed_copy["url"] = single_url
                    expanded.append(feed_copy)

        args = [
            (feed, self.telegram, self.database_url, self.user_agent)
            for feed in expanded
        ]

        with Pool(8) as p:
            p.map(mixInput, args)

        print("Finished!")

    @staticmethod
    def _parse_expire_time(expire_time):
        """Convert an expiry setting such as ``"30d"`` into a ``timedelta``.

        Accepted form: a non-negative integer followed by an optional unit,
        ``y`` (365 days), ``m`` (30 days), ``d`` (days) or ``h`` (hours).
        A bare number is interpreted as days, so an integer read from YAML
        works as well.

        Raises:
            ValueError: if the value does not have that form. Failing loudly
                matters here: an unparsed value used to become a zero-length
                period, which made ``purge`` delete every record.
        """
        match = re.fullmatch(r"\s*(\d+)\s*([ymdh]?)\s*", str(expire_time))
        if match is None:
            raise ValueError(
                f"Invalid expire_time {expire_time!r}: expected a number "
                "followed by one of y, m, d or h"
            )

        amount = int(match.group(1))
        unit = match.group(2) or "d"
        if unit == "h":
            return datetime.timedelta(hours=amount)
        return datetime.timedelta(days=amount * FR2T._EXPIRE_UNIT_DAYS[unit])

    def purge(self):
        """Delete records older than ``self.expire_time`` from every collection.

        Every collection of the ``RSS`` database is visited and documents
        whose ``create_time`` is below the cutoff timestamp are removed.

        Raises:
            ValueError: if ``self.expire_time`` cannot be parsed.
        """
        # Parse before connecting so a bad setting never touches the database.
        max_age = self._parse_expire_time(self.expire_time)
        expired_timestamp = time.time() - max_age.total_seconds()

        deleted_num = 0
        client = MongoClient(self.database_url)
        try:
            db = client["RSS"]
            purge_rule = {"create_time": {"$lt": expired_timestamp}}
            for col_name in db.list_collection_names():
                print(f"开始清理: {col_name}")
                deleted_result = db[col_name].delete_many(purge_rule)
                deleted_num += deleted_result.deleted_count
        finally:
            # Release the connection even when a delete fails.
            client.close()

        print(f"已删除 {deleted_num} 个记录!")
def mixInput(mix_args):
    """Spread one packed argument tuple into a ``runProcess`` call.

    ``Pool.map`` hands exactly one item to its callable, so the arguments for
    a feed travel together in ``mix_args`` and are unpacked here.
    """
    feed_args = tuple(mix_args)
    runProcess(*feed_args)
def _record_feed_failure(db, url, telegram):
    """Count one failed fetch of ``url`` and notify once the count exceeds 10.

    The counter lives in the ``Expire`` collection. Once it is above 10 it is
    no longer incremented, so the notification is sent on every further
    failed run until a successful fetch resets the counter.
    """
    record = db["Expire"].find_one({"url": url})
    if not record:
        db["Expire"].insert_one({"url": url, "expired": 1})
        return

    if record["expired"] > 10:
        msg = escapeText(telegram["parse_mode"], url)
        print(f"订阅 {url} 已失效")
        # "\\#" emits a literal backslash before "#"; the previous "\#" was an
        # invalid escape sequence that only produced that text by accident.
        sendToTelegram(telegram, f"订阅 {msg} 已失效\n\n\\#提醒")
        return

    db["Expire"].update_one(
        {"_id": record["_id"]},
        {"$set": {"expired": record["expired"] + 1}},
    )


def _format_match(matched):
    """Shape a regex match the way templates receive it.

    Exactly one capture group yields that group's value; any other number of
    groups yields a list of the whole match followed by every group.
    """
    groups = matched.groups()
    if len(groups) == 1:
        return groups[0]
    return [matched.group(), *groups]


def _run_rule_func(source, obj):
    """Execute a ``func`` rule and return the result of its ``func(obj)``.

    ``source`` must define a callable named ``func``.
    """
    # SECURITY: this executes arbitrary code taken from the feed
    # configuration; the configuration file must be trusted.
    # An explicit namespace is used because reading results of exec() back
    # through locals() no longer works on Python 3.13+ (PEP 667).
    namespace = {"obj": obj}
    exec(source + "\ntmp_return = func(obj)\n", globals(), namespace)
    return namespace["tmp_return"]


def _extract_fields(content, rules):
    """Apply every rule to ``content`` and return the ``dest -> value`` map.

    A rule without ``type`` (or with ``type == "regex"``) searches the parsed
    object with its ``matcher`` pattern; ``type == "func"`` runs the matcher
    as Python source. A regex rule that finds nothing is skipped, leaving its
    ``dest`` undefined instead of aborting the whole feed.
    """
    fields = {}
    for rule in rules or ():
        obj = objParser(content, rule["obj"])
        rule_type = rule.get("type") or "regex"
        if rule_type == "regex":
            matched = re.search(rule["matcher"], obj)
            if matched is None:
                continue
            fields[rule["dest"]] = _format_match(matched)
        elif rule_type == "func":
            fields[rule["dest"]] = _run_rule_func(rule["matcher"], obj)
    return fields


def _is_filtered(content, filters):
    """Return True when any filter pattern matches its parsed object."""
    return any(
        re.search(rule["matcher"], objParser(content, rule["obj"]))
        for rule in filters or ()
    )


def runProcess(rss, telegram, database_url, user_agent):
    """Fetch one feed and send or edit a Telegram message for each entry.

    Args:
        rss: feed settings; ``url``, ``name`` and ``text`` are read, plus the
            optional ``rules``, ``filters`` and ``telegram`` sections.
        telegram: global Telegram options; ``rss["telegram"]`` overrides them
            for this feed only.
        database_url: MongoDB connection string.
        user_agent: value passed through to ``rssParser``.

    When the parser returns nothing, the failure is counted and the function
    returns. Otherwise the failure counter is reset and every entry that
    passes the filters is rendered and handed to ``handleText``.
    """
    client = MongoClient(database_url)
    try:
        db = client["RSS"]
        url = rss["url"]

        rss_content = rssParser(url, user_agent)
        if not rss_content:
            _record_feed_failure(db, url, telegram)
            return

        db["Expire"].update_one(
            {"url": url},
            {"$set": {"expired": 0}},
        )

        # Loop invariants: the feed part of the record id, and the template,
        # which is compiled lazily on the first entry that is actually sent.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        template = None

        for content in rss_content:
            fields = _extract_fields(content, rss.get("rules"))
            if _is_filtered(content, rss.get("filters")):
                continue

            if template is None:
                template = Template(rss["text"])

            args = dict(
                **fields, **content, rss_name=rss["name"], rss_url=rss["url"]
            )
            # The return value is unused, so escapeAll presumably escapes the
            # values in place -- TODO confirm against utils.
            escapeAll(telegram["parse_mode"], args)
            text = template.render(args)

            # Entries without id/guid/link fall back to the rendered text so
            # they are still de-duplicated instead of crashing on None.
            entry_key = (
                content.get("id")
                or content.get("guid")
                or content.get("link")
                or text
            )
            entry_hash = hashlib.md5(str(entry_key).encode()).hexdigest()
            record_id = url_hash + entry_hash

            # Per-entry copy so feed-level overrides never leak into the
            # shared global options.
            entry_tg = copy.deepcopy(telegram)
            if rss.get("telegram"):
                entry_tg.update(rss["telegram"])

            handleText(rss["name"], record_id, text, entry_tg, db)
    finally:
        # Each worker call opens its own client; always release it.
        client.close()
def handleText(name, id, text, tg, db):
    """Send ``text`` as a new message, or edit the message already sent for ``id``.

    The collection ``db[name]`` stores one document per sent message. Nothing
    happens when a document with the same text hash exists. Otherwise a
    document with the same ``id`` means the earlier message is edited, and no
    such document means a new message is sent and recorded.
    """
    digest = hashlib.md5(text.encode()).hexdigest()
    collection = db[name]

    # Identical text was already delivered: nothing to do.
    if collection.find_one({"text": digest}):
        return

    previous = collection.find_one({"id": id})
    if previous:
        # Known entry with changed text: edit the existing message and record
        # the new hash only when the edit succeeded.
        if editToTelegram(tg, previous["message"], text):
            collection.update_one(
                {"_id": previous["_id"]},
                {"$set": {"text": digest, "edit_time": time.time()}},
            )
            print(
                f"Edited 1 message: ID {previous['message']} TEXT {digest} in {name}"
            )
        return

    # Unknown entry: send it and store a record only when sending succeeded.
    message_id = sendToTelegram(tg, text)
    if not message_id:
        return

    collection.insert_one(
        {
            "id": id,
            "message": message_id,
            "text": digest,
            "create_time": time.time(),
            "edit_time": time.time(),
        }
    )
    print(f"Sent 1 message: {digest} in {name}")