Formatted-RSS-to-Telegram/FR2T/fr2t.py

244 lines
7.8 KiB
Python
Raw Normal View History

2021-05-27 14:14:43 +00:00
import copy
import copyreg
2021-05-26 09:32:39 +00:00
import datetime
import hashlib
2021-05-27 14:14:43 +00:00
import os
2021-05-26 09:32:39 +00:00
import re
import ssl
2021-05-27 14:14:43 +00:00
import time
from multiprocessing import Pool
import yaml
from jinja2 import Template
from pymongo import MongoClient
from .parser import rssParser, objParser
2021-05-27 15:20:59 +00:00
from .sender import editToTelegram, sendToTelegram
from .utils import escapeAll, escapeText
2021-05-26 09:32:39 +00:00
class FR2T:
    """Load the bot settings, dispatch every feed to workers and purge old records.

    Attributes set by :meth:`loadConfig`:
        config: parsed YAML from ``rss_path``; :meth:`run` reads its ``"rss"`` list.
        database_url: MongoDB connection string.
        expire_time: retention period used by :meth:`purge`, e.g. ``"30d"``.
        user_agent: User-Agent string handed to the feed workers.
        telegram: Telegram settings dict handed to the feed workers.
    """

    # ``expire_time`` unit suffix -> number of hours one unit stands for.
    _EXPIRE_UNIT_HOURS = {"y": 365 * 24, "m": 30 * 24, "d": 24, "h": 1}

    def __init__(self, config_path, rss_path):
        """Remember both YAML paths and load them immediately.

        Args:
            config_path: YAML file with ``database_url``, ``telegram`` and the
                optional ``expire_time`` / ``user-agent`` keys.
            rss_path: YAML file whose ``"rss"`` list describes the feeds.
        """
        self.config_path = config_path
        self.rss_path = rss_path
        self.loadConfig()

    def loadConfig(self):
        """Read both YAML files and apply environment-variable overrides.

        Environment variables win over file values: ``DATABASE``,
        ``EXPIRE_TIME``, ``USER-AGENT`` and ``TG_<KEY>`` for every key that is
        already present in the ``telegram`` section.
        """
        # NOTE: the feed list lives in ``rss_path`` and ends up in
        # ``self.config``; the general settings come from ``config_path``.
        with open(self.rss_path, "r", encoding="UTF-8") as c:
            self.config = yaml.safe_load(c)
        with open(self.config_path, "r", encoding="UTF-8") as c:
            settings = yaml.safe_load(c)

        self.database_url = os.getenv("DATABASE") or settings["database_url"]
        self.expire_time = (
            os.getenv("EXPIRE_TIME") or settings.get("expire_time") or "30d"
        )
        self.user_agent = (
            os.getenv("USER-AGENT")
            or settings.get("user-agent")
            or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
        )

        self.telegram = settings["telegram"]
        # Only keys that already exist in the file can be overridden.
        overrides = {}
        for key in self.telegram:
            value = os.getenv("TG_" + key.upper())
            if value:
                overrides[key] = value
        self.telegram.update(overrides)

        # Fill in defaults for options that are missing or empty.
        for key, default in (
            ("disable_notification", "false"),
            ("disable_web_page_preview", "false"),
            ("parse_mode", "MarkdownV2"),
        ):
            self.telegram[key] = self.telegram.get(key) or default

    def run(self):
        """Process every configured feed in a pool of 8 worker processes.

        A feed whose ``url`` is a list is expanded into one deep-copied feed
        per URL. Feeds whose ``url`` is neither a string nor a list are
        skipped.
        """

        def save_sslcontext(obj):
            return obj.__class__, (obj.protocol,)

        # Register a reducer so objects holding an SSLContext can be pickled
        # when they are sent to the worker processes.
        copyreg.pickle(ssl.SSLContext, save_sslcontext)

        feeds = []
        for feed in self.config["rss"]:
            url = feed.get("url")
            if isinstance(url, str):
                feeds.append(feed)
            elif isinstance(url, list):
                for single_url in url:
                    clone = copy.deepcopy(feed)
                    clone["url"] = single_url
                    feeds.append(clone)

        args = [
            (feed, self.telegram, self.database_url, self.user_agent)
            for feed in feeds
        ]
        with Pool(8) as p:
            p.map(mixInput, args)
        print("Finished!")

    @staticmethod
    def _expire_delta(expire_time):
        """Convert an ``<integer><y|m|d|h>`` retention string to a timedelta.

        A year counts as 365 days and a month as 30 days.

        Raises:
            ValueError: if the unit suffix is missing/unknown, the amount is
                not an integer, or the amount is negative.
        """
        text = str(expire_time).strip()
        unit_hours = FR2T._EXPIRE_UNIT_HOURS.get(text[-1:])
        if unit_hours is None:
            raise ValueError(
                f"invalid expire_time {expire_time!r}: "
                "expected an integer followed by y, m, d or h"
            )
        try:
            amount = int(text[:-1])
        except ValueError:
            raise ValueError(
                f"invalid expire_time {expire_time!r}: "
                "expected an integer followed by y, m, d or h"
            ) from None
        if amount < 0:
            raise ValueError(
                f"invalid expire_time {expire_time!r}: amount must not be negative"
            )
        return datetime.timedelta(hours=amount * unit_hours)

    def purge(self):
        """Delete records whose ``create_time`` is older than ``expire_time``.

        Every collection of the ``RSS`` database is scanned.

        Raises:
            ValueError: if ``expire_time`` cannot be parsed. The check happens
                before connecting, because an unparsed value used to yield a
                zero-length window and therefore purged every record.
        """
        max_age = self._expire_delta(self.expire_time)
        expired_timestamp = (datetime.datetime.now() - max_age).timestamp()
        purge_rule = {"create_time": {"$lt": expired_timestamp}}

        deleted_num = 0
        client = MongoClient(self.database_url)
        try:
            db = client["RSS"]
            for col_name in db.list_collection_names():
                print(f"开始清理: {col_name}")
                deleted_result = db[col_name].delete_many(purge_rule)
                deleted_num += deleted_result.deleted_count
        finally:
            # Release the connection even if a delete fails.
            client.close()
        print(f"已删除 {deleted_num} 个记录!")
