This commit is contained in:
iwumingz 2022-04-06 22:39:27 +08:00
parent cd4cb7a5b2
commit 836694df48
38 changed files with 2263 additions and 1 deletions

82
.github/workflows/docker-image.yml vendored Normal file
View File

@ -0,0 +1,82 @@
# yaml-language-server: $schema=https://json.schemastore.org/github-workflow
name: Build Docker Image
# 当 push 到 main 分支,或者创建以 v 开头的 tag 时触发,可根据需求修改
on:
push:
tags:
- v*
env:
REGISTRY: ghcr.io
IMAGE: iwumingz/sycgram # prinsss/ga-hit-counter
jobs:
build-and-push:
runs-on: ubuntu-latest
# 这里用于定义 GITHUB_TOKEN 的权限
permissions:
packages: write
contents: read
steps:
- name: Checkout
uses: actions/checkout@v2
# 缓存 Docker 镜像以加速构建
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: ${{ runner.os }}-buildx-
# 配置 QEMU 和 buildx 用于多架构镜像的构建
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1
- name: Inspect builder
run: |
echo "Name: ${{ steps.buildx.outputs.name }}"
echo "Endpoint: ${{ steps.buildx.outputs.endpoint }}"
echo "Status: ${{ steps.buildx.outputs.status }}"
echo "Flags: ${{ steps.buildx.outputs.flags }}"
echo "Platforms: ${{ steps.buildx.outputs.platforms }}"
# 登录到 GitHub Packages 容器仓库
# 注意 secrets.GITHUB_TOKEN 不需要手动添加,直接就可以用
- name: Log in to the Container registry
uses: docker/login-action@v1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# 根据输入自动生成 tag 和 label 等数据,说明见下
- name: Extract metadata for Docker
id: meta
uses: docker/metadata-action@v3
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE }}
# 构建并上传
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
file: ./Dockerfile
builder: ${{ steps.buildx.outputs.name }}
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
- name: Inspect image
run: docker buildx imagetools inspect ${{ env.REGISTRY }}/${{ env.IMAGE }}:${{ steps.meta.outputs.version }}

8
.gitignore vendored
View File

@ -127,3 +127,11 @@ dmypy.json
# Pyre type checker
.pyre/
data/app*
data/config*
data/log*
data/img*
*.debug.sh
READ.md

16
Dockerfile Normal file
View File

@ -0,0 +1,16 @@
FROM python:3.9-alpine
LABEL maintainer=iwumingz
WORKDIR /sycgram
COPY . /sycgram
RUN apk add --no-cache libjpeg libwebp libpng py3-lxml bc neofetch \
&& apk add --no-cache --virtual build-deps gcc g++ zlib-dev jpeg-dev libxml2-dev libxslt-dev libwebp-dev libpng-dev \
&& pip install -r requirements.txt --no-cache-dir \
&& apk del build-deps \
&& mkdir -p /sycgram/data \
&& rm -rf .git .github .gitignore Dockerfile install.sh LICENSE README.md requirements.txt
VOLUME /sycgram/data
ENTRYPOINT ["/usr/local/bin/python3", "-u", "main.py"]

View File

@ -1 +1,14 @@
# sycgram
# sycgram
## 安装与更新
```shell
# 如果选择的是安装则安装成功后可先使用Ctrl+P然后使用Ctrl+Q挂到后台运行
bash <(curl -fsL "https://raw.githubusercontent.com/iwumingz/sycgram/main/install.sh")
```
## 注意事项
- 脚本仅适用于Ubuntu/Debian
- 按个人需求随缘更,仅用于学习用途。
- 如果号码等输入错误了,重新安装即可

13
core/__init__.py Normal file
View File

@ -0,0 +1,13 @@
from .custom import command
from pyrogram import Client
app = Client(
"./data/app",
config_file='./data/config.ini',
plugins=dict(root="plugins")
)
__all__ = (
'app', 'command',
)

