mirror of
https://github.com/2061360308/NeteaseCloudMusic_PythonSDK.git
synced 2024-11-25 16:12:15 +00:00
281 lines
10 KiB
Python
281 lines
10 KiB
Python
import json
|
||
import os.path
|
||
import socket
|
||
import time
|
||
from pprint import pprint
|
||
import http.cookies
|
||
import datetime
|
||
from diskcache import Cache
|
||
|
||
import pkg_resources
|
||
import requests
|
||
from py_mini_racer import py_mini_racer
|
||
from .help import api_list
|
||
from .utils import format_cookie_str, prase_cookie_str
|
||
import urllib.parse
|
||
|
||
|
||
class NeteaseCloudMusicApi:
|
||
__cookie = None
|
||
__ip = None
|
||
|
||
cache = Cache('cache', timeout=120) # 设置缓存目录和过期时间
|
||
|
||
# cache = TTLCache(maxsize=100, ttl=120) # 设置缓存大小为100,缓存项的生存时间为120秒
|
||
|
||
def __init__(self, debug=False, cache=False):
|
||
self.DEBUG = debug # 是否开启调试模式
|
||
self.CACHE = cache # 是否开启缓存
|
||
|
||
self.special_api = {"/playlist/track/all": self.playlist_track_all,
|
||
"/login/cellphone": self.login_cellphone,
|
||
"/inner/version": self.inner_version,
|
||
"/login/refresh": self.login_refresh}
|
||
|
||
# 载入js代码
|
||
resource_path = pkg_resources.resource_filename(__name__, 'NeteaseCloudMusicApi.js')
|
||
|
||
with open(resource_path, 'r', encoding='utf-8') as file:
|
||
js_code = file.read()
|
||
self.ctx = py_mini_racer.MiniRacer()
|
||
self.ctx.eval(js_code)
|
||
|
||
def request(self, name: str, query: dict = None) -> dict:
|
||
"""
|
||
调用接口
|
||
接口文档地址: https://docs.neteasecloudmusicapi.binaryify.com
|
||
:param name: api名称 例如: song_url_v1, /song/url/v1
|
||
:param query: 请求参数
|
||
:return: 请求结果 示例:{"code": 200, "data": {}, "msg": "success"}
|
||
"""
|
||
|
||
special = {
|
||
'daily_signin': '/daily_signin',
|
||
'fm_trash': '/fm_trash',
|
||
'personal_fm': '/personal_fm',
|
||
}
|
||
|
||
yubei_special = {'/yunbei/tasks/receipt': '/yunbei/receipt',
|
||
'/yunbei/tasks/expense': '/yunbei/expense'} # 这俩个接口准换的路由莫名奇妙
|
||
|
||
# 测试name是否合法
|
||
name.replace("\\", "/")
|
||
if not name.startswith("/"):
|
||
if name in special.keys():
|
||
name = special[name]
|
||
else:
|
||
name = "/" + name
|
||
name = name.replace("_", "/")
|
||
|
||
# 处理俩个云贝接口名称转换问题
|
||
if name in yubei_special.keys():
|
||
name = yubei_special[name]
|
||
# print("转换了个麻烦的路由", name)
|
||
|
||
if name not in api_list():
|
||
if name not in yubei_special.values():
|
||
raise Exception(f"apiName: {name} not found,please use ”api_list()“ to view the interface list")
|
||
|
||
# 生成一个唯一的键,用于在缓存中查找结果
|
||
cache_key = (name, frozenset(query.items()) if query else None)
|
||
|
||
if self.CACHE:
|
||
# 检查缓存中是否已经有了结果
|
||
if self.cache.get(cache_key):
|
||
return self.cache.get(cache_key)
|
||
|
||
if query is None:
|
||
query = {}
|
||
else:
|
||
# 如果存在timestamp参数,那么删除它
|
||
if query.get("timestamp"):
|
||
del query["timestamp"]
|
||
|
||
if query.get("cookie") is None:
|
||
query["cookie"] = self.cookie
|
||
|
||
if query.get("realIP") is None:
|
||
query["realIP"] = self.ip
|
||
else:
|
||
query["realIP"] = query.get("realIP")
|
||
|
||
# 特殊api处理
|
||
if name in self.special_api.keys():
|
||
result = self.special_api[name](query)
|
||
else:
|
||
result = self.call_api(name, query)
|
||
|
||
if self.CACHE:
|
||
# 将结果存入缓存
|
||
self.cache.set(cache_key, result)
|
||
|
||
return result
|
||
|
||
@staticmethod
|
||
def get_local_ip():
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
try:
|
||
# doesn't even have to be reachable
|
||
s.connect(('10.255.255.255', 1))
|
||
IP = s.getsockname()[0]
|
||
except Exception:
|
||
print("get local ip error")
|
||
IP = "116.25.146.177"
|
||
finally:
|
||
s.close()
|
||
return IP
|
||
|
||
@property
|
||
def cookie(self):
|
||
if self.__cookie is None:
|
||
if os.path.isfile("cookie_storage"):
|
||
with open("cookie_storage", "r", encoding='utf-8') as f:
|
||
content = f.read()
|
||
try:
|
||
cookie_storage = json.loads(content)
|
||
# 验证cookie是否过期
|
||
create_time_stamp = cookie_storage['create_time_stamp']
|
||
|
||
if time.time() - create_time_stamp > 1296010:
|
||
# cookie过期了
|
||
self.__cookie = ""
|
||
else:
|
||
# 判断cookie生成时间是否超过1天
|
||
if time.time() - create_time_stamp > 86400:
|
||
# 更新cookie
|
||
# Todo login_refresh接口返回cookie好像少一些值
|
||
# self.request("/login/refresh", {"cookie": cookie_storage['cookie'], "timestamp": time.time()})
|
||
self.cookie = cookie_storage['cookie']
|
||
else:
|
||
self.__cookie = cookie_storage['cookie']
|
||
except json.JSONDecodeError and KeyError:
|
||
self.__cookie = ""
|
||
else:
|
||
self.__cookie = "" # 如果没有cookie文件,就设置为空
|
||
|
||
return self.__cookie
|
||
|
||
@cookie.setter
|
||
def cookie(self, value):
|
||
if value is None:
|
||
self.__cookie = ""
|
||
return
|
||
|
||
"判断cookie是否合法, 简单检查一下关键的键"
|
||
necessary_keys = ["__csrf", "MUSIC_A_T", "MUSIC_R_T"]
|
||
cookie_dict = prase_cookie_str(value)
|
||
for key in necessary_keys:
|
||
if cookie_dict.get(key) is None:
|
||
raise Exception(f"cookie is illegal, missing key: {key}.")
|
||
|
||
self.__cookie = value
|
||
with open("cookie_storage", "w+", encoding='utf-8') as f:
|
||
f.write(json.dumps({"cookie": value, "create_time_stamp": time.time()}, indent=2, ensure_ascii=False))
|
||
|
||
@property
|
||
def ip(self):
|
||
if self.__ip is None:
|
||
self.__ip = self.get_local_ip()
|
||
return self.__ip
|
||
|
||
def call_api(self, name, query):
|
||
request_param = self.ctx.call('NeteaseCloudMusicApi.beforeRequest', name, query) # 拿到请求头和请求参数
|
||
|
||
# print(request_param)
|
||
|
||
# Todo 了解 py_mini_racer 返回没有自动编码 而 node可以
|
||
# from urllib.parse import unquote
|
||
# param_data = {}
|
||
# if request_param["data"] != "":
|
||
# for item in request_param["data"].split("&"):
|
||
# # param_data[item.split("=")[0]] = urllib.parse.quote(item.split("=")[1], safe='') # 不需要编码后反而出错
|
||
# param_data[item.split("=")[0]] = unquote(item.split("=")[1])
|
||
|
||
if request_param.get("method") == "GET":
|
||
response = requests.get(request_param["url"], params=request_param['data'], headers=request_param["headers"])
|
||
else:
|
||
response = requests.post(request_param["url"], data=request_param['data'], headers=request_param["headers"])
|
||
|
||
try:
|
||
data = json.loads(response.text)
|
||
except json.JSONDecodeError:
|
||
data = response.text
|
||
|
||
response_result = {
|
||
"headers": dict(response.headers),
|
||
"data": data,
|
||
"status": response.status_code,
|
||
}
|
||
|
||
result = self.ctx.call('NeteaseCloudMusicApi.afterRequest',
|
||
response_result,
|
||
request_param.get('crypto', None),
|
||
request_param['apiName']) # 拿到请求结果
|
||
|
||
return result
|
||
|
||
def playlist_track_all(self, query):
|
||
"""
|
||
获取歌单全部歌曲
|
||
:param query:
|
||
:return:
|
||
"""
|
||
|
||
detail_query = {"id": query.get("id"), "cookie": query.get("cookie"), "realIP": query.get("realIP")}
|
||
|
||
result = self.call_api("/playlist/detail", detail_query)
|
||
|
||
track_all_query = {"detail_result": json.dumps(result), "cookie": query.get("cookie"),
|
||
"realIP": query.get("realIP")}
|
||
if query.get("limit"):
|
||
track_all_query["limit"] = query.get("limit")
|
||
if query.get("offset"):
|
||
track_all_query["offset"] = query.get("offset")
|
||
|
||
result = self.call_api("/playlist/track/all", track_all_query)
|
||
return result
|
||
|
||
def inner_version(self, query):
|
||
"""
|
||
获取所使用的 NeteaseCloudMusicApi 和 NeteaseCloudMusicApi_V8 版本号
|
||
:param query:
|
||
:return:
|
||
"""
|
||
result = self.ctx.call('NeteaseCloudMusicApi.inner_version')
|
||
return result
|
||
|
||
def login_cellphone(self, query):
|
||
"""
|
||
手机号登录
|
||
:param query:
|
||
:return:
|
||
"""
|
||
result = self.call_api("/login/cellphone", query)
|
||
|
||
# 自动 更新cookie
|
||
if result["code"] == 200:
|
||
if result.get("data").get("cookie"):
|
||
# cookie_str = format_cookie_str(result.get("data").get("cookie"))
|
||
cookie_str = result.get("data").get("cookie")
|
||
result["data"]["cookie"] = cookie_str
|
||
self.cookie = cookie_str
|
||
return result
|
||
|
||
def login_refresh(self, query):
|
||
"""
|
||
刷新登录状态
|
||
:param query:
|
||
:return:
|
||
"""
|
||
result = self.call_api("/login/refresh", query)
|
||
|
||
# 自动 更新cookie
|
||
# if result["code"] == 200:
|
||
# if result.get("data").get("cookie"):
|
||
# # cookie_str = format_cookie_str(result.get("data").get("cookie"))
|
||
# cookie_str = result.get("data").get("cookie")
|
||
# result["data"]["cookie"] = cookie_str
|
||
# pprint(cookie_str)
|
||
# self.cookie = cookie_str
|
||
return result
|