NeteaseCloudMusic_PythonSDK/package/NeteaseCloudMusic/main.py

278 lines
10 KiB
Python
Raw Normal View History

2023-12-11 10:31:07 +00:00
import json
import os.path
import socket
import time
2023-12-11 10:31:07 +00:00
from pprint import pprint
import http.cookies
import datetime
2023-12-16 15:18:26 +00:00
from diskcache import Cache
2023-12-11 10:31:07 +00:00
import pkg_resources
import requests
from py_mini_racer import py_mini_racer
from .help import api_list
from .utils import format_cookie_str, prase_cookie_str
import urllib.parse
2023-12-11 10:31:07 +00:00
class NeteaseCloudMusicApi:
__cookie = None
__ip = None
2023-12-16 15:18:26 +00:00
cache = Cache('cache', timeout=120) # 设置缓存目录和过期时间
# cache = TTLCache(maxsize=100, ttl=120) # 设置缓存大小为100缓存项的生存时间为120秒
def __init__(self, debug=False, cache=False):
2023-12-11 10:31:07 +00:00
self.DEBUG = debug # 是否开启调试模式
self.CACHE = cache # 是否开启缓存
2023-12-11 10:31:07 +00:00
self.special_api = {"/playlist/track/all": self.playlist_track_all,
"/login/cellphone": self.login_cellphone,
"/inner/version": self.inner_version,
"/login/refresh": self.login_refresh}
2023-12-11 10:31:07 +00:00
# 载入js代码
resource_path = pkg_resources.resource_filename(__name__, 'NeteaseCloudMusicApi.js')
with open(resource_path, 'r', encoding='utf-8') as file:
js_code = file.read()
self.ctx = py_mini_racer.MiniRacer()
self.ctx.eval(js_code)
def request(self, name: str, query: dict = None) -> dict:
"""
调用接口
接口文档地址 https://docs.neteasecloudmusicapi.binaryify.com
:param name: api名称 例如: song_url_v1, /song/url/v1
:param query: 请求参数
:return: 请求结果 示例{"code": 200, "data": {}, "msg": "success"}
"""
special = {
'daily_signin': '/daily_signin',
'fm_trash': '/fm_trash',
'personal_fm': '/personal_fm',
}
yubei_special = {'/yunbei/tasks/receipt': '/yunbei/receipt',
'/yunbei/tasks/expense': '/yunbei/expense'} # 这俩个接口准换的路由莫名奇妙
# 测试name是否合法
name.replace("\\", "/")
if not name.startswith("/"):
if name in special.keys():
name = special[name]
else:
name = "/" + name
name = name.replace("_", "/")
# 处理俩个云贝接口名称转换问题
if name in yubei_special.keys():
name = yubei_special[name]
# print("转换了个麻烦的路由", name)
2023-12-11 10:31:07 +00:00
if name not in api_list():
if name not in yubei_special.values():
raise Exception(f"apiName: {name} not foundplease use ”api_list()“ to view the interface list")
2023-12-16 15:18:26 +00:00
# 生成一个唯一的键,用于在缓存中查找结果
cache_key = (name, frozenset(query.items()) if query else None)
if self.CACHE:
# 检查缓存中是否已经有了结果
if self.cache.get(cache_key):
return self.cache.get(cache_key)
2023-12-16 15:18:26 +00:00
2023-12-11 10:31:07 +00:00
if query is None:
query = {}
2023-12-16 15:18:26 +00:00
else:
# 如果存在timestamp参数那么删除它
if query.get("timestamp"):
del query["timestamp"]
2023-12-11 10:31:07 +00:00
if query.get("cookie") is None:
query["cookie"] = self.cookie
if query.get("realIP") is None:
query["realIP"] = self.ip
else:
query["realIP"] = query.get("realIP")
# 特殊api处理
if name in self.special_api.keys():
result = self.special_api[name](query)
else:
result = self.call_api(name, query)
if self.CACHE:
# 将结果存入缓存
self.cache.set(cache_key, result)
2023-12-11 10:31:07 +00:00
return result
@staticmethod
def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
print("get local ip error")
IP = "116.25.146.177"
finally:
s.close()
return IP
@property
def cookie(self):
if self.__cookie is None:
if os.path.isfile("cookie_storage"):
with open("cookie_storage", "r", encoding='utf-8') as f:
content = f.read()
try:
cookie_storage = json.loads(content)
# 验证cookie是否过期
create_time_stamp = cookie_storage['create_time_stamp']
if time.time() - create_time_stamp > 1296010:
# cookie过期了
self.__cookie = ""
else:
# 判断cookie生成时间是否超过1天
if time.time() - create_time_stamp > 86400:
# 更新cookie
# Todo login_refresh接口返回cookie好像少一些值
# self.request("/login/refresh", {"cookie": cookie_storage['cookie'], "timestamp": time.time()})
self.cookie = cookie_storage['cookie']
else:
self.__cookie = cookie_storage['cookie']
except json.JSONDecodeError and KeyError:
self.__cookie = ""
2023-12-11 10:31:07 +00:00
else:
self.__cookie = "" # 如果没有cookie文件就设置为空
return self.__cookie
@cookie.setter
def cookie(self, value):
if value is None:
self.__cookie = ""
return
"判断cookie是否合法, 简单检查一下关键的键"
necessary_keys = ["__csrf", "MUSIC_A_T", "MUSIC_R_T"]
cookie_dict = prase_cookie_str(value)
for key in necessary_keys:
if cookie_dict.get(key) is None:
raise Exception(f"cookie is illegal, missing key: {key}.")
self.__cookie = value
2023-12-11 10:31:07 +00:00
with open("cookie_storage", "w+", encoding='utf-8') as f:
f.write(json.dumps({"cookie": value, "create_time_stamp": time.time()}, indent=2, ensure_ascii=False))
2023-12-11 10:31:07 +00:00
@property
def ip(self):
if self.__ip is None:
self.__ip = self.get_local_ip()
return self.__ip
def call_api(self, name, query):
request_param = self.ctx.call('NeteaseCloudMusicApi.beforeRequest', name, query) # 拿到请求头和请求参数
# Todo 了解 py_mini_racer 返回没有自动编码 而 node可以
2023-12-11 10:31:07 +00:00
param_data = {}
if request_param["data"] != "":
for item in request_param["data"].split("&"):
# param_data[item.split("=")[0]] = urllib.parse.quote(item.split("=")[1], safe='') # 不需要编码后反而出错
2023-12-11 10:31:07 +00:00
param_data[item.split("=")[0]] = item.split("=")[1]
if request_param.get("method") == "GET":
response = requests.get(request_param["url"], params=param_data, headers=request_param["headers"])
else:
response = requests.post(request_param["url"], data=param_data, headers=request_param["headers"])
try:
data = json.loads(response.text)
except json.JSONDecodeError:
data = response.text
response_result = {
"headers": dict(response.headers),
"data": data,
"status": response.status_code,
}
result = self.ctx.call('NeteaseCloudMusicApi.afterRequest',
json.dumps(response_result),
request_param.get('crypto', None),
request_param['apiName']) # 拿到请求结果
return result
def playlist_track_all(self, query):
"""
获取歌单全部歌曲
:param query:
:return:
"""
detail_query = {"id": query.get("id"), "cookie": query.get("cookie"), "realIP": query.get("realIP")}
result = self.call_api("/playlist/detail", detail_query)
track_all_query = {"detail_result": json.dumps(result), "cookie": query.get("cookie"),
"realIP": query.get("realIP")}
if query.get("limit"):
track_all_query["limit"] = query.get("limit")
if query.get("offset"):
track_all_query["offset"] = query.get("offset")
result = self.call_api("/playlist/track/all", track_all_query)
return result
def inner_version(self, query):
"""
获取所使用的 NeteaseCloudMusicApi NeteaseCloudMusicApi_V8 版本号
:param query:
:return:
"""
result = self.ctx.call('NeteaseCloudMusicApi.inner_version')
return result
def login_cellphone(self, query):
"""
手机号登录
:param query:
:return:
"""
result = self.call_api("/login/cellphone", query)
2023-12-11 10:31:07 +00:00
# 自动 更新cookie
if result["code"] == 200:
if result.get("data").get("cookie"):
# cookie_str = format_cookie_str(result.get("data").get("cookie"))
cookie_str = result.get("data").get("cookie")
result["data"]["cookie"] = cookie_str
self.cookie = cookie_str
return result
def login_refresh(self, query):
"""
刷新登录状态
:param query:
:return:
"""
result = self.call_api("/login/refresh", query)
# 自动 更新cookie
# if result["code"] == 200:
# if result.get("data").get("cookie"):
# # cookie_str = format_cookie_str(result.get("data").get("cookie"))
# cookie_str = result.get("data").get("cookie")
# result["data"]["cookie"] = cookie_str
# pprint(cookie_str)
# self.cookie = cookie_str
return result