37
core/custom.py Normal file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : custom.py
@Time : 2022/04/02 10:17:03
@Author : Viperorz
@Version : 1.0.0
@License : (C)Copyright 2021-2022
@Desc : None
"""
from typing import List, Union
from pyrogram import filters
from pyrogram.types import Message
from tools.constants import STORE_TRACE_DATA
from tools.storage import SimpleStore
def command(command: Union[str, List[str]]):
"""匹配UserBot指令"""
return filters.me & filters.text & filters.command(command, '-')
def is_traced():
"""正则匹配用户输入指令及参数"""
async def func(flt, _, msg: Message):
async with SimpleStore(auto_flush=False) as store:
trace_data = store.get_data(STORE_TRACE_DATA)
if not trace_data:
return False
elif not trace_data.get(msg.from_user.id):
return False
return True
# "data" kwarg is accessed with "flt.data" above
return filters.incoming & filters.create(func)

82
data/command.yml Normal file
View File

@ -0,0 +1,82 @@
help:
format: -help <command>
usage: 指令列表
note:
format: -note <save|del> <序号> or -note <序号|list|clear>
usage: 回复一条消息,根据序号保存/删除该消息文本
dme:
format: -dme <数量>
usage: 直接使用。批量删除消息, 范围1 ~ 1500默认1
f:
format: -f <数量>
usage: 回复一条消息转发该消息n次。范围1 ~ 30 默认1
cp:
format: -cp <数量>
usage: 回复一条消息无引用转发该消息n次。范围1 ~ 30 默认1
ghost:
format: -ghost <status|list>
usage: 直接使用。开启ghost的对话会被自动标记为已读
id:
format: -id
usage: 回复一条消息或直接使用查看对话及消息的ID
sb:
format: -sb
usage: 回复一条消息,将在所有共同且拥有管理踢人权限的群组中踢出目标消息的主人
dc:
format: -dc
usage: 回复一条消息或者直接使用。查看目标消息或当前对话的DC区
pingdc:
format: -pingdc
usage: 测试与各个DC的延时
ex:
format: -ex <数字> <FROM> <TO>
usage: 汇率转换
speedtest:
format: -speedtest <无|节点ID|list|update>
usage: 服务器本地网络测速
s:
format: -s <无|emoji> or -s <sticker_set_title> <sticker_set_name>
usage:
收集回复的贴纸/图片/图片文件消息。直接使用时,可以设置默认贴纸包标题&名字;
回复使用时可以指定emoji不指定则使用默认emoji
trace:
format: -trace <emoji>
usage: 回复一条消息,当目标消息的主人发消息时,自动丢<emoji>。默认:💩
cc:
format: -cc <数量> or -cc <emoji>
usage: 回复使用:遍历该消息的主人发过的消息并丢<数量>个<emoji>给Ta直接使用
指令<emoji>为默认emoji。数量范围1 ~ 233Emoji默认为💩
cal:
format: -cal <四则运算式>
usage: 直接使用。默认除法精确到小数点后4位
sh:
format: -sh <shell脚本>
usage: 直接使用
sysinfo:
format: -sysinfo
usage: 直接使用,查看系统信息
diss:
format: -diss or -biss
usage: 喷子语录
tg:
format: -biss
usage: 舔狗语录

154
install.sh Normal file
View File

@ -0,0 +1,154 @@
#!/bin/bash
clear
CONTAINER_NAME="sycgram"
GITHUB_IMAGE_NAME="iwumingz/${CONTAINER_NAME}"
GITHUB_IMAGE_PATH="ghcr.io/${GITHUB_IMAGE_NAME}"
PROJECT_PATH="/opt/${CONTAINER_NAME}"
PROJECT_VERSION="v1.0.0"
red='\033[0;31m'
green='\033[0;32m'
yellow='\033[0;33m'
plain='\033[0m'
pre_check() {
[[ $EUID -ne 0 ]] && echo -e "${red}错误: ${plain} 需要root权限\n" && exit 1
command -v git >/dev/null 2>&1
if [[ $? != 0 ]]; then
echo -e "正在安装Git..."
apt install git -y >/dev/null 2>&1
echo -e "${green}Git${plain} 安装成功"
fi
command -v curl >/dev/null 2>&1
if [[ $? != 0 ]]; then
echo -e "正在安装curl..."
apt install curl -y >/dev/null 2>&1
echo -e "${green}curl${plain} 安装成功"
fi
command -v docker >/dev/null 2>&1
if [[ $? != 0 ]]; then
echo -e "正在安装Docker..."
bash <(curl -fsL https://get.docker.com) >/dev/null 2>&1
echo -e "${green}Docker${plain} 安装成功"
fi
command -v tar >/dev/null 2>&1
if [[ $? == 0 ]]; then
echo -e "正在安装tar..."
apt install tar -y >/dev/null 2>&1
echo -e "${green}tar${plain} 安装成功"
fi
}
delete_old_image_and_container(){
# 获取最新指令说明
target="https://raw.githubusercontent.com/iwumingz/sycgram/main/data/command.yml"
curl -fsL ${target} > "${PROJECT_PATH}/data/command.yml"
echo "正在删除旧版本容器..."
docker rm -f $(docker ps -a | grep ${CONTAINER_NAME} | awk '{print $1}')
echo "正在删除旧版本镜像..."
docker image rm -f $(docker images | grep ${CONTAINER_NAME} | awk '{print $3}')
}
check_and_create_config(){
if [ ! -f ${PROJECT_PATH}/data/config.ini ]; then
mkdir -p "${PROJECT_PATH}/data" >/dev/null 2>&1
read -p "Please input your api_id" api_id
read -p "Please input your api_hash" api_hash
cat > ${PROJECT_PATH}/data/config.ini <<EOF
[pyrogram]
api_id=${api_id}
api_hash=${api_hash}
[plugins]
root=plugins
EOF
fi
}
stop_sycgram(){
docker stop $(docker ps -a | grep ${GITHUB_IMAGE_NAME} | awk '{print $1}')
}
view_docker_log(){
docker logs -f $(docker ps -a | grep sycgram | awk '{print $1}')
}
uninstall_sycgram(){
delete_old_image_and_container;
rm -rf ${project_path}
}
reinstall_sycgram(){
rm -rf ${PROJECT_PATH}
install_sycgram "-it"
}
install_sycgram(){
pre_check;
check_and_create_config;
delete_old_image_and_container;
echo -e "正在拉取镜像..."
docker pull ghcr.io/iwumingz/sycgram:latest
echo -e "正在启动容器..."
docker run $1 \
--name ${CONTAINER_NAME} \
--env TZ="Asia/Shanghai" \
--restart always \
-v ${PROJECT_PATH}/data:/sycgram/data \
${GITHUB_IMAGE_PATH}:latest
}
show_menu() {
echo -e "${green}Sycgram${plain} | ${green}管理脚本${plain} | ${red}${PROJECT_VERSION}${plain}"
echo -e " ${green}1.${plain} 安装"
echo -e " ${green}2.${plain} 更新"
echo -e " ${green}3.${plain} 停止"
echo -e " ${green}4.${plain} 查看日志"
echo -e " ${green}5.${plain} 重新安装"
echo -e " ${green}6.${plain} 卸载"
echo -e " ${green}0.${plain} 退出脚本"
echo -n "请选择编号: "
read -ep "请输入选择 [0-6]: " option
case "${option}" in
0)
exit 0
;;
1)
install_sycgram "-it"
;;
2)
install_sycgram "-itd"
;;
3)
stop_sycgram
;;
4)
view_docker_log
;;
5)
reinstall_sycgram
;;
6)
uninstall_sycgram
;;
*)
echo -e "${yellow}已退出脚本...${plain}"
exit
;;
esac
}
show_menu;

6
main.py Normal file
View File

@ -0,0 +1,6 @@
from core import app
from tools.initializer import init_logger
if __name__ == '__main__':
init_logger()
app.run()

0
plugins/__init__.py Normal file
View File

29
plugins/calculate.py Normal file
View File

@ -0,0 +1,29 @@
import asyncio
from core import command
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message
from tools.helpers import Parameters, basher
@Client.on_message(command("cal"))
async def calculate(_: Client, msg: Message):
"""计算器"""
_, args = Parameters.get(msg)
try:
res = await basher(f"""echo "scale=4;{args}" | bc""", 3)
except asyncio.exceptions.TimeoutError:
await msg.edit_text("❗️ Connection Timeout")
return
if not res.get('output'):
await msg.edit_text(f"Error{res.get('error')}")
return
text = f"""In`{args}`\nOut`{res.get('output')}`"""
try:
await msg.edit_text(text)
except FloodWait as e:
await asyncio.sleep(e.x)
except RPCError:
await msg.edit_text(text)

107
plugins/cc.py Normal file
View File

@ -0,0 +1,107 @@
import asyncio
import re
from random import choice, random
from typing import List
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.types import Message
from tools.constants import CC_MAX_TIMES, REACTIONS, STORE_CC_DATA, TG_GROUPS
from tools.helpers import Parameters, delete_this, emoji_sender, get_cmd_error
from tools.storage import SimpleStore
@Client.on_message(command('cc'))
async def cc(cli: Client, msg: Message):
"""
cc:
format: -cc <数量> or -cc <emoji|set>
usage:
回复使用遍历该消息的主人发过的消息并丢<数量><emoji>给Ta直接使用
指令<emoji>为默认emoji默认💩
"""
cmd, opt = Parameters.get(msg)
replied_msg = msg.reply_to_message
async with SimpleStore(auto_flush=False) as store:
cc_emoji = store.data.get(STORE_CC_DATA)
if not cc_emoji:
cc_emoji = store.data[STORE_CC_DATA] = '💩'
if replied_msg and bool(re.match(r"[0-9]+", opt)):
cc_times = int(opt)
elif opt in REACTIONS or opt == 'set':
store.data[STORE_CC_DATA] = choice(
REACTIONS) if opt == 'set' else opt
tmp = store.data[STORE_CC_DATA]
store.flush()
await msg.edit_text(f"Default emoji changed to `{tmp}`")
return
else:
await msg.edit_text(get_cmd_error(cmd))
return
# 攻击次数
cc_times = cc_times if 1 <= cc_times <= CC_MAX_TIMES else CC_MAX_TIMES
cc_msgs: List[int] = []
# 遍历和搜索消息
if msg.chat.type in TG_GROUPS:
async for target in cli.search_messages(
chat_id=msg.chat.id, limit=1000,
from_user=replied_msg.from_user.id,
):
if target.message_id > 1 and target.from_user:
cc_msgs.append(target.message_id)
if len(cc_msgs) == cc_times:
break
else:
async for target in cli.iter_history(msg.chat.id, limit=1000):
if target.message_id > 1 and target.from_user and \
target.from_user.id == replied_msg.from_user.id:
cc_msgs.append(target.message_id)
if len(cc_msgs) == cc_times:
break
if len(cc_msgs) > 0:
await msg.edit_text("🔥 `Attacking ...`")
shot = 0
for n, target_id in enumerate(cc_msgs):
try:
res = await emoji_sender(
cli=cli,
chat_id=msg.chat.id,
msg_id=target_id,
emoji=cc_emoji
)
except FloodWait as e:
await asyncio.sleep(e + 1)
res = await emoji_sender(
cli=cli,
chat_id=msg.chat.id,
msg_id=target_id,
emoji=cc_emoji
)
if not res and shot == 0:
await msg.edit_text(
f"This chat don't allow using {cc_emoji} to react."
)
return
shot = shot + 1
logger.success(f"{cmd} | attacking | {n+1}")
await asyncio.sleep(random() / 5)
# Finished
text = f"✅ Finished and the hit rate is {shot/cc_times*100}%"
else:
# Finished
text = "❓ Unable to find attack target"
await delete_this(msg)
res = await cli.send_message(msg.chat.id, text)
await asyncio.sleep(3)
await delete_this(res)

15
plugins/dc.py Normal file
View File

@ -0,0 +1,15 @@
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import get_dc_text
@Client.on_message(command('dc'))
async def dc(_: Client, msg: Message):
"""获取群聊或者目标消息用户的dc_id"""
_is_replied = bool(msg.reply_to_message)
dc_id = msg.reply_to_message.from_user.dc_id \
if _is_replied else msg.chat.dc_id
name = msg.reply_to_message.from_user.mention(style="md") \
if _is_replied else f"`{msg.chat.title}`"
await msg.edit_text(get_dc_text(name, dc_id))

66
plugins/dme.py Normal file
View File

@ -0,0 +1,66 @@
import asyncio
import time
from typing import List
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message
from tools.helpers import Parameters, get_iterlimit, is_deleted_id
@Client.on_message(command('dme'))
async def dme(client: Client, message: Message):
"""删除指令数量的消息"""
cmd, limit = Parameters.get_int(message, max_num=1500)
counter, ids_deleted = 0, []
await message.edit_text("🧹`Clearing history...`")
start = time.time()
async def delete_messages(cli: Client, ids_deleted: List[int]):
if len(ids_deleted) == 100:
try:
await cli.delete_messages(message.chat.id, ids_deleted)
except FloodWait as e:
await asyncio.sleep(e.x + 0.5)
except RPCError as e:
logger.error(e)
else:
ids_deleted.clear()
# 第一阶段,暴力扫描最近的消息,这些消息有可能无法搜索到
async for msg in client.iter_history(message.chat.id, limit=get_iterlimit(limit)):
if is_deleted_id(msg):
logger.info(f'{cmd} | scanning | {msg.message_id}')
ids_deleted.append(msg.message_id)
counter = counter + 1
await delete_messages(client, ids_deleted)
if counter == limit:
break
# 第二阶段,对于老的消息直接扫描性能不好,还会触发限制,使用搜索功能来提速
if counter < limit:
async for msg in client.search_messages(
chat_id=message.chat.id,
offset=counter,
limit=limit - counter,
from_user='me',
):
if is_deleted_id(msg) and msg.message_id not in ids_deleted:
logger.info(f'{cmd} | searching | {msg.message_id}')
ids_deleted.append(msg.message_id)
counter = counter + 1
await delete_messages(client, ids_deleted)
if counter == limit:
break
if len(ids_deleted) != 0:
await client.delete_messages(message.chat.id, ids_deleted)
text = f"🧹Deleted {counter} messages in {time.time() - start:.3f} seconds."
res = await message.reply(text)
await asyncio.sleep(3)
await res.delete()
# log
logger.success(f"{cmd} | {text}")
await logger.complete()

54
plugins/forward.py Normal file
View File

@ -0,0 +1,54 @@
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.errors import RPCError
from pyrogram.types import Message
from tools.helpers import Parameters, delete_this
async def check_replied_msg(msg: Message, cmd: str) -> bool:
replied_msg = msg.reply_to_message
if not replied_msg:
await msg.edit_text(f"❗️ Please use `{cmd}` to reply to a message.")
return False
elif replied_msg.has_protected_content or replied_msg.chat.has_protected_content:
await msg.edit_text("😮‍💨 Please don't foward protected messages")
return False
else:
return True
@Client.on_message(command('f'))
async def forward(_: Client, msg: Message):
"""转发目标消息"""
cmd, num = Parameters.get_int(msg)
replied_msg = msg.reply_to_message
if not await check_replied_msg(msg, cmd):
return
await delete_this(msg)
for _ in range(num):
try:
await replied_msg.forward(msg.chat.id, disable_notification=True)
except RPCError as e:
logger.error(e)
@Client.on_message(command('cp'))
async def copy_forward(cli: Client, msg: Message):
"""无引用转发"""
cmd, num = Parameters.get_int(msg)
if not await check_replied_msg(msg, cmd):
return
await delete_this(msg)
for _ in range(num):
try:
await cli.copy_message(
chat_id=msg.chat.id,
from_chat_id=msg.chat.id,
message_id=msg.reply_to_message.message_id,
disable_notification=True
)
except RPCError as e:
logger.error(e)

58
plugins/ghost.py Normal file
View File

@ -0,0 +1,58 @@
import asyncio
from core.custom import command
from loguru import logger
from pyrogram import Client, filters
from pyrogram.errors import RPCError
from pyrogram.types import Message
from tools.constants import STORE_GHOST_DATA
from tools.ghosts import get_ghost_to_read
from tools.helpers import Parameters, delete_this, get_fullname
from tools.storage import SimpleStore
@Client.on_message(filters.incoming, group=-2)
async def ghost_event(cli: Client, msg: Message):
"""自动标记对话为<已读>"""
if await get_ghost_to_read(msg.chat.id):
try:
await cli.read_history(msg.chat.id)
except RPCError as e:
logger.error(e)
else:
if msg.text or msg.caption:
text = msg.text or msg.text
text = f"Ghost | {msg.chat.title} | {get_fullname(msg.from_user)} | {text}"
logger.debug(text)
finally:
await logger.complete()
@Client.on_message(command('ghost'))
async def ghost(_: Client, msg: Message):
"""指令:将该对话标记为可自动<已读>状态"""
_, opt = Parameters.get(msg)
chat = msg.chat
async with SimpleStore(auto_flush=False) as store:
ghost_data = store.get_data(STORE_GHOST_DATA)
# ghost状态
if opt == 'status':
text = f"此对话是否开启ghost{'' if chat.id in ghost_data else ''}"
elif opt == 'list':
tmp = '\n'.join(f'```{k} {v}```' for k, v in ghost_data.items())
text = f"📢 已开启ghost的对话名单\n{tmp}"
# ghost开关
else:
if chat.id in ghost_data:
text = "❌ 已关闭此对话的ghost"
ghost_data.pop(chat.id, None)
else:
text = "✅ 已开启此对话的ghost"
ghost_data[chat.id] = chat.title or get_fullname(msg.from_user)
store.flush()
await msg.edit_text(text, parse_mode='md')
await asyncio.sleep(1)
if opt != 'status' and opt != 'list':
await delete_this(msg)

24
plugins/help.py Normal file
View File

@ -0,0 +1,24 @@
from typing import Any, Dict
import yaml
from core import command
from pyrogram import Client
from pyrogram.types import Message
@Client.on_message(command('help'))
async def helper(_: Client, msg: Message):
"""指令用法提示。格式:-help <cmd|None>"""
cmd = msg.text.replace('-help', '', 1).strip()
cmd_data: Dict[str, Any] = yaml.full_load(open('./data/command.yml', 'rb'))
if not cmd:
tmp = ''.join(f"`{k}`" for k in cmd_data.keys())
text = f"📢 **指令列表:**\n{tmp}\n\n**发送** `-help <cmd>` **查看某指令的详细用法**"
elif not cmd_data.get(cmd):
text = f'❓ `{cmd}` Command Not Found'
else:
text = f"格式:`{cmd_data.get(cmd).get('format')}`\n" \
f"用法:`{cmd_data.get(cmd).get('usage')}`"
await msg.edit_text(text, parse_mode='md')

23
plugins/info.py Normal file
View File

@ -0,0 +1,23 @@
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import get_fullname
@Client.on_message(command("id"))
async def get_id(_: Client, msg: Message):
"""直接使用或者回复目标消息从而获取各种IDs"""
text = f"Message ID: `{msg.message_id}`\n\n" \
f"Chat Title: `{msg.chat.title}`\n" \
f"Chat Type: `{msg.chat.type}`\n" \
f"Chat ID: `{msg.chat.id}`"
if msg.reply_to_message:
user = msg.reply_to_message.from_user
text = f"Repiled Message ID: `{msg.reply_to_message.message_id}`\n\n" \
f"User Nick: `{get_fullname(user)}`\n"\
f"User Name: `@{user.username}`\n" \
f"User ID: `{user.id}`\n\n" \
f"{text}"
await msg.edit_text(text)

54
plugins/kick.py Normal file
View File

@ -0,0 +1,54 @@
import asyncio
from inspect import Parameter
from loguru import logger
from core import command
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message
from tools.helpers import delete_this, get_cmd_error, kick_one
@Client.on_message(command('sb'))
async def sb(cli: Client, msg: Message):
"""回复一条消息,将在所有共同且拥有管理踢人权限的群组中踢出目标消息的主人"""
cmd, *_ = Parameter.get(msg)
reply_to_message = msg.reply_to_message
if not reply_to_message or msg.chat.type in ['bot', 'private']:
await msg.edit_text(get_cmd_error(cmd))
return
counter, target = 0, reply_to_message.from_user
common_groups = await target.get_common_chats()
logger.info(
f"Start to kick <{target.first_name}{target.last_name} <{target.id}>")
for chat in common_groups:
try:
if await kick_one(cli, chat.id, target.id):
counter = counter + 1
except FloodWait as e:
await asyncio.sleep(e.x)
if await kick_one(cli, chat.id, target.id):
counter = counter + 1
logger.success(
f"Kick this user out of <{chat.tile} {chat.id}>"
)
except RPCError as e:
logger.warning(
f"No admin rights in this group <{chat.title} {chat.id}>")
logger.warning(e)
# delete this user all messages
await cli.delete_user_history(msg.chat.id, target.id)
# Inform
text = f"😂 Kick {target.mention(style='md')} in {counter} common groups."
await msg.edit_text(text)
await asyncio.sleep(10)
await delete_this(msg)
# log
logger.success(f"{cmd} | {text}")
await logger.complete()

63
plugins/note.py Normal file
View File

@ -0,0 +1,63 @@
import asyncio
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.errors import BadRequest, FloodWait
from pyrogram.types import Message
from tools.constants import STORE_NOTES_DATA
from tools.helpers import Parameters, get_cmd_error
from tools.storage import SimpleStore
@Client.on_message(command('note'))
async def note(_: Client, msg: Message):
"""
用法一-note <save|del> <序号>
用法二-note <序号|list|clear>
作用发送已保存的笔记
"""
cmd, opts = Parameters.get_more(msg)
if not (1 <= len(opts) <= 2):
await msg.edit_text(get_cmd_error(cmd))
return
replied_msg = msg.reply_to_message
async with SimpleStore() as store:
notes_data = store.get_data(STORE_NOTES_DATA)
if len(opts) == 2 and opts[0] == 'save' and replied_msg:
if replied_msg:
notes_data[opts[1]] = replied_msg.text or replied_msg.caption
text = "😊 Notes saved successfully."
else:
text = get_cmd_error(cmd)
elif len(opts) == 2 and opts[0] == 'del':
if notes_data.pop(opts[1], None):
text = "😊 Notes deleted successfully."
else:
text = "❓ Can't find the note to delete."
elif len(opts) == 1:
option = opts[0]
if option == 'list':
tmp = '\n'.join(
f'```{k} | {v[0:30]} ...```' for k, v in notes_data.items())
text = f"已保存的笔记:\n{tmp}"
elif option == 'clear':
notes_data.clear()
text = "✅ All saved notes have been deleted."
else:
res = notes_data.get(option)
text = res if res else f"😱 No saved notes found for {option}"
else:
text = get_cmd_error(cmd)
try:
await msg.edit_text(text)
except BadRequest as e:
logger.error(e) # 存在消息过长的问题,应拆分发送。(就不拆 😊)
except FloodWait as e:
logger.warning(e)
await asyncio.sleep(e.x)
await msg.edit_text(text)
finally:
await logger.complete()

53
plugins/other.py Normal file
View File

@ -0,0 +1,53 @@
import asyncio
import re
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message
from tools.helpers import delete_this, escape_markdown
from tools.sessions import session
@Client.on_message(command(['biss', 'diss', 'tg']))
async def other(_: Client, msg: Message):
"""喷人/舔狗"""
if bool(re.match(r"-(d|b)iss", msg.text)):
symbol = '💢 '
api = 'https://zuan.shabi.workers.dev/'
elif bool(re.match(r"-tg", msg.text)):
symbol = '👅 '
api = 'http://ovooa.com/API/tgrj/api.php'
await msg.edit_text(f"{symbol}It's preparating.")
for _ in range(10):
try:
resp = await session.get(api, timeout=5.5)
if resp.status == 200:
text = escape_markdown(await resp.text())
else:
resp.raise_for_status()
except Exception as e:
logger.error(e)
continue
words = f"{msg.reply_to_message.from_user.mention(style='md')} {text}" \
if msg.reply_to_message else text
try:
await msg.edit_text(words, parse_mode='md')
except FloodWait as e:
await asyncio.sleep(e.x)
await msg.edit_text(words, parse_mode='md')
except RPCError as e:
logger.error(e)
await logger.complete()
return
# Failed to get api text
await delete_this(msg)
res = await msg.edit_text('😤 Rest for a while.')
await asyncio.sleep(3)
await delete_this(res)

29
plugins/pingdc.py Normal file
View File

@ -0,0 +1,29 @@
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import execute
@Client.on_message(command('pingdc'))
async def pingdc(_: Client, msg: Message):
"""到各个DC区的延时"""
DCs = {
1: "149.154.175.50",
2: "149.154.167.51",
3: "149.154.175.100",
4: "149.154.167.91",
5: "91.108.56.130"
}
data = []
for dc in range(1, 6):
result = await execute(f"ping -c 1 {DCs[dc]} | awk -F '/' " + "'END {print $5}'")
output = result.get('output')
data.append(output.replace('\n', '') if output else '-1')
await msg.edit_text(
f"🇺🇸 DC1(迈阿密): `{data[0]}`\n"
f"🇳🇱 DC2(阿姆斯特丹): `{data[1]}`\n"
f"🇺🇸 DC3(迈阿密): `{data[2]}`\n"
f"🇳🇱 DC4(阿姆斯特丹): `{data[3]}`\n"
f"🇸🇬 DC5(新加坡): `{data[4]}`", "md"
)

50
plugins/rate.py Normal file
View File

@ -0,0 +1,50 @@
from core import command
from loguru import logger
from pyrogram import Client
from pyrogram.types import Message
from tools.constants import RATE_API
from tools.helpers import Parameters
from tools.sessions import session
@Client.on_message(command('ex'))
async def rate(_: Client, msg: Message):
"""查询当天货币汇率,格式:-ex <float> <FROM> <TO>"""
cmd, args = Parameters.get_more(msg)
if len(args) != 3:
failure = f"❗️ Usage like `{cmd} 1 usd cny`, it will be exchanged from usd to cny."
await msg.edit_text(failure)
return
try:
num = abs(float(args[0]))
except ValueError:
await msg.edit_text("❗️ Not the correct number.")
return
else:
__from = args[1].lower()
__to = args[2].lower()
for _ in range(10):
async with session.get(
f'{RATE_API}/{__from}/{__to}.json', timeout=5.5
) as resp:
try:
if resp.status == 200:
data = await resp.json()
result = float(data.get(__to)) * num
success = f"```{__from.upper()} : {__to.upper()} = {num} : {result:.5f}```"
await msg.edit_text(success)
logger.success(success)
return
else:
resp.raise_for_status()
except Exception as e:
logger.error(e)
continue
finally:
await logger.complete()
failure = "❗️ Network error or wrong currency symbol. Please try again."
await msg.edit_text(failure)
return

42
plugins/shell.py Normal file
View File

@ -0,0 +1,42 @@
import asyncio
from getpass import getuser
from io import BytesIO
from platform import node
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import Parameters, basher, delete_this, get_cmd_error
@Client.on_message(command("sh"))
async def shell(_: Client, msg: Message):
"""执行shell脚本"""
cmd, _input = Parameters.get(msg)
if not _input:
await msg.edit_text(get_cmd_error(cmd))
return
try:
res = await basher(_input, timeout=30)
except asyncio.exceptions.TimeoutError:
await msg.edit_text('❗️ Connection Timeout')
return
_output: str = res.get('output') if not res.get('error') else res.get('error')
header = f"**{getuser()}@{node()}**\n"
all_bytes = len(header.encode() + _input.encode() + _output.encode())
if all_bytes >= 2048:
await delete_this(msg)
await msg.reply_document(
document=BytesIO(_output.encode()),
caption=f"{header}> # `{_input}`",
file_name="output.log",
parse_mode='md'
)
return
await msg.edit_text(
f"{header}> # `{_input}`\n```{_output.strip()}```",
parse_mode='md'
)

63
plugins/speedtest.py Normal file
View File

@ -0,0 +1,63 @@
import asyncio
import re
from core import command
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.types import Message
from tools.constants import SPEEDTEST_RUN
from tools.helpers import Parameters, delete_this, get_cmd_error, show_error
from tools.speedtests import Speedtester
@Client.on_message(command('speedtest'))
async def speedtest(_: Client, msg: Message):
"""服务器测速,用法:-speedtest <节点ID|list|update>"""
cmd, opt = Parameters.get(msg)
await msg.edit_text("⚡️ Speedtest is running.")
async with Speedtester() as tester:
if opt == 'update':
try:
update_res = await tester.init_for_speedtest('update')
except asyncio.exceptions.TimeoutError:
await msg.edit_text("⚠️ Update Timeout")
except Exception as e:
await show_error(msg, e)
else:
# TODO有个未知错误
await msg.edit_text(f"Result\n```{update_res}```")
return
elif opt == 'list':
try:
text = await tester.list_servers_ids(f"{SPEEDTEST_RUN} -L")
await msg.edit_text(text, parse_mode='md')
except asyncio.exceptions.TimeoutError:
await msg.edit_text("⚠️ Speedtest Timeout")
return
elif bool(re.match(r'[0-9]+', opt)) or not opt:
try:
text, link = await tester.running(
f"""{SPEEDTEST_RUN}{'' if not opt else f' -s {opt}'}"""
)
except asyncio.exceptions.TimeoutError:
await msg.edit_text("⚠️ Speedtest Timeout")
return
else:
await msg.edit_text(get_cmd_error(cmd))
return
if not link:
await msg.edit_text(text)
return
# send speed report
try:
await msg.reply_photo(photo=link, caption=text, parse_mode='md')
except FloodWait as e:
await asyncio.sleep(e.x)
await msg.reply_photo(photo=link, caption=text, parse_mode='md')
except Exception as e:
await show_error(msg, e)
# delete cmd history
await delete_this(msg)

157
plugins/sticker.py Normal file
View File

@ -0,0 +1,157 @@
import asyncio
from time import time
from core import command
from loguru import logger
from pyrogram import Client, filters
from pyrogram.types import Message
from tools.constants import STICKER_BOT, STICKER_ERROR_LIST
from tools.helpers import Parameters, check_if_package_existed, get_default_pkg
from tools.stickers import StickerAdder, sticker_cond, sticker_locker
from tools.storage import SimpleStore
@Client.on_message(filters.incoming & filters.user(STICKER_BOT), group=-1)
async def sticker_event(cli: Client, msg: Message):
async with sticker_cond.get_response():
if msg.text not in STICKER_ERROR_LIST:
sticker_cond.notify()
logger.success(f"Receive @Stickers response | {msg.text}")
else:
async with SimpleStore() as store:
me = await cli.get_me()
pkg_title, pkg_name = get_default_pkg(me)
store.data['sticker_error'] = msg.text
store.data['sticker_set_title'] = pkg_title
store.data['sticker_set_name'] = pkg_name
logger.error(f"Receive @Stickers error | {msg.text}")
await logger.complete()
@Client.on_message(command('s'))
async def sticker(cli: Client, msg: Message):
"""
用法一-s <emoji|> 回复一条消息
用法二-s <sticker_set_title> <sticker_set_name> 切换默认贴纸包标题和名字
作用偷为静态贴纸对象贴纸/图片/图片文件
"""
_, args = Parameters.get_more(msg)
if not msg.reply_to_message:
# 处理参数
if len(args) != 2:
pkg_title, pkg_name = get_default_pkg(msg.from_user)
await msg.edit('✅ Reset sticker title and name to default..')
else:
pkg_title, pkg_name = args
if len(pkg_title.encode()) >= 168:
await msg.edit_text('❗️ Too long sticker set title.')
return
elif len(pkg_title.encode()) >= 58:
await msg.edit_text('❗️ Too long sticker set name.')
return
await msg.edit('✅ Customize sticker title and name successfully.')
async with SimpleStore() as store:
store.data['sticker_set_title'] = pkg_title
store.data['sticker_set_name'] = pkg_name
return
async with SimpleStore() as store:
pkg_title = store.data.get('sticker_set_title')
pkg_name = store.data.get('sticker_set_name')
if not pkg_title or not pkg_name:
await msg.edit_text(
"⚠️ The default sticker title and name are empty, "
"please use `-s` reset!"
)
return
# 尝试检查贴纸包是否存在
try:
pkg_existed = await check_if_package_existed(pkg_name)
except Exception as e:
# 无法判定是否贴纸包存在
logger.error(e)
await msg.edit_text('⚠️ Network Error | Please try again later ...')
return
# 开始前的检查
await msg.edit_text('👆 Working on adding stickers ...')
await cli.unblock_user(STICKER_BOT)
# 开始偷贴纸
async with sticker_locker.get_lock():
try:
await sticker_helper(
cli=cli,
msg=msg,
pkg_title=pkg_title,
pkg_name=pkg_name,
pkg_existed=pkg_existed,
)
except asyncio.exceptions.TimeoutError:
async with SimpleStore() as store:
sticker_error = store.data.get('sticker_error')
store.data.pop('sticker_error', None)
await msg.edit_text(f"❌ Error\n```{sticker_error}```")
except TypeError:
await msg.edit_text("😭 Not static image, now stopped ...")
except Exception as e:
logger.error(e)
await msg.edit_text("😭 Failed to add stickers, now stopped ...")
finally:
await logger.complete()
async def sticker_helper(
cli: Client,
msg: Message,
pkg_title: str,
pkg_name: str,
pkg_existed: bool,
):
replied = msg.reply_to_message
if not (replied.sticker or replied.photo or (
replied.document and
'image' in replied.document.mime_type
)):
raise TypeError("It's not photo")
start, adder = time(), StickerAdder(cli, msg)
# ---------------- 目标消息为:贴纸 ----------------
success = f"👍 Finished in <time> and Click [here](https://t.me/addstickers/{pkg_name}) for details."
# Up to 3 attempts
for attempts in range(3):
if pkg_existed:
# counter == 6
await adder.do_cancel()
await adder.send_message('/addsticker')
await adder.send_message(pkg_name)
if await adder.upload_photo():
continue
await adder.send_emoji()
await adder.send_message('/done')
else:
# counter == 8
await adder.do_cancel()
await adder.send_message('/newpack')
await adder.send_message(pkg_title)
if await adder.upload_photo():
continue
await adder.send_emoji()
await adder.send_message('/publish')
await adder.send_message('/skip')
await adder.send_message(pkg_name)
if adder.is_finished(pkg_existed):
success = success.replace('<time>', f'{time()-start:.3f}', 1)
await adder.done(success, parse_mode='md')
return
else:
adder.send_retries(attempts)
failure = "😭 Failed to add stickers, now stopped ..."
await adder.done(failure, 'md')
logger.warning(failure)
await logger.complete()
return

14
plugins/sysinfo.py Normal file
View File

@ -0,0 +1,14 @@
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import basher
@Client.on_message(command("sysinfo"))
async def sysinfo(_: Client, msg: Message):
"""查询系统信息"""
res = await basher("neofetch --config none --stdout")
if not res.get('error'):
await msg.edit_text(f"```{res.get('output')}```")
else:
await msg.edit_text(f"```{res.get('error')}```")

97
plugins/trace.py Normal file
View File

@ -0,0 +1,97 @@
import asyncio
from core import command
from core.custom import is_traced
from loguru import logger
from pyrogram import Client
from pyrogram.errors import BadRequest, FloodWait, RPCError
from pyrogram.types import Message
from tools.constants import REACTIONS, STORE_TRACE_DATA
from tools.helpers import Parameters, delete_this
from tools.storage import SimpleStore
@Client.on_message(is_traced(), group=-4)
async def trace_event(cli: Client, msg: Message):
user = msg.from_user
async with SimpleStore(auto_flush=False) as store:
try:
emoji = store.get_data(STORE_TRACE_DATA).get(user.id)
await cli.send_reaction(
msg.chat.id, msg.message_id, emoji
)
except BadRequest:
failure = f"Group named <{msg.chat.title}> can't use {emoji} to react."
store.data[STORE_TRACE_DATA].pop(user.id, None)
store.flush()
await cli.send_message('me', failure)
except RPCError as e:
logger.error(e)
@Client.on_message(command('trace'))
async def trace(cli: Client, msg: Message):
"""群组中追着丢emoji
指令-trace
用法-trace <emoji|clear|list> 用于回复一条消息
"""
cmd, opt = Parameters.get(msg)
replied_msg = msg.reply_to_message
if not opt and not replied_msg:
await msg.edit_text(f'❗️ Use `{cmd}` to reply a message.')
return
if opt != 'clear' and opt != 'list':
emoji = '💩' if opt not in REACTIONS else opt
user = replied_msg.from_user
try:
await cli.send_reaction(
msg.chat.id,
replied_msg.message_id,
emoji
)
except RPCError as e:
logger.warning(e)
await msg.edit_text(f"❗️ Can't use {emoji} in this chat.")
return
async with SimpleStore() as store:
trace_data = store.get_data(STORE_TRACE_DATA)
if opt != 'clear' and opt != 'list':
# 追踪列表中没有,则添加
if not trace_data.get(user.id):
trace_data[user.id] = emoji
text = f"✅ 添加 {user.mention(style='md')} 到trace列表"
logger.success(text)
# 追踪列表有,则删除
elif trace_data.pop(user.id, False):
text = f"✅ 将 {user.mention(style='md')} 从trace列表移除"
logger.success(text)
# 删除失败??
else:
text = f"❌ 竟然将 {user.mention(style='md')} 从trace列表移除失败!"
logger.warning(text)
elif opt == 'clear':
trace_data.clear()
text = "✅ 已清空trace名单"
elif opt == 'list':
tmp = '\n'.join(f"`{k}` | {v}" for k, v in trace_data.items())
text = f"📢 trace名单\n{tmp}"
try:
await msg.edit_text(text, parse_mode='md')
except FloodWait as e:
await asyncio.sleep(e.x)
await msg.edit_text(text, parse_mode='md')
except RPCError as e:
logger.error(e)
await msg.edit_text(f"Network error | `{e}`")
finally:
if opt != 'clear' and opt != 'list':
await asyncio.sleep(3)
await delete_this(msg)
await logger.complete()

BIN
requirements.txt Normal file

Binary file not shown.

0
tools/__init__.py Normal file
View File

55
tools/constants.py Normal file
View File

@ -0,0 +1,55 @@
from typing import Dict, List
# ------------- rate --------------
RATE_API: str = 'https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies'
HTTP_HEADERS: Dict[str, str] = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
}
# ------------- speedtest --------------
SPEEDTEST_PATH_FILE: str = './data/speedtest'
SPEEDTEST_CLI: str = "https://install.speedtest.net/app/cli/ookla-speedtest-1.1.1-linux-<arch>.tgz"
INSTALL_SPEEDTEST: str = f"wget -qO- {SPEEDTEST_CLI} | tar zx -C ./data speedtest"
SPEEDTEST_RUN: str = f'{SPEEDTEST_PATH_FILE} --accept-license --accept-gdpr -f json'
# ------------- sticker --------------
STICKER_BOT: int = 429000
STICKER_IMG: str = './data/img/tmp.png'
STICKER_DESCRIP: str = b'A Telegram user has created the Sticker\xc2\xa0Set.'.decode('utf-8')
GT_120_STICKERS: str = "Whoa! That's probably enough stickers for one set, " \
"give it a break. A set can't have more than 120 stickers at the moment."
UNACCEPTABLE_SET_NAME: str = 'Sorry, this short name is unacceptable.'
TAKEN_SET_NAME: str = 'Sorry, this short name is already taken.'
INVALID_SET_NAME: str = 'Invalid set selected.'
STICKER_ERROR_LIST: List[str] = [
GT_120_STICKERS,
UNACCEPTABLE_SET_NAME,
TAKEN_SET_NAME,
INVALID_SET_NAME,
]
# ------------- cc & trace --------------
REACTIONS: list[str] = ['👍', '👎', '❤️', '🔥', '🥰', '👏',
'😁', '🤔', '🤯', '😱', '🤬', '😢',
'🎉', '🤩', '🤮', '💩']
CC_MAX_TIMES: int = 233
# ------------- ghost --------------
GHOST_INTERVAL: float = 1.5
# ------------- other --------------
TG_GROUP: str = 'group'
TG_SUPERGROUP: str = 'supergroup'
TG_CHANNEL: str = 'channel'
TG_BOT: str = 'bot'
TG_PRIVATE: str = 'private'
TG_GROUPS: List[str] = ['group', 'supergroup']
# ------------- Store -------------
STORE_CC_DATA: str = 'data:cc'
STORE_NOTES_DATA: str = 'data:notes'
STORE_TRACE_DATA: str = 'data:trace'
STORE_GHOST_DATA: str = 'data:ghost'
STORE_GHOST_CACHE: str = 'cache:ghost'

20
tools/ghosts.py Normal file
View File

@ -0,0 +1,20 @@
from time import time
from typing import Union
from .constants import GHOST_INTERVAL, STORE_GHOST_CACHE, STORE_GHOST_DATA
from .storage import SimpleStore
async def get_ghost_to_read(cid: Union[int, str]) -> bool:
"""是否自动标记为已读"""
async with SimpleStore() as store:
ghost_cache = store.get_data(STORE_GHOST_CACHE)
ghost_list = store.get_data(STORE_GHOST_DATA)
if cid in ghost_list.keys() and (
not ghost_cache.get(cid) or
time() - ghost_cache.get(cid) > GHOST_INTERVAL
):
ghost_cache[cid] = time()
return True
return False

200
tools/helpers.py Normal file
View File

@ -0,0 +1,200 @@
import asyncio
import re
from typing import Any, Dict, List, Tuple, Union
from bs4 import BeautifulSoup
from loguru import logger
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message, User
from .constants import STICKER_DESCRIP
from .sessions import session
class Parameters:
@classmethod
def get(cls, msg: Message) -> Tuple[str]:
"""返回所需的指令的单个参数,类型为`str`"""
text = msg.text.split(' ', 1)
if len(text) > 1:
return text[0], ''.join(text[1:]).strip()
else:
return text[0], ''
@classmethod
def get_int(cls, msg: Message, max_num: int = 30) -> Tuple[str, int]:
"""返回所需的指令的单个参数,类型为`int`"""
cmd, arg = cls.get(msg)
if not arg or not bool(re.match(r"[0-9]+$", arg)):
return cmd, 1
num = int(arg) if 1 <= int(arg) <= max_num else 1
return cmd, num
@classmethod
def get_more(cls, msg: Message) -> Tuple[str, List[str]]:
cmd, text = cls.get(msg)
return cmd, [x.strip() for x in text.split(' ') if x]
def get_iterlimit(num: int) -> int:
"""
Args:
num (int): 实际删除消息数
Returns:
int: 迭代历史消息限制数
"""
return num * 3 if num * 3 < 1500 else 1500
def get_dc_text(name: str, dc_id: int) -> str:
text = f"{name} 的数据中心为:`DC{dc_id}`\n该数据中心位于:"
if dc_id == 1 or dc_id == 3:
return f"{text}`美国佛罗里达州迈阿密`"
elif dc_id == 2 or dc_id == 4:
return f"{text}`荷兰北荷兰省阿姆斯特丹`"
elif dc_id == 5:
return f"{text}`新加坡`"
else:
return "❗️ 无法获取该用户/群组的数据中心 ..."
def get_fullname(user: User) -> str:
if user:
if user.last_name:
return f"{user.first_name} {user.last_name}"
return user.first_name
else:
return "Anonymous"
def get_default_pkg(user: User) -> Tuple[str]:
if user.username:
return f"@{user.username} 的贴纸包(1)", f"{user.username}_1"
return f"@{user.first_name} 的贴纸包(1)", f"tmp_{user.id}_1"
def is_deleted_id(msg: Message) -> bool:
return bool(msg.message_id > 1 and msg.from_user and msg.from_user.is_self)
def get_cmd_error(cmd: str) -> str:
return f"Use `-help {cmd.replace('-', '', 1)}` to view detailed usage."
async def check_if_package_existed(pkg_name: str) -> bool:
"""检测贴纸包是否存在
Args:
pkg_name (`str`): 贴纸包名字
Raises:
ValueError: 无法检测贴纸包是否存在
Returns:
bool: `True`为贴纸包存在`False`为贴纸包不存在
"""
async with session.get(
f'https://t.me/addstickers/{pkg_name}', timeout=9.9,
) as resp:
if resp.status == 200:
soup = BeautifulSoup(await resp.text(), 'lxml')
target = soup.find(
'div', class_='tgme_page_description').text.strip()
return not bool(STICKER_DESCRIP == target)
else:
resp.raise_for_status()
raise ValueError("Can't check if sticker package is existed.")
async def emoji_sender(
cli: Client,
chat_id: Union[int, str],
msg_id: int,
emoji: str = '',
) -> bool:
try:
await cli.send_reaction(chat_id, msg_id, emoji)
except FloodWait as e:
raise e
except RPCError:
return False
else:
return True
async def delete_this(msg: Message) -> None:
try:
await msg.delete()
except RPCError as e:
logger.warning(e)
await logger.complete()
async def basher(cmd: str, timeout: int = 10) -> Dict[str, Any]:
return await asyncio.wait_for(execute(cmd), timeout=timeout)
async def execute(command: str) -> Dict[str, Any]:
executor = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await executor.communicate()
except Exception as e:
return {'output': '', 'error': str(e)}
else:
return {
'output': stdout.decode('utf-8', 'ignore'),
'error': stderr.decode('utf-8', 'ignore')
}
async def show_error(msg: Message, e: Any) -> None:
await msg.edit_text(f"⚠️ Error\n```{e}```")
async def kick_one(cli: Client, cid: Union[int, str], uid: Union[int, str]):
me = await cli.get_chat_member(cid, 'me')
if me.can_restrict_members and await cli.ban_chat_member(cid, uid):
return True
return False
def escape_markdown(text: str, version: int = 1, entity_type: str = None) -> str:
"""
Helper function to escape telegram markup symbols.
Args:
text (:obj:`str`): The text.
version (:obj:`int` | :obj:`str`): Use to specify the version of telegrams Markdown.
Either ``1`` or ``2``. Defaults to ``1``.
entity_type (:obj:`str`, optional): For the entity types ``PRE``, ``CODE`` and the link
part of ``TEXT_LINKS``, only certain characters need to be escaped in ``MarkdownV2``.
See the official API documentation for details. Only valid in combination with
``version=2``, will be ignored else.
"""
if int(version) == 1:
escape_chars = r'_*`['
elif int(version) == 2:
if entity_type in ['pre', 'code']:
escape_chars = r'\`'
elif entity_type == 'text_link':
escape_chars = r'\)'
else:
escape_chars = r'_*[]()~`>#+-=|{}.!'
else:
raise ValueError('Markdown version must be either 1 or 2!')
return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text)

26
tools/initializer.py Normal file
View File

@ -0,0 +1,26 @@
from loguru import logger
import sys
def init_logger():
# log config
logger.add(
sys.stderr,
format="{time} {level} {message}",
filter="my_module",
level="INFO",
enqueue=True
)
logger.add(
'./data/log/debug.app.log',
filter=lambda record: record["level"].name == "DEBUG",
level="DEBUG",
enqueue=True,
retention="21 days"
)
logger.add(
'./data/log/info.app.log',
filter=lambda record: record["level"].name != "DEBUG",
level="INFO",
enqueue=True
)

6
tools/sessions.py Normal file
View File

@ -0,0 +1,6 @@
import aiohttp
session = aiohttp.ClientSession(headers={
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
})

252
tools/speedtests.py Normal file
View File

@ -0,0 +1,252 @@
import json
import os
import platform
from datetime import datetime
from os import path
from time import time
from typing import Any, Dict, Optional, Tuple
from loguru import logger
from tools.constants import INSTALL_SPEEDTEST, SPEEDTEST_PATH_FILE
from .helpers import basher
class Speedtester:
def __init__(self) -> None:
pass
async def __aenter__(self):
self._timer = time()
logger.info("Speedtest start ...")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
logger.info(
f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
async def running(self, cmd: str) -> Tuple[str]:
"""开始执行speedtest
Args:
cmd (str, optional): speedtest的完整指令需要返回json格式.
Defaults to 'speedtest-cli --share --json'.
Returns:
Tuple[str]: 第一个值是文本/错误第二个是图片link
"""
await self.init_for_speedtest()
# 超时报错
res = await basher(cmd, timeout=60)
logger.info(f"Speedtest Execution | {res}")
try:
# output result
self.__output: Dict[str, Any] = json.loads(res.get('output'))
self.__server: Dict[str, Any] = self.__output.get('server')
except Exception:
return f"⚠️ Error\n```{res.get('error')}```", ''
else:
text = "**Speedtest**\n" \
f"Server: {self.get_server()}\n" \
f"Sponsor: {self.get_sponsor()}\n" \
f"Upload: {self.get_speed('upload')}\n" \
f"Download: {self.get_speed('download')}\n" \
f"jitter: {self.get_ping('jitter')}\n" \
f"Latency: {self.get_ping('latency')}\n" \
f"Time: {self.get_time()}"
return text, f"{self.__output.get('result').get('url')}.png"
async def list_servers_ids(self, cmd: str) -> str:
await self.init_for_speedtest()
res = await basher(cmd, timeout=10)
logger.info(f"Speedtest Execution | {res}")
if not res.get('error'):
try:
self.__output: Dict[str, Any] = json.loads(res.get('output'))
except (TypeError, AttributeError, json.decoder.JSONDecodeError):
return "⚠️ Unable to get ids of servers"
else:
tmp = '\n'.join(
f"`{k.get('id')}` **|** {k.get('name')} **|** {k.get('location')} {k.get('country')}"
for k in self.__output.get('servers')
)
return f"**Speedtest节点列表**\n{tmp}"
return f"⚠️ Error\n```{res.get('error')}```"
def get_server(self) -> str:
location = self.__server.get('location')
country = self.__server.get('country')
return f"`{location} {country}`"
def get_sponsor(self) -> str:
return f"`{self.__server.get('name')}`"
def get_speed(self, opt: str) -> str:
"""
Args:
opt (str): upload or download
Returns:
str: Convert to bits
"""
def convert(bits) -> str:
"""Unit conversion"""
power = 1000
n = 0
units = {
0: 'bps',
1: 'Kbps',
2: 'Mbps',
3: 'Gbps',
4: 'Tbps'
}
while bits > power:
bits = bits / power
n = n + 1
return f"{bits:.3f} {units.get(n)}"
return f"`{convert(self.__output.get(opt).get('bandwidth')*8)}`"
def get_ping(self, opt: str) -> str:
return f"`{self.__output.get('ping').get(opt):.3f}`"
def get_time(self) -> str:
return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
async def init_for_speedtest(self, opt: str = 'install') -> Optional[str]:
exists_file = path.exists(SPEEDTEST_PATH_FILE)
arch = platform.uname().machine
if arch not in ["x86_64", "aarch64"]:
text = f"Unsupported System Architecture: {arch}"
logger.warning(text)
return text
elif opt == 'install':
if not exists_file:
await self.__download_file(arch)
logger.success("First install speedtest")
return
elif opt == 'update':
os.remove(SPEEDTEST_PATH_FILE)
await self.__download_file(arch)
if path.exists(SPEEDTEST_PATH_FILE):
text = "✅ Update speedtest successfully."
logger.success(text)
return text
return "❌ Failed to update speedtest"
else:
raise ValueError(f'Wrong speedtest option {opt}')
async def __download_file(self, arch: str) -> None:
await basher(
INSTALL_SPEEDTEST.replace('<arch>', arch, 1),
timeout=30
)
# import asyncio
# import json
# from datetime import datetime
# from time import time
# from typing import Any, Dict, Tuple
# from loguru import logger
# from .helpers import execute
# class Speedtester:
# def __init__(self) -> None:
# pass
# async def __aenter__(self):
# self._timer = time()
# logger.info(f"Speedtest start in {self._timer}")
# return self
# async def __aexit__(self, exc_type, exc_val, exc_tb):
# logger.info(
# f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
# async def running(self, cmd: str) -> Tuple[str]:
# """开始执行speedtest
# Args:
# cmd (str, optional): speedtest的完整指令需要返回json格式.
# Defaults to 'speedtest-cli --share --json'.
# Returns:
# Tuple[str]: 第一个值是文本/错误第二个是图片link
# """
# # 超时报错
# res = await asyncio.wait_for(execute(cmd), timeout=60)
# logger.info(f"Speedtest Execution | {res}")
# await logger.complete()
# if not res.get('error'):
# if 'list' in cmd:
# return res.get('output'), ''
# try:
# # output result
# self.__output: Dict[str, Any] = json.loads(res.get('output'))
# self.__server: Dict[str, Any] = self.__output.get('server')
# except (TypeError, AttributeError, json.decoder.JSONDecodeError):
# return "⚠️ Unable to get detailed data.", ''
# else:
# text = "**Speedtest**\n" \
# f"Server: {self.get_server()}\n" \
# f"Sponsor: {self.get_sponsor()}\n" \
# f"Upload: {self.get_speed('upload')}\n" \
# f"Download: {self.get_speed('download')}\n" \
# f"Latency: {self.get_latency()}\n" \
# f"Time: {self.get_time()}"
# return text, self.__output.get('share')
# return f"⚠️ Error | {res.get('error')}", ''
# def get_server(self) -> str:
# country = self.__server.get('country')
# cc = self.__server.get('cc')
# return f"`{country} - {cc}`"
# def get_sponsor(self) -> str:
# return f"`{self.__server.get('sponsor')}`"
# def get_speed(self, option: str) -> str:
# """
# Args:
# option (str): upload or download
# Returns:
# str: Convert to bits
# """
# return f"`{self.convert(self.__output.get(option))}`"
# def get_latency(self) -> str:
# return f"`{self.__server.get('latency')}`"
# def get_time(self) -> str:
# return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
# @staticmethod
# def convert(bits) -> str:
# """Unit conversion"""
# power = 1024
# n = 0
# units = {
# 0: 'bps',
# 1: 'Kbps',
# 2: 'Mbps',
# 3: 'Gbps',
# 4: 'Tbps'
# }
# while bits > power:
# bits = bits / power
# n = n + 1
# return f"{bits:.3f} {units.get(n)}"