def mixInput(mix_args):
    """Unpack one argument tuple and forward it to runProcess.

    FR2T.run() hands this function to Pool.map, which passes a single item
    per call, so the (rss, telegram, database_url, user_agent) arguments
    arrive packed in one tuple.
    """
    runProcess(*mix_args)
2021-05-26 09:32:39 +00:00
def runProcess(rss, telegram, database_url, user_agent):
    """Fetch one feed and send or edit a Telegram message for each entry.

    Args:
        rss: feed settings; reads ``"url"``, ``"name"``, ``"text"`` and the
            optional ``"rules"``, ``"filters"`` and ``"telegram"`` keys.
        telegram: global Telegram settings; must contain ``"parse_mode"``.
        database_url: MongoDB connection string.
        user_agent: value passed to ``rssParser`` together with the URL.
    """
    client = MongoClient(database_url)
    try:
        _processFeed(rss, telegram, client["RSS"], user_agent)
    finally:
        # Each worker call opens its own client; always release it.
        client.close()


def _processFeed(rss, telegram, db, user_agent):
    """Parse the feed, track fetch failures and hand every entry to handleText."""
    url = rss["url"]
    rss_content = rssParser(url, user_agent)
    if not rss_content:
        _recordFetchFailure(db, telegram, url)
        return

    # The feed answered: reset its failure counter.
    db["Expire"].update_one(
        {"url": url},
        {"$set": {"expired": 0}},
    )

    feed_hash = hashlib.md5(url.encode()).hexdigest()
    template = None
    for content in rss_content:
        result = _applyRules(content, rss) if rss.get("rules") else {}

        if rss.get("filters") and _isFiltered(content, rss["filters"]):
            continue

        # Per-feed Telegram options override the global ones.
        tmp_tg = copy.deepcopy(telegram)
        if rss.get("telegram"):
            tmp_tg.update(rss["telegram"])

        # Later sources win on a key clash (entry < rule results < feed
        # metadata) instead of raising TypeError for a duplicate keyword.
        args = {
            **content,
            **result,
            "rss_name": rss["name"],
            "rss_url": rss["url"],
        }
        # Escape for the parse mode the message is sent with, so a per-feed
        # override is honoured.
        # NOTE(review): assumes the sender reads tg["parse_mode"] - confirm.
        escapeAll(tmp_tg["parse_mode"], args)

        if template is None:
            # Compiled once, and only if at least one entry passes the filters.
            template = Template(rss["text"])
        text = template.render(args)

        # Entries without id/guid/link fall back to their rendered text so
        # they can still be posted rather than crashing on None.encode().
        entry_key = (
            content.get("id")
            or content.get("guid")
            or content.get("link")
            or text
        )
        entry_id = feed_hash + hashlib.md5(str(entry_key).encode()).hexdigest()
        handleText(rss["name"], entry_id, text, tmp_tg, db)


