PagerMaid-Modify/pagermaid/__init__.py

247 lines
7.2 KiB
Python
Raw Permalink Normal View History

""" PagerMaid initialization. """
from subprocess import run, PIPE
from datetime import datetime
from time import time
2022-03-13 12:21:32 +00:00
from os import getcwd, makedirs, environ
from os.path import exists
from sys import version_info, platform
from yaml import load, FullLoader
from json import load as load_json
from shutil import copyfile
from redis import StrictRedis
from logging import getLogger, INFO, DEBUG, ERROR, StreamHandler, basicConfig
from distutils.util import strtobool
from coloredlogs import ColoredFormatter
from apscheduler.schedulers.asyncio import AsyncIOScheduler
2020-08-08 16:31:48 +00:00
from telethon import TelegramClient
from telethon.sessions import StringSession
from languages.languages import Lang
persistent_vars = {}
module_dir = __path__[0]
working_dir = getcwd()
config = None
help_messages = {}
scheduler = AsyncIOScheduler()
if not scheduler.running:
scheduler.configure(timezone="Asia/ShangHai")
scheduler.start()
version = 0.1
logs = getLogger(__name__)
logging_format = "%(levelname)s [%(asctime)s] [%(name)s] %(message)s"
logging_handler = StreamHandler()
logging_handler.setFormatter(ColoredFormatter(logging_format))
2021-03-03 17:03:48 +00:00
root_logger = getLogger()
root_logger.setLevel(ERROR)
2021-03-03 17:03:48 +00:00
root_logger.addHandler(logging_handler)
basicConfig(level=INFO)
logs.setLevel(INFO)
try:
config = load(open(r"config.yml"), Loader=FullLoader)
except FileNotFoundError:
logs.fatal("The configuration file does not exist, and a new configuration file is being generated.")
copyfile(f"{module_dir}/assets/config.gen.yml", "config.yml")
exit(1)
# i18n
language = Lang(config["application_language"])
# alias
alias_dict: dict = {}
if exists("data/alias.json"):
try:
with open("data/alias.json", encoding="utf-8") as f:
alias_dict = load_json(f)
except Exception as e:
print("Reading alias file failed")
print(e)
exit(1)
def lang(text: str) -> str:
""" i18n """
return language.get(text)
analytics = None
2021-07-18 07:52:11 +00:00
try:
allow_analytics = strtobool(config.get('allow_analytic', 'True'))
2021-07-18 07:52:11 +00:00
except ValueError:
allow_analytics = True
if allow_analytics:
import analytics
analytics.write_key = 'EI5EyxFl8huwAvv932Au7XoRSdZ63wC4'
analytics = analytics
if strtobool(config['debug']):
logs.setLevel(DEBUG)
else:
logs.setLevel(INFO)
if platform == "linux" or platform == "linux2" or platform == "darwin" or platform == "freebsd7" \
or platform == "freebsd8" or platform == "freebsdN" or platform == "openbsd6":
logs.info(
lang('platform') + platform + lang('platform_load')
)
else:
logs.error(
f"{lang('error_prefix')} {lang('platform')}" + platform + lang('platform_unsupported')
)
exit(1)
if version_info[0] < 3 or version_info[1] < 6:
logs.error(
f"{lang('error_prefix')} {lang('python')}"
)
exit(1)
if not exists(f"{getcwd()}/data"):
makedirs(f"{getcwd()}/data")
api_key = config['api_key']
api_hash = config['api_hash']
session_string = "pagermaid"
# environ
if environ.get('api_key'):
api_key = environ.get('api_key')
if environ.get('api_hash'):
api_hash = environ.get('api_hash')
if environ.get('session'):
string_session = environ.get('session')
session_string = StringSession(string_session)
# api type
try:
api_key = int(api_key)
except ValueError:
logs.info(
lang('config_error')
)
exit(1)
except:
pass
proxy_addr = config.get('proxy_addr', '').strip()
proxy_port = config.get('proxy_port', '').strip()
http_addr = config.get('http_addr', '').strip()
http_port = config.get('http_port', '').strip()
mtp_addr = config.get('mtp_addr', '').strip()
mtp_port = config.get('mtp_port', '').strip()
mtp_secret = config.get('mtp_secret', '').strip()
redis_host = config.get('redis').get('host', 'localhost')
redis_port = config.get('redis').get('port', 6379)
redis_db = config.get('redis').get('db', 14)
redis_password = config.get('redis').get('password', '')
use_ipv6 = bool(strtobool(config.get('ipv6', 'False')))
silent = bool(strtobool(config.get('silent', 'True')))
if api_key is None or api_hash is None:
logs.info(
lang('config_error')
)
exit(1)
# 开始检查代理配置
proxies = {}
if not proxy_addr == '' and not proxy_port == '':
2020-08-08 16:31:48 +00:00
try:
import python_socks
proxies = {
"http": f"socks5://{proxy_addr}:{proxy_port}",
"https": f"socks5://{proxy_addr}:{proxy_port}"
}
bot = TelegramClient(session_string, api_key, api_hash,
auto_reconnect=True,
proxy=(python_socks.ProxyType.SOCKS5, proxy_addr, int(proxy_port)),
use_ipv6=use_ipv6)
2020-08-08 16:31:48 +00:00
except:
proxies = {}
bot = TelegramClient(session_string, api_key, api_hash,
auto_reconnect=True,
use_ipv6=use_ipv6)
elif not http_addr == '' and not http_port == '':
try:
import python_socks
proxies = {
"http": f"http://{http_addr}:{http_port}",
"https": f"http://{http_addr}:{http_port}"
}
bot = TelegramClient(session_string, api_key, api_hash,
auto_reconnect=True,
proxy=(python_socks.ProxyType.HTTP, http_addr, int(http_port)),
use_ipv6=use_ipv6)
except:
bot = TelegramClient(session_string, api_key, api_hash,
auto_reconnect=True,
use_ipv6=use_ipv6)
elif not mtp_addr == '' and not mtp_port == '' and not mtp_secret == '':
2020-08-08 16:31:48 +00:00
from telethon import connection
bot = TelegramClient(session_string, api_key, api_hash,
auto_reconnect=True,
connection=connection.ConnectionTcpMTProxyRandomizedIntermediate,
proxy=(mtp_addr, int(mtp_port), mtp_secret),
use_ipv6=use_ipv6)
else:
bot = TelegramClient(session_string, api_key, api_hash, auto_reconnect=True, use_ipv6=use_ipv6)
user_id = 0
user_bot = False
redis = StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
async def save_id():
global user_id, user_bot
me = await bot.get_me()
user_id = me.id
user_bot = me.bot
2021-04-04 05:03:57 +00:00
if me.username is not None:
2021-07-18 07:52:11 +00:00
if allow_analytics:
analytics.identify(user_id, {
'name': me.first_name,
'username': me.username,
'bot': f"{user_bot}"
})
2021-04-04 05:03:57 +00:00
else:
2021-07-18 07:52:11 +00:00
if allow_analytics:
analytics.identify(user_id, {
'name': me.first_name,
'bot': f"{user_bot}"
})
if user_bot:
user_bot = me.username
logs.info(f"{lang('save_id')} {me.first_name}({user_id})")
with bot:
bot.loop.run_until_complete(save_id())
report_time = time()
start_time = datetime.utcnow()
git_hash = run("git rev-parse HEAD", stdout=PIPE, shell=True).stdout.decode()
def redis_status():
try:
redis.ping()
return True
except BaseException:
return False
async def log(message):
logs.info(
message.replace('`', '\"')
)
if not strtobool(config['log']):
return
try:
await bot.send_message(
int(config['log_chatid']),
message
)
except ValueError:
pass