218
tools/stickers.py Normal file
View File

@ -0,0 +1,218 @@
import asyncio
from math import floor
from typing import Optional
import emoji
from loguru import logger
from PIL import Image, UnidentifiedImageError
from pyrogram import Client
from pyrogram.errors import FloodWait, RPCError
from pyrogram.types import Message
from .constants import STICKER_BOT, STICKER_IMG
from .helpers import Parameters, delete_this
class StickerLocker:
"""贴纸指令🔒"""
def __init__(self) -> None:
self._lock = asyncio.Lock()
def get_lock(self) -> None:
return self._lock
sticker_locker = StickerLocker()
class StickerEvent:
"""贴纸对话事件"""
def __init__(self) -> None:
self._cond = asyncio.Condition()
def get_response(self) -> asyncio.Condition:
return self._cond
def wait(self):
return self._cond.wait()
def notify(self):
return self._cond.notify()
sticker_cond = StickerEvent()
class StickerAdder:
"""
新增贴纸的指令`-s`实现详细步骤
1解禁机器人(`@Stickers`)
2发送/cancel
3检测目标消息是否为贴纸
3-1转发贴纸到@Stickerss
3-2获取并发送emoji
3-3发送/done
3-4结束指令
4若不是则检测是否为图片或图片格式的文件
1转化图片
"""
def __init__(self, cli: Client, msg: Message) -> None:
self._cli = cli
self._msg = msg
self._bot_id = STICKER_BOT
self._count = 0
def is_finished(self, pkg_existed: bool) -> bool:
return (pkg_existed and self._count == 6) or \
(not pkg_existed and self._count == 8)
async def do_cancel(self) -> None:
"""取消原指令残留效果"""
self._count = 0
await self.send_message('/cancel')
async def send_message(self, text: str) -> None:
"""发送指令(或`emoji`)给贴纸"""
try:
await self._cli.send_message(
self._bot_id, text,
disable_notification=True)
except FloodWait as e:
await asyncio.sleep(e.x)
await self._cli.send_message(
self._bot_id, text,
disable_notification=True)
except RPCError as e:
logger.warning(e)
else:
await self.__wait_for()
async def send_emoji(self) -> None:
if self._msg.reply_to_message.sticker:
an_emoji = self._msg.reply_to_message.sticker.emoji
else:
_, arg = Parameters.get(self._msg)
if emoji.is_emoji(arg):
an_emoji = arg
else:
an_emoji = '⚡️'
await self.send_message(an_emoji)
async def send_retries(self, n: int) -> None:
try:
retry_text = f"⚠️ Retrying {n+1} times ..."
await self._msg.edit_text(retry_text)
logger.warning(retry_text)
except RPCError as e:
logger.warning(e)
finally:
await logger.complete()
async def upload_photo(self) -> Optional[bool]:
"""下载图片/图片文件,修剪后发送至@Stickers"""
img = await self._msg.reply_to_message.download(STICKER_IMG)
if not img:
return True
try:
resize_image(img)
except UnidentifiedImageError as e:
logger.warning(e)
return True
try:
await self._cli.send_document(
self._bot_id, document=img)
except FloodWait as e:
await asyncio.sleep(e.x)
await self._cli.send_document(
self._bot_id, document=img)
except RPCError as e:
logger.warning(e)
else:
await self.__wait_for()
async def edit_text(self, text, parse_mode: Optional[str] = None) -> None:
"""编辑消息"""
try:
await self._msg.edit_text(text, parse_mode=parse_mode)
except FloodWait as e:
await asyncio.sleep(e.x)
await self._msg.edit_text(text, parse_mode=parse_mode)
except RPCError as e:
logger.warning(e)
async def done(self, text: str, parse_mode: Optional[str] = None) -> None:
try:
await self.edit_text(text, parse_mode=parse_mode)
await asyncio.sleep(3.5)
await delete_this(self._msg)
except FloodWait as e:
await asyncio.sleep(e.x)
await self.edit_text(text, parse_mode=parse_mode)
await asyncio.sleep(3.5)
await delete_this(self._msg)
except RPCError as e:
logger.warning(e)
async def mark_as_read(self) -> None:
"""自动已读机器人的消息"""
try:
await self._cli.read_history(self._bot_id)
except FloodWait as e:
await asyncio.sleep(e.x)
await self._cli.read_history(self._bot_id)
except RPCError as e:
logger.warning(e)
async def __wait_for(self) -> None:
"""等待贴纸机器人(`@Stickers`)的回应"""
async with sticker_cond.get_response():
await asyncio.wait_for(sticker_cond.wait(), timeout=5)
logger.debug(
f"Counter of response from @Stickers is {self._count}"
)
self._count = self._count + 1
await self.mark_as_read()
def resize_image(photo: str):
with Image.open(photo) as img:
maxsize = (512, 512)
if img.width < 512 or img.height < 512:
w = img.width
h = img.height
if w > h:
scale = 512 / w
size1new = 512
size2new = h * scale
else:
scale = 512 / h
size1new = w * scale
size2new = 512
size_new = (floor(size1new), floor(size2new))
img = img.resize(size_new)
else:
img.thumbnail(maxsize)
img.save(photo, format='png')
return
def isEmoji(content):
if not content:
return False
if u"\U0001F600" <= content <= u"\U0001F64F":
return True
elif u"\U0001F300" <= content <= u"\U0001F5FF":
return True
elif u"\U0001F680" <= content <= u"\U0001F6FF":
return True
elif u"\U0001F1E0" <= content <= u"\U0001F1FF":
return True
else:
return False