def _recordFetchFailure(db, telegram, url):
    """Count a failed fetch of ``url``; notify once the stored count exceeds 10."""
    expired_url = db["Expire"].find_one({"url": url})
    if not expired_url:
        db["Expire"].insert_one({"url": url, "expired": 1})
    elif expired_url["expired"] > 10:
        msg = escapeText(telegram["parse_mode"], url)
        print(f"订阅 {url} 已失效")
        # "\\#" keeps the literal backslash the message always contained
        # without relying on the invalid "\#" escape sequence.
        sendToTelegram(telegram, f"订阅 {msg} 已失效\n\n\\#提醒")
    else:
        db["Expire"].update_one(
            {"_id": expired_url["_id"]},
            {"$set": {"expired": expired_url["expired"] + 1}},
        )


def _applyRules(content, rss):
    """Evaluate ``rss["rules"]`` against one entry and return ``{dest: value}``.

    A ``regex`` rule (the default type) stores the single capture group, or a
    list ``[whole_match, *groups]`` when the pattern has zero or several
    groups. A pattern that does not match stores an empty string. A ``func``
    rule executes the configured source, which must define ``func(obj)``, and
    stores its return value.
    """
    result = {}
    for rule in rss["rules"]:
        obj = objParser(content, rule["obj"])
        rule_type = rule.get("type") or "regex"
        if rule_type == "regex":
            found = re.search(rule["matcher"], obj)
            if found is None:
                # One non-matching entry must not abort the whole feed.
                result[rule["dest"]] = ""
            elif len(found.groups()) == 1:
                result[rule["dest"]] = found.group(1)
            else:
                result[rule["dest"]] = [found.group(), *found.groups()]
        elif rule_type == "func":
            # SECURITY: this executes Python source taken from the feed
            # configuration; only load configuration files you trust.
            # An explicit namespace is used because reading results back
            # through locals() stopped working in Python 3.13 (PEP 667).
            scope = {
                "obj": obj,
                "content": content,
                "rss": rss,
                "rule": rule,
                "result": result,
            }
            exec(rule["matcher"] + "\ntmp_return = func(obj)\n", globals(), scope)
            result[rule["dest"]] = scope["tmp_return"]
    return result


def _isFiltered(content, filters):
    """Return True when any filter pattern matches its target field."""
    return any(
        re.search(rule["matcher"], objParser(content, rule["obj"]))
        for rule in filters
    )
2021-05-26 09:32:39 +00:00
def handleText(name, id, text, tg, db):
    """Send ``text`` as a new message, or edit the message already posted for ``id``.

    Nothing happens when a record with the same text hash already exists in
    collection ``name``. Otherwise the message stored for ``id`` is edited, or,
    when ``id`` is unknown, a new message is sent and recorded. The database
    is only written when the Telegram call returns a truthy result.

    Args:
        name: collection name (the feed name).
        id: identifier of the feed entry.
        text: rendered message text.
        tg: Telegram settings passed through to the sender functions.
        db: database object indexable by collection name.
    """
    collection = db[name]
    text_hash = hashlib.md5(text.encode()).hexdigest()

    # Identical text was already delivered: nothing to do.
    if collection.find_one({"text": text_hash}):
        return

    id_posted = collection.find_one({"id": id})
    if id_posted:
        # Known entry with changed text: update the existing message.
        if editToTelegram(tg, id_posted["message"], text):
            collection.update_one(
                {"_id": id_posted["_id"]},
                {"$set": {"text": text_hash, "edit_time": time.time()}},
            )
            print(
                f"Edited 1 message: ID {id_posted['message']} "
                f"TEXT {text_hash} in {name}"
            )
        return

    message_id = sendToTelegram(tg, text)
    if message_id:
        # One timestamp for both fields, so a never-edited record has
        # edit_time == create_time.
        now = time.time()
        collection.insert_one(
            {
                "id": id,
                "message": message_id,
                "text": text_hash,
                "create_time": now,
                "edit_time": now,
            }
        )
        print(f"Sent 1 message: {text_hash} in {name}")