import asyncio
import socket
from asyncio import get_running_loop
from functools import partial
from typing import Optional

from driver.core import aiohttpsession as session

# Polling parameters used by isPreviewUp.
_PREVIEW_ATTEMPTS = 7
_PREVIEW_TIMEOUT = 2
_PREVIEW_RETRY_DELAY = 0.4


def _netcat(host, port, content) -> Optional[str]:
    """Send *content* to ``host:port`` over TCP and return the first reply chunk.

    The text is sent UTF-8 encoded, the write side of the connection is shut
    down to signal end-of-input, and a single chunk of up to 4096 bytes is
    read back.

    Args:
        host: Host name or IPv4 address to connect to.
        port: TCP port to connect to.
        content: Text to send.

    Returns:
        The reply decoded as UTF-8 with leading/trailing newlines and NUL
        characters stripped, or ``None`` when nothing remains after stripping.

    Raises:
        OSError: If connecting, sending or receiving fails.
        UnicodeDecodeError: If the reply is not valid UTF-8.
    """
    # The context manager closes the socket on every path, including the
    # successful return and any exception raised while talking to the peer.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(content.encode())
        # Half-close so the peer sees EOF while we can still read its reply.
        sock.shutdown(socket.SHUT_WR)
        reply = sock.recv(4096).decode("utf-8").strip("\n\x00")
    return reply or None


async def paste_queue(content):
    """Upload *content* to the paste service and return its reply.

    The blocking socket exchange runs in the event loop's default executor
    so the loop itself is not blocked.

    Args:
        content: Text to upload.

    Returns:
        Whatever ``_netcat`` returns for ``ezup.dev:9999`` -- presumably the
        paste link (NOTE(review): confirm against the service) -- or ``None``
        when the reply is empty.
    """
    loop = get_running_loop()
    return await loop.run_in_executor(
        None, partial(_netcat, "ezup.dev", 9999, content)
    )


async def isPreviewUp(preview: str) -> bool:
    """Poll *preview* with HEAD requests until it is available.

    Up to ``_PREVIEW_ATTEMPTS`` requests are made. A 404, or a 200 whose
    reported content length is 0, is treated as "not ready yet" and retried
    after a short sleep. Any other response ends the polling immediately.

    Args:
        preview: URL to probe.

    Returns:
        ``True`` if a request answered 200 with a content length other than
        0; ``False`` on a timeout, on any other status, or when every attempt
        was "not ready yet".
    """
    for _ in range(_PREVIEW_ATTEMPTS):
        try:
            # NOTE(review): ``session`` is assumed to be an aiohttp-style
            # client session -- confirm in driver.core.
            async with session.head(preview, timeout=_PREVIEW_TIMEOUT) as resp:
                status, size = resp.status, resp.content_length
        except asyncio.TimeoutError:
            return False
        if status == 404 or (status == 200 and size == 0):
            await asyncio.sleep(_PREVIEW_RETRY_DELAY)
            continue
        return status == 200
    return False