diff --git a/list.json b/list.json index c69994b..5dbc8ce 100644 --- a/list.json +++ b/list.json @@ -286,19 +286,19 @@ "section": "daily", "maintainer": "TNTcraftHIM", "size": "28.8 kb", - "supported": true, + "supported": false, "des-short": "网易云搜歌/随机热歌/点歌", "des": "可以在线搜索并且快速点歌(可回复信息点歌),也支持随机热歌功能,支持解锁VIP/灰色歌曲。命令:nem。" }, { "name": "neteasedown", - "version": "1.121", + "version": "1.2", "section": "daily", "maintainer": "xtaodada", - "size": "15.6 kb", + "size": "6.7 kb", "supported": true, "des-short": "网易云搜歌/点歌", - "des": "可以在线搜索并且快速点歌(可回复信息点歌),不支持解锁VIP/灰色歌曲。命令:ned。" + "des": "可以在线搜索并且快速点歌(可回复信息点歌),支持解锁VIP歌曲。命令:ned。" }, { "name": "killallmembers", diff --git a/neteasedown.py b/neteasedown.py index 9f890fa..823eb28 100644 --- a/neteasedown.py +++ b/neteasedown.py @@ -1,368 +1,167 @@ # -*- coding: utf-8 -*- -import os -import re -import binascii -import base64 -import json -import copy -import requests -from time import sleep -from sys import executable -from os.path import exists -from telethon.tl.types import DocumentAttributeAudio -from pagermaid.listener import listener -from pagermaid.utils import alias_command -fake_headers = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", # noqa - "Accept-Charset": "UTF-8,*;q=0.5", - "Accept-Encoding": "gzip,deflate,sdch", - "Accept-Language": "en-US,en;q=0.8", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:60.0) Gecko/20100101 Firefox/60.0", # noqa - "referer": "https://www.google.com"} -wget_headers = {"Accept": "*/*", - "Accept-Encoding": "identity", - "User-Agent": "Wget/1.19.5 (darwin17.5.0)"} +from os import sep, remove, listdir +from os.path import isfile +from sys import executable try: - from Crypto.Cipher import AES + from mutagen.id3 import ID3, APIC, TIT2, TPE1 + from pyncm import GetCurrentSession, apis, DumpSessionAsString, SetCurrentSession, LoadSessionFromString + from pyncm.apis import LoginFailedException + from pyncm.apis.cloudsearch import CloudSearchType - AES.new("0CoJUm6Qyw8W8jud".encode('utf-8'), - AES.MODE_CBC, "0102030405060708".encode('utf-8')) cc_imported = True except ImportError: + print(f'[!] Please install {executable} -m pip install mutagen git+https://github.com/Xtao-Labs/pyncm.git') cc_imported = False -try: - import eyed3 - - eyed3_imported = True -except ImportError: - eyed3_imported = False -class DataError(RuntimeError): - """ 得到的data中没有预期的内容 """ +from telethon.tl.types import DocumentAttributeAudio - def __init__(self, *args, **kwargs): - pass +from pagermaid.listener import listener +from pagermaid.utils import alias_command, execute -class MusicApi: - # class property - # 子类修改时使用 deepcopy - def __init__(self): - pass - - session = requests.Session() - session.headers.update(fake_headers) - - @classmethod - def request(cls, url, method="POST", data=None): - if method == "GET": - resp = cls.session.get(url, params=data, timeout=7) - else: - resp = cls.session.post(url, data=data, timeout=7) - if resp.status_code != requests.codes.ok: - raise RequestError(resp.text) - if not resp.text: - raise ResponseError("No response data.") - return resp.json() +def download_by_url(url, dest): + # Downloads generic content + response = GetCurrentSession().get(url, stream=True) + with open(dest, 'wb') as f: + for chunk in response.iter_content(1024 * 2 ** 10): + f.write(chunk) # write every 1MB read + return dest -class NeteaseApi(MusicApi): - session = copy.deepcopy(MusicApi.session) - session.headers.update({"referer": "http://music.163.com/"}) +def gen_author(song_info: dict) -> str: + data = [] + for i in song_info["songs"][0]["ar"]: + data.append(i["name"]) + return " ".join(data) - @classmethod - def encode_netease_data(cls, data) -> str: - data = json.dumps(data) - key = binascii.unhexlify("7246674226682325323F5E6544673A51") - encryptor = AES.new(key, AES.MODE_ECB) - # 补足data长度,使其是16的倍数 - pad = 16 - len(data) % 16 - fix = chr(pad) * pad - byte_data = (data + fix).encode("utf-8") - return binascii.hexlify(encryptor.encrypt(byte_data)).upper().decode() - @classmethod - def encrypted_request(cls, data) -> dict: - MODULUS = ( - "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7" - "b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280" - "104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932" - "575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b" - "3ece0462db0a22b8e7" +async def netease_down(track_info: dict, song_info: dict) -> str: + if not isfile(f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}'): + # Downloding source audio + download_by_url(track_info["data"][0]["url"], + f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}') + # Downloading cover + download_by_url(song_info["songs"][0]["al"]["picUrl"], + f'data{sep}{song_info["songs"][0]["name"]}.jpg') + with open(f'data{sep}{song_info["songs"][0]["name"]}.jpg', 'rb') as f: + picData = f.read() + # 设置标签 + info = {'picData': picData, + 'title': song_info["songs"][0]["name"], + 'artist': gen_author(song_info)} + songFile = ID3(f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}') + songFile['APIC'] = APIC( # 插入封面 + encoding=3, + mime='image/jpeg', + type=3, + desc=u'Cover', + data=info['picData'] ) - PUBKEY = "010001" - NONCE = b"0CoJUm6Qyw8W8jud" - data = json.dumps(data).encode("utf-8") - secret = cls.create_key(16) - params = cls.aes(cls.aes(data, NONCE), secret) - encseckey = cls.rsa(secret, PUBKEY, MODULUS) - return {"params": params, "encSecKey": encseckey} - - @classmethod - def aes(cls, text, key): - pad = 16 - len(text) % 16 - text = text + bytearray([pad] * pad) - encryptor = AES.new(key, 2, b"0102030405060708") - ciphertext = encryptor.encrypt(text) - return base64.b64encode(ciphertext) - - @classmethod - def rsa(cls, text, pubkey, modulus): - text = text[::-1] - rs = pow(int(binascii.hexlify(text), 16), int(pubkey, 16), int(modulus, 16)) - return format(rs, "x").zfill(256) - - @classmethod - def create_key(cls, size): - return binascii.hexlify(os.urandom(size))[:16] - - -def netease_search(keyword) -> list: - eparams = { - "method": "POST", - "url": "http://music.163.com/api/cloudsearch/pc", - "params": {"s": keyword, "type": 1, "offset": 0, "limit": 5}, - } - data = {"eparams": NeteaseApi.encode_netease_data(eparams)} - - songs_list = [] - res_data = ( - NeteaseApi.request( - "http://music.163.com/api/linux/forward", method="POST", data=data + songFile['TIT2'] = TIT2( # 插入歌名 + encoding=3, + text=info['title'] ) - .get("result", {}) - .get("songs", {}) - ) - try: - for item in res_data: - if item.get("privilege", {}).get("fl", {}) == 0: - # 没有版权 - continue - # 获得歌手名字 - singers = [s.get("name", "") for s in item.get("ar", [])] - # 获得音乐的文件大小 - # TODO: 获取到的大小并不准确,考虑逐一获取歌曲详情 - if item.get("privilege", {}).get("fl", {}) >= 320000 and item.get("h", ""): - size = item.get("h", {}).get("size", 0) - elif item.get("privilege", {}).get("fl", {}) >= 192000 and item.get( - "m", "" - ): - size = item.get("m", {}).get("size", 0) - else: - size = item.get("l", {}).get("size", 0) - - song = {"id": item.get("id", ""), - "title": item.get("name", ""), - "singer": "、".join(singers), - "album": item.get("al", {}).get("name", ""), - "duration": int(item.get("dt", 0) / 1000), - "size": round(size / 1048576, 2), - "cover_url": item.get("al", {}).get("picUrl", "")} - songs_list.append(song) - except Exception as e: - raise DataError(e) - - return songs_list - - -def netease_down(info): - try: - imagedata = requests.get(info['cover_url'], headers=wget_headers).content - if not exists('data/' + info['title'] + '.mp3'): - r = requests.get( - info['song_url'], - stream=True, - headers=wget_headers, - ) - with open('data/' + info['title'] + '.mp3', "wb") as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - tag = eyed3.load(info['title'] + '.mp3') - tag.initTag() - tag = tag.tag - tag.artist = info['singer'] - tag.title = info['title'] - tag.album = info['album'] - tag.images.remove('') - tag.images.set(6, imagedata, "image/jpeg", u"Media") - tag.save(version=eyed3.id3.ID3_DEFAULT_VERSION, encoding='utf-8') - return imagedata - except Exception as e: - raise DataError - - -def netease_single(id): - song_id = id - data_detail = NeteaseApi.encrypted_request( - dict(c=json.dumps([{"id": song_id}]), ids=[song_id]) - ) - res_data_detail = NeteaseApi.request( - "http://music.163.com/weapi/v3/song/detail", method="POST", data=data_detail - ).get("songs", []) - data = NeteaseApi.encrypted_request(dict(ids=[song_id], br=32000)) - res_data = NeteaseApi.request( - "http://music.163.com/weapi/song/enhance/player/url", - method="POST", - data=data, - ).get("data", []) - if len(res_data_detail) > 0 and len(res_data) > 0: - try: - item = res_data_detail[0] - singers = [s.get("name", "") for s in item.get("ar", {})] - song = {"id": item.get("id", ""), - "title": item.get("name", ""), - "singers": singers, - "singer": "、".join(singers), - "album": item.get("al", {}).get("name", ""), - "duration": int(item.get("dt", 0) / 1000), - "cover_url": item.get("al", {}).get("picUrl", ""), - "song_url": res_data[0].get("url", ""), - "rate": int(res_data[0].get("br", 0) / 1000)} - return song - except TypeError: - raise DataError("Get song detail failed.") - else: - raise DataError("Get song detail failed.") + songFile['TPE1'] = TPE1( # 插入第一演奏家、歌手、等 + encoding=3, + text=info['artist'] + ) + songFile.save() + # 返回 + return f'data{sep}{song_info["songs"][0]["name"]}.{track_info["data"][0]["type"]}' @listener(is_plugin=True, outgoing=True, command=alias_command("ned"), description="网易云搜/点歌。", - parameters="<关键词/id>") + parameters="{关键词/id}/{login <账号> <密码>}/{clear}") async def ned(context): - if len(context.parameter) < 1: - await context.edit("**使用方法:** `-ned` `<关键词/id>`") + if not cc_imported: + await context.edit(f"[!] Please run `-sh {executable} -m pip install " + f"mutagen git+https://github.com/Xtao-Labs/pyncm.git` and then restart pagermaid.") return - else: - if not eyed3_imported or not cc_imported: - try: - await context.edit("支持库 `eyed3` `PyCryptodome` 未安装...\n正在尝试自动安装...") - await execute(f'{executable} -m pip install eyed3') - await execute(f'{executable} -m pip install pycryptodome') - await sleep(10) - result = await execute(f'{executable} -m pip show eyed3') - result_1 = await execute(f'{executable} -m pip show pycryptodome') - if len(result) > 0 and len(result_1) > 0: - await context.edit('支持库 `eyed3` `pycryptodome` 安装成功...\n正在尝试自动重启...') - await context.client.disconnect() - else: - await context.edit( - f"自动安装失败..\n请尝试手动安装 `-sh {executable} -m pip install eyed3` 和 " - f"`-sh {executable} -m pip install pycryptodome` 随后,请重启 PagerMaid-Modify 。") - return - except: + if len(context.parameter) < 1: + # 使用方法 + await context.edit(f"**使用方法:** `-{alias_command('ned')}" + " {关键词/id}/{login <账号> <密码>}`") + return + # 处理账号登录 + if context.parameter[0] == "login": + # 显示登录信息 + if len(context.parameter) == 1: + login_info = GetCurrentSession().login_info + if login_info["success"]: + await context.edit(f"已登录账号:**{login_info['content']['profile']['nickname']}**") return - return - type = 'keyword' - id = context.parameter[0] - # 测试是否为 id - try: - id = int(id) - type = 'id' - except ValueError: - pass - if type == 'keyword': - # 开始搜歌 - await context.edit(f"【{id}】搜索中 . . .") - try: - info = netease_search(id) - except DataError: - await context.edit(f"【{id}】搜索失败。") - return - if len(info) > 0: - text = f"关于【{id}】的结果如下 \n" - for i in range(len(info)): - text += f"#{i + 1}: \n歌名: {info[i]['title']}\n" - if info[i]['album']: - res = '' + \ - info[i]['album'] + '' - text += f"专辑: {res} \n" - text += f"作者: {info[i]['singer']}\n" \ - f"歌曲ID{info[i]['id']}\n————————\n" - await context.edit(text, parse_mode='html', link_preview=True) else: - await context.edit("**未搜索到结果**") - sleep(3) - await context.delete() - return - elif type == 'id': - # 开始点歌 - # 检查 id 是否为 1-5 - try: - reply = await context.get_reply_message() - except ValueError: - await context.edit("出错了呜呜呜 ~ 无效的参数。") + await context.edit(f"**未登录/登录失败**,错误信息:`{login_info['content']}`") return - if reply and 0 < id < 6: - msg = reply.message - search = re.findall(".*【(.*)】.*", msg) - if search: - try: - start = "#" + context.parameter[0] + ":" - search = ".*" + start + "(.*?)" + '————————' + ".*" - msg = re.findall(search, msg, re.S)[0] - search = ".*歌曲ID: (.*)\n.*" - title = ".*歌名: (.*?)\n.*" - title = "【" + re.findall(title, msg, re.S)[0] + "】" - id = re.findall(search, msg, re.S)[0] - if reply.sender.is_self: - await reply.edit(f"{title}点歌完成") - except: - await context.edit("出错了呜呜呜 ~ 无效的歌曲序号。") - return - else: - await context.edit("出错了呜呜呜 ~ 无效的参数。") - return - await context.edit("获取中 . . .") - try: - data = netease_single(id) - await context.edit(f"【{data['title']}】下载中 . . .") - img_data = netease_down(data) - except DataError: - await context.edit(f"【{id}】获取失败。") - return - await context.edit(f"【{data['title']}】发送中 . . .") - cap = data['singer'] + " - " + "**" + data['title'] + f"**\n#NeteaseMusic #{data['rate']}kbps " - if not exists("plugins/NeteaseMusicExtra/FastTelethon.py"): - if not exists("plugins/NeteaseMusicExtra"): - os.mkdir("plugins/NeteaseMusicExtra") - faster = requests.request( - "GET", "https://gist.githubusercontent.com/TNTcraftHIM" - "/ca2e6066ed5892f67947eb2289dd6439/raw" - "/86244b02c7824a3ca32ce01b2649f5d9badd2e49/FastTelethon.py") - if faster.status_code == 200: - with open("plugins/NeteaseMusicExtra/FastTelethon.py", "wb") as f: - f.write(faster.content) - else: - pass - try: - from NeteaseMusicExtra.FastTelethon import upload_file - file = await upload_file(context.client, open('data/' + data['title'] + '.mp3', 'rb'), - 'data/' + data['title'] + '.mp3') - except: - file = 'data/' + data['title'] + '.mp3' - if not exists("plugins/NeteaseMusicExtra/NoFastTelethon.txt"): - with open("plugins/NeteaseMusicExtra/NoFastTelethon.txt", "w") as f: - f.write("此文件出现表示 FastTelethon 支持文件在首次运行 NeteaseMusic 插件时导入失败\n这可能是因为Github" - "服务器暂时性的访问出错导致的\nFastTelethon可以提升低网络性能机型在上传文件时的效率,但是正常情况提升并不明显\n" - "如想要手动导入,可以手动下载:\nhttps://gist.githubusercontent.com/TNTcraftHIM" - "/ca2e6066ed5892f67947eb2289dd6439/raw" - "/86244b02c7824a3ca32ce01b2649f5d9badd2e49/FastTelethon.py\n并放入当前文件夹") - await bot.send_message(context.chat_id, '`FastTelethon`支持文件导入失败,上传速度可能受到影响\n' - '此提示仅出现**一次**,手动导入可参考:\n`' + os.getcwd() + - '/plugins/NeteaseMusicExtra/NoFastTelethon.txt`') - await context.client.send_file( - context.chat_id, - file, - caption=cap, - link_preview=False, - force_document=False, - thumb=img_data, - attributes=(DocumentAttributeAudio( - data['duration'], False, data['title'], data['singer']),) - ) - await context.delete() + # 过滤空参数 + if len(context.parameter) == 2: + # 登录命令格式错误 + await context.edit(f"**使用方法:** `-{alias_command('ned')} <账号> <密码>`") return + # 开始登录 + try: + apis.login.LoginViaCellphone(context.parameter[1], context.parameter[2]) + except LoginFailedException: + await context.edit("**登录失败**,请检查账号密码是否正确。") + return + # 获取登录信息 + login_info = GetCurrentSession().login_info + await context.edit(f"**登录成功**,已登录账号:**{login_info['content']['profile']['nickname']}**") + # 保存登录信息 + with open(f"data{sep}session.ncm", 'w+') as f: + f.write(DumpSessionAsString(GetCurrentSession())) + return + if context.parameter[0] == "clear": + # 清除歌曲缓存 + for i in listdir("data"): + if i.find(".mp3") != -1 or i.find(".jpg") != -1: + remove(f"data{sep}{i}") + await context.edit("**已清除缓存**") + return + # 加载登录信息 + if isfile(f"data{sep}session.ncm"): + with open(f"data{sep}session.ncm") as f: + SetCurrentSession(LoadSessionFromString(f.read())) + # 搜索歌曲 + song_id = context.arguments + # id + if song_id.isdigit(): + song_id = int(song_id) + else: + search_data = apis.cloudsearch.GetSearchResult(song_id, CloudSearchType(1), 1) + if len(search_data["result"]["songs"]) == 1: + song_id = search_data["result"]["songs"][0]["id"] + else: + await context.edit(f"**没有找到歌曲**,请检查歌曲名称是否正确。") + return + # 获取歌曲信息小于等于 320k HQ + track_info = apis.track.GetTrackAudio([song_id]) + # 获取歌曲详情 + song_info = apis.track.GetTrackDetail([song_id]) + if track_info["data"][0]["code"] == 404: + await context.edit(f"**没有找到歌曲**,请检查歌曲id是否正确。") + return + await context.edit(f"正在下载歌曲:**{song_info['songs'][0]['name']} - {gen_author(song_info)}**") + # 下载歌曲并且设置歌曲标签 + path = await netease_down(track_info, song_info) + await context.edit("正在上传歌曲。。。") + # 上传歌曲 + cap = f"「**{song_info['songs'][0]['name']}**」\n" \ + f"{gen_author(song_info)}\n" \ + f"\n" \ + f"#netease #{int(track_info['data'][0]['br'] / 1000)}kbps #{track_info['data'][0]['type']}" + duration = int(song_info['songs'][0]['dt'] / 1000) + await context.client.send_file( + context.chat_id, + path, + caption=cap, + link_preview=False, + force_document=False, + thumb=path[:-3] + 'jpg', + attributes=(DocumentAttributeAudio( + duration, False, song_info['songs'][0]['name'], gen_author(song_info)),) + ) + await context.delete()