NeteaseCloudMusic_PythonSDK/package/NeteaseCloudMusic/main.py

281 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os.path
import socket
import time
from pprint import pprint
import http.cookies
import datetime
from diskcache import Cache
import pkg_resources
import requests
from py_mini_racer import py_mini_racer
from .help import api_list
from .utils import format_cookie_str, prase_cookie_str
import urllib.parse
class NeteaseCloudMusicApi:
    """Python front-end for the bundled ``NeteaseCloudMusicApi.js`` build.

    The JavaScript bundle is evaluated in a ``py_mini_racer`` context. For
    each call it builds the HTTP request description (``beforeRequest``) and
    post-processes the HTTP response (``afterRequest``); the HTTP round trip
    itself is done from Python with ``requests``.
    """

    __cookie = None
    __ip = None

    # Lifetime of a cached response, in seconds.
    CACHE_EXPIRE_SECONDS = 120
    # A stored cookie older than this many seconds is treated as expired.
    COOKIE_MAX_AGE_SECONDS = 1296010
    # A stored cookie older than this many seconds is re-saved on load.
    COOKIE_REFRESH_AGE_SECONDS = 86400
    # File (relative to the working directory) where the cookie is persisted.
    COOKIE_STORAGE_PATH = "cookie_storage"

    # On-disk response cache in ./cache. NOTE: diskcache's ``timeout`` is the
    # SQLite connection timeout, NOT an entry lifetime; the lifetime is
    # applied per entry through ``expire=`` in ``request``.
    cache = Cache('cache', timeout=120)

    def __init__(self, debug=False, cache=False):
        """Load the JavaScript bundle and register the special-cased routes.

        :param debug: stored as ``self.DEBUG`` (debug-mode flag)
        :param cache: stored as ``self.CACHE``; when true, ``request`` reads
            and writes the response cache
        """
        self.DEBUG = debug
        self.CACHE = cache
        # Routes that need Python-side handling instead of a plain call_api().
        self.special_api = {"/playlist/track/all": self.playlist_track_all,
                            "/login/cellphone": self.login_cellphone,
                            "/inner/version": self.inner_version,
                            "/login/refresh": self.login_refresh}
        # Load the JavaScript code shipped inside the package.
        resource_path = pkg_resources.resource_filename(__name__, 'NeteaseCloudMusicApi.js')
        with open(resource_path, 'r', encoding='utf-8') as file:
            js_code = file.read()
        self.ctx = py_mini_racer.MiniRacer()
        self.ctx.eval(js_code)

    @staticmethod
    def _make_cache_key(name, query):
        """Return a string cache key for an API name and its query.

        The query is serialised to JSON with sorted keys, so the key does not
        depend on dict ordering and list/dict values do not raise the
        ``TypeError`` that hashing them would.
        """
        if not query:
            return name
        return name + "?" + json.dumps(query, sort_keys=True, default=str, ensure_ascii=False)

    def request(self, name: str, query: dict = None) -> dict:
        """
        Call an API endpoint.

        API documentation: https://docs.neteasecloudmusicapi.binaryify.com

        :param name: API name, e.g. ``song_url_v1`` or ``/song/url/v1``
        :param query: request parameters; the dict passed in is not modified
        :return: the result, e.g. ``{"code": 200, "data": {}, "msg": "success"}``
        :raises Exception: if ``name`` is not a known API route
        """
        # Names whose underscores are part of the route and must be kept.
        special = {
            'daily_signin': '/daily_signin',
            'fm_trash': '/fm_trash',
            'personal_fm': '/personal_fm',
        }
        # These two yunbei routes do not follow the usual name conversion.
        yubei_special = {'/yunbei/tasks/receipt': '/yunbei/receipt',
                         '/yunbei/tasks/expense': '/yunbei/expense'}

        # Normalise the name into a route. str.replace returns a new string,
        # so the result has to be assigned back.
        name = name.replace("\\", "/")
        if not name.startswith("/"):
            if name in special:
                name = special[name]
            else:
                name = ("/" + name).replace("_", "/")
        # Map the two yunbei routes onto their real names.
        name = yubei_special.get(name, name)
        if name not in api_list() and name not in yubei_special.values():
            raise Exception(f"apiName: {name} not foundplease use ”api_list()“ to view the interface list")

        # The key is built from the query as supplied, i.e. before "timestamp"
        # is stripped, so a changing timestamp still yields a distinct entry.
        cache_key = None
        if self.CACHE:
            cache_key = self._make_cache_key(name, query)
            cached = self.cache.get(cache_key)
            if cached is not None:
                return cached

        # Work on a copy so the caller's dict is left untouched.
        query = dict(query) if query else {}
        # Drop the timestamp parameter if one was supplied.
        if query.get("timestamp"):
            del query["timestamp"]
        if query.get("cookie") is None:
            query["cookie"] = self.cookie
        if query.get("realIP") is None:
            query["realIP"] = self.ip

        # Special routes go through their dedicated handler.
        handler = self.special_api.get(name)
        if handler is not None:
            result = handler(query)
        else:
            result = self.call_api(name, query)

        if self.CACHE:
            # Store with an explicit lifetime; without expire= the entry
            # would be kept indefinitely.
            self.cache.set(cache_key, result, expire=self.CACHE_EXPIRE_SECONDS)
        return result

    @staticmethod
    def get_local_ip():
        """Return the local IP of the socket used for an outbound UDP connect.

        Prints a message and falls back to a hard-coded address if the
        lookup fails.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            try:
                # The target doesn't even have to be reachable.
                s.connect(('10.255.255.255', 1))
                return s.getsockname()[0]
            except Exception:
                print("get local ip error")
                return "116.25.146.177"

    @property
    def cookie(self):
        """Return the current cookie string, loading it from disk on first use.

        Returns ``""`` when there is no storage file, the file is unreadable
        as the expected JSON object, or the stored cookie has expired.
        """
        if self.__cookie is not None:
            return self.__cookie

        if not os.path.isfile(self.COOKIE_STORAGE_PATH):
            # No cookie file: use an empty cookie.
            self.__cookie = ""
            return self.__cookie

        with open(self.COOKIE_STORAGE_PATH, "r", encoding='utf-8') as f:
            content = f.read()
        try:
            stored = json.loads(content)
            age = time.time() - stored['create_time_stamp']
            stored_cookie = stored['cookie']
        except (json.JSONDecodeError, KeyError, TypeError):
            # Corrupt or incomplete storage file. The exception types must be
            # given as a tuple: `A and B` evaluates to just `B`.
            self.__cookie = ""
            return self.__cookie

        if age > self.COOKIE_MAX_AGE_SECONDS:
            # The cookie has expired.
            self.__cookie = ""
        elif age > self.COOKIE_REFRESH_AGE_SECONDS:
            # Older than one day: go through the setter, which validates the
            # cookie and rewrites the storage file with a fresh timestamp.
            # TODO: /login/refresh seems to return a cookie with fewer
            # values, so it is not used to renew the cookie here.
            self.cookie = stored_cookie
        else:
            self.__cookie = stored_cookie
        return self.__cookie

    @cookie.setter
    def cookie(self, value):
        """Validate ``value``, keep it in memory and persist it to disk.

        ``None`` resets the in-memory cookie to ``""`` without touching the
        storage file.

        :raises Exception: if a required cookie key is missing
        """
        if value is None:
            self.__cookie = ""
            return
        # Sanity-check the cookie by looking for the essential keys.
        necessary_keys = ["__csrf", "MUSIC_A_T", "MUSIC_R_T"]
        cookie_dict = prase_cookie_str(value)
        for key in necessary_keys:
            if cookie_dict.get(key) is None:
                raise Exception(f"cookie is illegal, missing key: {key}.")
        self.__cookie = value
        with open(self.COOKIE_STORAGE_PATH, "w+", encoding='utf-8') as f:
            f.write(json.dumps({"cookie": value, "create_time_stamp": time.time()}, indent=2, ensure_ascii=False))

    @property
    def ip(self):
        """Return the local IP, determined once and then memoised."""
        if self.__ip is None:
            self.__ip = self.get_local_ip()
        return self.__ip

    def call_api(self, name, query):
        """Perform one API call: build the request in JS, send it, decode in JS.

        :param name: normalised route, e.g. ``/song/url/v1``
        :param query: request parameters
        :return: the value returned by the JS ``afterRequest`` hook
        """
        # Request description from JS; the keys read below are url, data,
        # headers, method, crypto and apiName.
        request_param = self.ctx.call('NeteaseCloudMusicApi.beforeRequest', name, query)
        # TODO: find out why py_mini_racer does not URL-encode the returned
        # data automatically while node does.
        # NOTE(review): no timeout is passed to requests, so an unresponsive
        # server blocks the caller indefinitely — consider adding one.
        if request_param.get("method") == "GET":
            response = requests.get(request_param["url"], params=request_param['data'],
                                    headers=request_param["headers"])
        else:
            response = requests.post(request_param["url"], data=request_param['data'],
                                     headers=request_param["headers"])
        try:
            data = json.loads(response.text)
        except json.JSONDecodeError:
            # Not JSON: hand the raw text to the JS side.
            data = response.text
        response_result = {
            "headers": dict(response.headers),
            "data": data,
            "status": response.status_code,
        }
        # Let the JS side turn the raw response into the final result.
        return self.ctx.call('NeteaseCloudMusicApi.afterRequest',
                             response_result,
                             request_param.get('crypto', None),
                             request_param['apiName'])

    def playlist_track_all(self, query):
        """
        Get all tracks of a playlist.

        Calls ``/playlist/detail`` first and passes its JSON-encoded result
        on to ``/playlist/track/all``.

        :param query: reads ``id``, ``cookie``, ``realIP`` and the optional
            ``limit`` / ``offset``
        :return: result of the ``/playlist/track/all`` call
        """
        cookie = query.get("cookie")
        real_ip = query.get("realIP")
        detail = self.call_api("/playlist/detail",
                               {"id": query.get("id"), "cookie": cookie, "realIP": real_ip})
        track_all_query = {"detail_result": json.dumps(detail), "cookie": cookie, "realIP": real_ip}
        # Forward paging parameters only when they are set (truthy).
        for key in ("limit", "offset"):
            if query.get(key):
                track_all_query[key] = query.get(key)
        return self.call_api("/playlist/track/all", track_all_query)

    def inner_version(self, query):
        """
        Get the versions of NeteaseCloudMusicApi and NeteaseCloudMusicApi_V8 in use.

        :param query: unused
        :return: value returned by the JS ``inner_version`` function
        """
        return self.ctx.call('NeteaseCloudMusicApi.inner_version')

    def login_cellphone(self, query):
        """
        Log in with a phone number.

        On a successful response that carries a cookie, the cookie is stored
        through the ``cookie`` setter (validated and persisted).

        :param query: request parameters for ``/login/cellphone``
        :return: result of the call
        """
        result = self.call_api("/login/cellphone", query)
        # Update the cookie automatically. Use .get() so a response lacking
        # "code" or "data" is returned to the caller instead of raising.
        if result.get("code") == 200:
            data = result.get("data")
            cookie_str = data.get("cookie") if isinstance(data, dict) else None
            if cookie_str:
                self.cookie = cookie_str
        return result

    def login_refresh(self, query):
        """
        Refresh the login state.

        :param query: request parameters for ``/login/refresh``
        :return: result of the call
        """
        # TODO: the cookie is deliberately not updated from this response;
        # the refreshed cookie appears to be missing some values.
        return self.call_api("/login/refresh", query)