twitter2telegram/import.py

76 lines
2.5 KiB
Python
Raw Permalink Normal View History

2023-12-01 15:18:58 +00:00
import asyncio
import time
from typing import List
from defs.feed import UsernameNotFound
from defs.models import User
2023-12-01 15:32:41 +00:00
from defs.sqlite import UserDB, TweetDB
2023-12-01 15:18:58 +00:00
from defs.update import async_get_user
from init import logs
async def get_data(tasks, batch_size: int = 20):
    """Await the given awaitables in consecutive, fixed-size batches.

    Every batch of at most ``batch_size`` awaitables is run concurrently with
    ``asyncio.gather``; the next batch is started only after the previous one
    has finished. Progress per batch and the total elapsed time are written
    to ``logs``.

    Args:
        tasks: Sized, sliceable sequence of awaitables to run.
        batch_size: Maximum number of awaitables gathered at once. Defaults
            to 20, the group size that used to be hard-coded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    total = len(tasks)
    # Ceiling division: number of batches needed to cover every task.
    batch_count = -(-total // batch_size)
    # A monotonic clock cannot jump backwards when the system time is
    # adjusted, so the reported duration is always meaningful.
    started = time.monotonic()
    for batch_no, offset in enumerate(range(0, total, batch_size), start=1):
        batch = tasks[offset:offset + batch_size]
        logs.info(f"开始获取第 {batch_no} / {batch_count} 组用户的数据")
        await asyncio.gather(*batch)
    logs.info(f"获取数据用时 {time.monotonic() - started:.2f}")
async def get_need_update_usernames() -> List[str]:
    """Return the usernames from ``export.txt`` that are not stored yet.

    ``export.txt`` is read as UTF-8 with one username per line. Surrounding
    whitespace is stripped from every line, blank lines are skipped and a
    username that appears more than once is kept only once (in order of first
    appearance), so no user is fetched or added twice. Every name for which
    ``name in UserDB.get_all()`` is true is then dropped.

    Returns:
        The remaining usernames, in file order.

    Raises:
        OSError: If ``export.txt`` cannot be opened.
    """
    with open("export.txt", "r", encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f)
        # dict.fromkeys de-duplicates while preserving first-seen order.
        candidates = list(dict.fromkeys(name for name in stripped_lines if name))
    known = UserDB.get_all()
    # Build a new list instead of removing from a copy, which is quadratic.
    return [name for name in candidates if name not in known]
async def check_need_add(users_data) -> List[str]:
    """Collect the successfully fetched users and record their tweets.

    Args:
        users_data: Mapping of requested username to its fetch result. A
            ``User`` value counts as a success, a ``UsernameNotFound`` value
            is logged as a failure, and any other value is ignored.

    Returns:
        The ``username`` attribute of every ``User`` found in ``users_data``,
        in mapping order.
    """
    need_add = []
    for username, user_data in users_data.items():
        if isinstance(user_data, User):
            logs.info(f"获取 {user_data.name} (@{user_data.username}) 的数据成功,"
                      f"{len(user_data.tweets)} 条推文")
            # Every tweet except the first one in the list is stored in
            # TweetDB unless it is already there.
            # NOTE(review): the reason for skipping tweets[0] is not visible
            # here — confirm it is intentional.
            need_send_tweets = [
                tweet for tweet in user_data.tweets[1:]
                if not TweetDB.check_id(user_data.username, tweet.id)
            ]
            for tweet in need_send_tweets:
                TweetDB.add(user_data.username, tweet.id)
            need_add.append(user_data.username)
        elif isinstance(user_data, UsernameNotFound):
            logs.warning(f"获取 {username} 的数据失败,可能用户名已改变")
    return need_add
async def start_import():
    """Bulk-import the users listed in the export file.

    Steps: determine which usernames still need importing, fetch their data
    in batches via ``get_data``, let ``check_need_add`` select the ones that
    were fetched successfully, and add those to ``UserDB``. The function
    returns early, after logging, when either list turns out to be empty.
    """
    logs.info("开始批量导入用户")
    pending = await get_need_update_usernames()
    if not pending:
        logs.info("没有需要导入的用户")
        return

    logs.info(f"共有 {len(pending)} 个用户需要导入")
    # One result slot per username, initialised to None; the same dict is
    # handed to every fetch call and later inspected by check_need_add.
    results = dict.fromkeys(pending)
    fetches = [async_get_user(results, name) for name in pending]
    await get_data(fetches)

    importable = await check_need_add(results)
    if not importable:
        logs.info("没有需要导入的用户")
        return

    for name in importable:
        UserDB.add(name)
    logs.info(f"导入 {len(importable)} 个用户完成")
# Script entry point: run the bulk import only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(start_import())