diff --git a/pagermaid/modules/external.py b/pagermaid/modules/external.py index dd4f09c..eeac31b 100644 --- a/pagermaid/modules/external.py +++ b/pagermaid/modules/external.py @@ -2,15 +2,12 @@ from googletrans import Translator, LANGUAGES from os import remove -from urllib import request, parse -from math import ceil +from requests import get from time import sleep from threading import Thread from bs4 import BeautifulSoup from gtts import gTTS from re import compile as regex_compile -from re import search, sub -from collections import deque from pagermaid import log from pagermaid.listener import listener, config from pagermaid.utils import clear_emojis, attach_log, fetch_youtube_audio @@ -101,31 +98,40 @@ async def tts(context): await context.delete() -@listener(outgoing=True, command="google", +@listener(outgoing=True, command="googletest", description="使用 Google 查询", parameters="") -async def google(context): +async def googletest(context): """ Searches Google for a string. """ + USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0" + headers = {"user-agent": USER_AGENT} if context.arguments == "": await context.edit("出错了呜呜呜 ~ 无效的参数。") return query = context.arguments + query = query.replace(' ', '+') + URL = f"https://google.com/search?q={query}" await context.edit("正在拉取结果 . . .") - search_results = GoogleSearch().search(query=query) - results = "" - count = 0 - for result in search_results.results: - if count == int(config['result_length']): - break - count += 1 - title = result.title - link = result.url - desc = result.text - results += f"\n[{title}]({link}) \n`{desc}`\n" - await context.edit(f"**Google** |`{query}`| 🎙 🔍 \n" - f"{results}", - link_preview=False) - await log(f"在Google搜索引擎上查询了 `{query}`") + resp = get(URL, headers=headers) + if resp.status_code == 200: + soup = BeautifulSoup(resp.content, "html.parser") + results = "" + count = 0 + for g in soup.find_all('div', class_='r'): + if count == int(config['result_length']): + break + count += 1 + anchors = g.find_all('a') + if anchors: + title = g.find('h3').text + link = anchors[0]['href'] + results += f"\n[{title}]({link}) \n" + await context.edit(f"**Google** |`{query}`| 🎙 🔍 \n" + f"{results}", + link_preview=False) + await log(f"在Google搜索引擎上查询了 `{query}`") + else: + await context.edit("连接到 google服务器 失败") @listener(outgoing=True, command="fetchaudio", @@ -156,83 +162,3 @@ async def fetchaudio(context): await context.edit("出错了呜呜呜 ~ 原声带下载失败。") await log(f"从链接中获取了一条音频,链接: {url}.") await context.delete() - -class GoogleSearch: - USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:69.0) Gecko/20100101 Firefox/69.0" - SEARCH_URL = "https://google.com/search" - RESULT_SELECTOR = "div.r > a" - TOTAL_SELECTOR = "#resultStats" - RESULTS_PER_PAGE = 10 - DEFAULT_HEADERS = [ - ('User-Agent', USER_AGENT), - ("Accept-Language", "en-US,en;q=0.5"), - ] - - def search(self, query, num_results=10, prefetch_pages=True, prefetch_threads=10): - search_results = [] - pages = int(ceil(num_results / float(GoogleSearch.RESULTS_PER_PAGE))) - fetcher_threads = deque([]) - total = None - for i in range(pages): - start = i * GoogleSearch.RESULTS_PER_PAGE - opener = request.build_opener() - opener.addheaders = GoogleSearch.DEFAULT_HEADERS - response = opener.open(GoogleSearch.SEARCH_URL + "?q=" + parse.quote(query) + ("" if start == 0 else ( - "&start=" + str(start)))) - soup = BeautifulSoup(response.read(), "lxml") - response.close() - if total is None: - total_text = soup.select(GoogleSearch.TOTAL_SELECTOR)[0].children.__next__() - total = int(sub("[', ]", "", search("(([0-9]+[', ])*[0-9]+)", total_text).group(1))) - results = self.parse_results(soup.select(GoogleSearch.RESULT_SELECTOR)) - if len(search_results) + len(results) > num_results: - del results[num_results - len(search_results):] - search_results += results - if prefetch_pages: - for result in results: - while True: - running = 0 - for thread in fetcher_threads: - if thread.is_alive(): - running += 1 - if running < prefetch_threads: - break - sleep(1) - fetcher_thread = Thread(target=result.get_text) - fetcher_thread.start() - fetcher_threads.append(fetcher_thread) - for thread in fetcher_threads: - thread.join() - return SearchResponse(search_results, total) - - @staticmethod - def parse_results(results): - search_results = [] - for result in results: - url = result["href"] - title = result.find_all('h3')[0].text - text = result.parent.parent.find_all('div', {'class': 's'})[0].text - search_results.append(SearchResult(title, url, text)) - return search_results - - -class SearchResponse: - def __init__(self, results, total): - self.results = results - self.total = total - - -class SearchResult: - def __init__(self, title, url, text): - self.title = title - self.url = url - self.text = text - - def get_text(self): - return self.text - - def __str__(self): - return str(self.__dict__) - - def __repr__(self): - return self.__str__()