🐛 Fix parsing of HoYoLAB posts

This commit is contained in:
xtaodada 2024-07-16 20:30:38 +08:00
parent bb34874b4d
commit ec6361fc09
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
2 changed files with 39 additions and 6 deletions

View File

@ -1,3 +1,4 @@
import ujson
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from io import BytesIO from io import BytesIO
@ -72,24 +73,55 @@ class PostInfo(BaseModel):
image_urls: List[str] image_urls: List[str]
created_at: int created_at: int
video_urls: List[str] video_urls: List[str]
content: str
def __init__(self, _data: dict, **data: Any): def __init__(self, _data: dict, **data: Any):
super().__init__(**data) super().__init__(**data)
self._data = _data self._data = _data
@staticmethod
def parse_structured_content(data: List[Dict]) -> str:
    """Render a post's structured content as a minimal HTML fragment.

    ``data`` is a list of op dicts, each carrying an ``insert`` value:

    * a string insert becomes ``<p>text</p>``, or
      ``<p><a href="link">text</a></p>`` when the op has a truthy
      ``attributes["link"]``;
    * a dict insert with a truthy ``image`` value becomes
      ``<img src="image" />``;
    * anything else (missing/``None`` insert, other insert kinds) is skipped.

    Text and attribute values are HTML-escaped so that characters such as
    ``<``, ``&`` or ``"`` in user-written content cannot be misread as markup
    (or break out of an attribute) when the result is parsed as HTML.

    Args:
        data: Decoded list of op dicts; ``None`` or an empty list yields "".

    Returns:
        The rendered fragments joined with newlines.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    content = []
    for item in data or []:
        # Tolerate null / malformed entries instead of raising on ``.get``.
        if not isinstance(item, dict):
            continue
        insert = item.get("insert")
        if insert is None:
            continue
        if isinstance(insert, str):
            # Element text only needs &, < and > escaped.
            text = escape(insert, quote=False)
            attr = item.get("attributes")
            link = attr.get("link") if isinstance(attr, dict) else None
            if link:
                # Attribute context: quotes must be escaped as well.
                href = escape(str(link), quote=True)
                content.append(f'<p><a href="{href}">{text}</a></p>')
            else:
                content.append(f"<p>{text}</p>")
        elif isinstance(insert, dict):
            if image := insert.get("image"):
                src = escape(str(image), quote=True)
                content.append(f'<img src="{src}" />')
    return "\n".join(content)
@classmethod @classmethod
def paste_data(cls, data: dict, hoyolab: bool = False) -> "PostInfo": def paste_data(cls, data: dict, hoyolab: bool = False) -> "PostInfo":
_data_post = data["post"] _data_post = data["post"]
post = _data_post["post"] post = _data_post["post"]
post_id = post["post_id"] post_id = post["post_id"]
subject = post["subject"] subject = post["subject"]
image_list = _data_post["image_list"] image_list = []
image_keys = {"cover_list", "image_list"}
for key in image_keys:
image_list.extend(_data_post.get(key, []))
image_urls = [image["url"] for image in image_list] image_urls = [image["url"] for image in image_list]
vod_list = _data_post.get("vod_list", []) key1, key2 = ("video", "resolution") if hoyolab else ("vod_list", "resolutions")
video_urls = [vod["resolutions"][-1]["url"] for vod in vod_list] vod_list = _data_post.get(key1, [])
if not isinstance(vod_list, list):
vod_list = [vod_list]
video_urls = [vod[key2][-1]["url"] for vod in vod_list if vod]
created_at = post["created_at"] created_at = post["created_at"]
user = _data_post["user"] # 用户数据 user = _data_post["user"] # 用户数据
user_uid = user["uid"] # 用户ID user_uid = user["uid"] # 用户ID
content = post["content"]
if hoyolab and ("<" not in content) and (structured_content := post.get("structured_content")):
content = PostInfo.parse_structured_content(ujson.loads(structured_content))
if hoyolab and post["view_type"] == 5:
# video
content = ujson.loads(content).get("describe", "")
return PostInfo( return PostInfo(
_data=data, _data=data,
hoyolab=hoyolab, hoyolab=hoyolab,
@ -99,6 +131,7 @@ class PostInfo(BaseModel):
image_urls=image_urls, image_urls=image_urls,
video_urls=video_urls, video_urls=video_urls,
created_at=created_at, created_at=created_at,
content=content,
) )
def __getitem__(self, item): def __getitem__(self, item):

View File

@ -197,7 +197,7 @@ class Post(Plugin.Conversation):
too_long = True too_long = True
else: else:
post_text += f"{escape_markdown(soup.get_text(), version=2)}\n" post_text += f"{escape_markdown(soup.get_text(), version=2)}\n"
return post_text, too_long return post_text.strip(), too_long
@staticmethod @staticmethod
def input_media( def input_media(
@ -356,10 +356,10 @@ class Post(Plugin.Conversation):
post_images = await self.gif_to_mp4(post_images) post_images = await self.gif_to_mp4(post_images)
post_data = post_info["post"]["post"] post_data = post_info["post"]["post"]
post_subject = post_data["subject"] post_subject = post_data["subject"]
post_soup = BeautifulSoup(post_data["content"], features="html.parser") post_soup = BeautifulSoup(post_info.content, features="html.parser")
post_text, too_long = self.parse_post_text(post_soup, post_subject) post_text, too_long = self.parse_post_text(post_soup, post_subject)
url = post_info.get_url(self.short_name) url = post_info.get_url(self.short_name)
post_text += f"\n[source]({url})" post_text += f"\n\n[source]({url})"
if too_long or len(post_text) >= MessageLimit.CAPTION_LENGTH: if too_long or len(post_text) >= MessageLimit.CAPTION_LENGTH:
post_text = post_text[: MessageLimit.CAPTION_LENGTH] post_text = post_text[: MessageLimit.CAPTION_LENGTH]
await message.reply_text(f"警告!图片字符描述已经超过 {MessageLimit.CAPTION_LENGTH} 个字,已经切割") await message.reply_text(f"警告!图片字符描述已经超过 {MessageLimit.CAPTION_LENGTH} 个字,已经切割")