76
tools/storage.py Normal file
View File

@ -0,0 +1,76 @@
import asyncio
import pickle
from os import mkdir, path
from typing import Any, Dict
class SimpleStore:
"""简单的存一些东西,凑合用用"""
def __init__(
self,
file_name: str = './data/app.pickle',
auto_flush: bool = True
) -> None:
self.__lock = asyncio.Lock()
self.__file_name = file_name
self.__auto_flush = auto_flush
try:
self.__store = pickle.load(open(file_name, 'rb'), encoding='utf-8')
except EOFError:
self.__store = {}
except FileNotFoundError:
self.__store = {}
if not path.exists("./data"):
mkdir('./data')
pickle.dump({}, open(file_name, 'wb'))
except Exception as e:
raise e
async def __aenter__(self):
await self.__lock.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.__auto_flush:
self.flush()
if self.__lock.locked():
self.__lock.release()
def get_lock(self) -> asyncio.Lock:
return self.__lock
@property
def data(self) -> Dict[str, Any]:
return self.__store
def get_data(self, key: Any) -> Dict: # , typed: Any
if self.__store.get(key):
return self.__store[key]
else:
self.__store[key] = {}
return self.__store[key]
# def update(self, data: Dict):
# return self.__store.update(data)
# def clear(self) -> Dict:
# self.__store.clear()
# def getter(self, key: Any) -> Any:
# return self.__store.get(key)
# def setter(self, key: Any, value: Any) -> None:
# self.__store[key] = value
# def deleter(self, key: Any) -> Optional[Any]:
# return self.__store.pop(key, None)
def flush(self):
"""更新数据并持久化到pickle文件"""
pickle.dump(self.__store, open(self.__file_name, 'wb'))
# storage = SimpleStorage('./data/app.pickle')