mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-22 07:07:46 +00:00
🐛 Fix parse hoyolab posts
This commit is contained in:
parent
00fe2cf572
commit
7c9c5db7ac
@ -1,7 +1,8 @@
|
|||||||
|
import ujson
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Any, List, Optional
|
from typing import Any, List, Optional, Dict
|
||||||
|
|
||||||
from PIL import Image, UnidentifiedImageError
|
from PIL import Image, UnidentifiedImageError
|
||||||
from pydantic import BaseModel, PrivateAttr
|
from pydantic import BaseModel, PrivateAttr
|
||||||
@ -72,24 +73,55 @@ class PostInfo(BaseModel):
|
|||||||
image_urls: List[str]
|
image_urls: List[str]
|
||||||
created_at: int
|
created_at: int
|
||||||
video_urls: List[str]
|
video_urls: List[str]
|
||||||
|
content: str
|
||||||
|
|
||||||
def __init__(self, _data: dict, **data: Any):
|
def __init__(self, _data: dict, **data: Any):
|
||||||
super().__init__(**data)
|
super().__init__(**data)
|
||||||
self._data = _data
|
self._data = _data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_structured_content(data: List[Dict]) -> str:
|
||||||
|
content = []
|
||||||
|
for item in data:
|
||||||
|
if not item or item.get("insert") is None:
|
||||||
|
continue
|
||||||
|
insert = item["insert"]
|
||||||
|
if isinstance(insert, str):
|
||||||
|
if attr := item.get("attributes"):
|
||||||
|
if link := attr.get("link"):
|
||||||
|
content.append(f'<p><a href="{link}">{insert}</a></p>')
|
||||||
|
continue
|
||||||
|
content.append(f"<p>{insert}</p>")
|
||||||
|
elif isinstance(insert, dict):
|
||||||
|
if image := insert.get("image"):
|
||||||
|
content.append(f'<img src="{image}" />')
|
||||||
|
return "\n".join(content)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def paste_data(cls, data: dict, hoyolab: bool = False) -> "PostInfo":
|
def paste_data(cls, data: dict, hoyolab: bool = False) -> "PostInfo":
|
||||||
_data_post = data["post"]
|
_data_post = data["post"]
|
||||||
post = _data_post["post"]
|
post = _data_post["post"]
|
||||||
post_id = post["post_id"]
|
post_id = post["post_id"]
|
||||||
subject = post["subject"]
|
subject = post["subject"]
|
||||||
image_list = _data_post["image_list"]
|
image_list = []
|
||||||
|
image_keys = {"cover_list", "image_list"}
|
||||||
|
for key in image_keys:
|
||||||
|
image_list.extend(_data_post.get(key, []))
|
||||||
image_urls = [image["url"] for image in image_list]
|
image_urls = [image["url"] for image in image_list]
|
||||||
vod_list = _data_post.get("vod_list", [])
|
key1, key2 = ("video", "resolution") if hoyolab else ("vod_list", "resolutions")
|
||||||
video_urls = [vod["resolutions"][-1]["url"] for vod in vod_list]
|
vod_list = _data_post.get(key1, [])
|
||||||
|
if not isinstance(vod_list, list):
|
||||||
|
vod_list = [vod_list]
|
||||||
|
video_urls = [vod[key2][-1]["url"] for vod in vod_list if vod]
|
||||||
created_at = post["created_at"]
|
created_at = post["created_at"]
|
||||||
user = _data_post["user"] # 用户数据
|
user = _data_post["user"] # 用户数据
|
||||||
user_uid = user["uid"] # 用户ID
|
user_uid = user["uid"] # 用户ID
|
||||||
|
content = post["content"]
|
||||||
|
if hoyolab and ("<" not in content) and (structured_content := post.get("structured_content")):
|
||||||
|
content = PostInfo.parse_structured_content(ujson.loads(structured_content))
|
||||||
|
if hoyolab and post["view_type"] == 5:
|
||||||
|
# video
|
||||||
|
content = ujson.loads(content).get("describe", "")
|
||||||
return PostInfo(
|
return PostInfo(
|
||||||
_data=data,
|
_data=data,
|
||||||
hoyolab=hoyolab,
|
hoyolab=hoyolab,
|
||||||
@ -99,6 +131,7 @@ class PostInfo(BaseModel):
|
|||||||
image_urls=image_urls,
|
image_urls=image_urls,
|
||||||
video_urls=video_urls,
|
video_urls=video_urls,
|
||||||
created_at=created_at,
|
created_at=created_at,
|
||||||
|
content=content,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __getitem__(self, item):
|
def __getitem__(self, item):
|
||||||
|
@ -196,7 +196,7 @@ class Post(Plugin.Conversation):
|
|||||||
too_long = True
|
too_long = True
|
||||||
else:
|
else:
|
||||||
post_text += f"{escape_markdown(soup.get_text(), version=2)}\n"
|
post_text += f"{escape_markdown(soup.get_text(), version=2)}\n"
|
||||||
return post_text, too_long
|
return post_text.strip(), too_long
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def input_media(
|
def input_media(
|
||||||
@ -351,10 +351,10 @@ class Post(Plugin.Conversation):
|
|||||||
post_images = await self.gif_to_mp4(post_images)
|
post_images = await self.gif_to_mp4(post_images)
|
||||||
post_data = post_info["post"]["post"]
|
post_data = post_info["post"]["post"]
|
||||||
post_subject = post_data["subject"]
|
post_subject = post_data["subject"]
|
||||||
post_soup = BeautifulSoup(post_data["content"], features="html.parser")
|
post_soup = BeautifulSoup(post_info.content, features="html.parser")
|
||||||
post_text, too_long = self.parse_post_text(post_soup, post_subject)
|
post_text, too_long = self.parse_post_text(post_soup, post_subject)
|
||||||
url = post_info.get_url(self.short_name)
|
url = post_info.get_url(self.short_name)
|
||||||
post_text += f"\n[source]({url})"
|
post_text += f"\n\n[source]({url})"
|
||||||
if too_long or len(post_text) >= MessageLimit.CAPTION_LENGTH:
|
if too_long or len(post_text) >= MessageLimit.CAPTION_LENGTH:
|
||||||
post_text = post_text[: MessageLimit.CAPTION_LENGTH]
|
post_text = post_text[: MessageLimit.CAPTION_LENGTH]
|
||||||
await message.reply_text(f"警告!图片字符描述已经超过 {MessageLimit.CAPTION_LENGTH} 个字,已经切割")
|
await message.reply_text(f"警告!图片字符描述已经超过 {MessageLimit.CAPTION_LENGTH} 个字,已经切割")
|
||||||
|
Loading…
Reference in New Issue
Block a user