📚 修复 Google 搜索 无法使用的问题

This commit is contained in:
Xtao_dada 2020-07-24 20:04:29 +08:00 committed by GitHub
parent aa2fcc773d
commit 7e66443e38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,15 +2,12 @@
from googletrans import Translator, LANGUAGES
from os import remove
from urllib import request, parse
from math import ceil
from requests import get
from time import sleep
from threading import Thread
from bs4 import BeautifulSoup
from gtts import gTTS
from re import compile as regex_compile
from re import search, sub
from collections import deque
from pagermaid import log
from pagermaid.listener import listener, config
from pagermaid.utils import clear_emojis, attach_log, fetch_youtube_audio
@ -101,31 +98,40 @@ async def tts(context):
await context.delete()
@listener(outgoing=True, command="google",
@listener(outgoing=True, command="googletest",
description="使用 Google 查询",
parameters="<query>")
async def google(context):
async def googletest(context):
""" Searches Google for a string. """
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0"
headers = {"user-agent": USER_AGENT}
if context.arguments == "":
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
query = context.arguments
query = query.replace(' ', '+')
URL = f"https://google.com/search?q={query}"
await context.edit("正在拉取结果 . . .")
search_results = GoogleSearch().search(query=query)
results = ""
count = 0
for result in search_results.results:
if count == int(config['result_length']):
break
count += 1
title = result.title
link = result.url
desc = result.text
results += f"\n[{title}]({link}) \n`{desc}`\n"
await context.edit(f"**Google** |`{query}`| 🎙 🔍 \n"
f"{results}",
link_preview=False)
await log(f"在Google搜索引擎上查询了 `{query}`")
resp = get(URL, headers=headers)
if resp.status_code == 200:
soup = BeautifulSoup(resp.content, "html.parser")
results = ""
count = 0
for g in soup.find_all('div', class_='r'):
if count == int(config['result_length']):
break
count += 1
anchors = g.find_all('a')
if anchors:
title = g.find('h3').text
link = anchors[0]['href']
results += f"\n[{title}]({link}) \n"
await context.edit(f"**Google** |`{query}`| 🎙 🔍 \n"
f"{results}",
link_preview=False)
await log(f"在Google搜索引擎上查询了 `{query}`")
else:
await context.edit("连接到 google服务器 失败")
@listener(outgoing=True, command="fetchaudio",
@ -156,83 +162,3 @@ async def fetchaudio(context):
await context.edit("出错了呜呜呜 ~ 原声带下载失败。")
await log(f"从链接中获取了一条音频,链接: {url}.")
await context.delete()
class GoogleSearch:
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:69.0) Gecko/20100101 Firefox/69.0"
SEARCH_URL = "https://google.com/search"
RESULT_SELECTOR = "div.r > a"
TOTAL_SELECTOR = "#resultStats"
RESULTS_PER_PAGE = 10
DEFAULT_HEADERS = [
('User-Agent', USER_AGENT),
("Accept-Language", "en-US,en;q=0.5"),
]
def search(self, query, num_results=10, prefetch_pages=True, prefetch_threads=10):
search_results = []
pages = int(ceil(num_results / float(GoogleSearch.RESULTS_PER_PAGE)))
fetcher_threads = deque([])
total = None
for i in range(pages):
start = i * GoogleSearch.RESULTS_PER_PAGE
opener = request.build_opener()
opener.addheaders = GoogleSearch.DEFAULT_HEADERS
response = opener.open(GoogleSearch.SEARCH_URL + "?q=" + parse.quote(query) + ("" if start == 0 else (
"&start=" + str(start))))
soup = BeautifulSoup(response.read(), "lxml")
response.close()
if total is None:
total_text = soup.select(GoogleSearch.TOTAL_SELECTOR)[0].children.__next__()
total = int(sub("[', ]", "", search("(([0-9]+[', ])*[0-9]+)", total_text).group(1)))
results = self.parse_results(soup.select(GoogleSearch.RESULT_SELECTOR))
if len(search_results) + len(results) > num_results:
del results[num_results - len(search_results):]
search_results += results
if prefetch_pages:
for result in results:
while True:
running = 0
for thread in fetcher_threads:
if thread.is_alive():
running += 1
if running < prefetch_threads:
break
sleep(1)
fetcher_thread = Thread(target=result.get_text)
fetcher_thread.start()
fetcher_threads.append(fetcher_thread)
for thread in fetcher_threads:
thread.join()
return SearchResponse(search_results, total)
@staticmethod
def parse_results(results):
search_results = []
for result in results:
url = result["href"]
title = result.find_all('h3')[0].text
text = result.parent.parent.find_all('div', {'class': 's'})[0].text
search_results.append(SearchResult(title, url, text))
return search_results
class SearchResponse:
def __init__(self, results, total):
self.results = results
self.total = total
class SearchResult:
def __init__(self, title, url, text):
self.title = title
self.url = url
self.text = text
def get_text(self):
return self.text
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return self.__str__()