add proxy server addon (wip)

This commit is contained in:
Maximilian Hils 2017-06-28 14:09:50 +02:00
parent f28e7e5f0f
commit 4bca88608b
3 changed files with 72 additions and 11 deletions

View File

@ -10,6 +10,7 @@ from mitmproxy.addons import cut
from mitmproxy.addons import disable_h2c from mitmproxy.addons import disable_h2c
from mitmproxy.addons import export from mitmproxy.addons import export
from mitmproxy.addons import onboarding from mitmproxy.addons import onboarding
from mitmproxy.addons import proxyserver
from mitmproxy.addons import proxyauth from mitmproxy.addons import proxyauth
from mitmproxy.addons import script from mitmproxy.addons import script
from mitmproxy.addons import serverplayback from mitmproxy.addons import serverplayback
@ -39,6 +40,7 @@ def default_addons():
export.Export(), export.Export(),
onboarding.Onboarding(), onboarding.Onboarding(),
proxyauth.ProxyAuth(), proxyauth.ProxyAuth(),
proxyserver.Proxyserver(),
script.ScriptLoader(), script.ScriptLoader(),
serverplayback.ServerPlayback(), serverplayback.ServerPlayback(),
mapremote.MapRemote(), mapremote.MapRemote(),

View File

@ -0,0 +1,51 @@
import asyncio
import threading
from mitmproxy import ctx
from mitmproxy.proxy.protocol2.server.server_async import ConnectionHandler
class Proxyserver:
"""
This addon runs the actual proxy server.
"""
def __init__(self):
self.server = None
self.loop = asyncio.get_event_loop()
self.listen_port = None
self.event_queue = None
self._lock = asyncio.Lock()
def running(self):
self.event_queue = ctx.master.event_queue
threading.Thread(target=self.loop.run_forever, daemon=True).start()
async def start(self):
async with self._lock:
if self.server:
print("Stopping server...")
self.server.close()
await self.server.wait_closed()
print("Starting server...")
self.server = await asyncio.start_server(
self.handle_connection,
'127.0.0.1',
self.listen_port,
loop=self.loop
)
async def handle_connection(self, r, w):
await ConnectionHandler(self.event_queue, r, w).handle_client()
def configure(self, updated):
if "listen_port" in updated:
self.listen_port = ctx.options.listen_port + 1
# not sure if this actually required...
self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self.start()))
def request(self, flow):
print("Changing port...")
ctx.options.listen_port += 1

View File

@ -11,6 +11,7 @@ import collections
import socket import socket
from typing import MutableMapping from typing import MutableMapping
from mitmproxy import controller
from mitmproxy.proxy.protocol2 import events, commands from mitmproxy.proxy.protocol2 import events, commands
from mitmproxy.proxy.protocol2.context import Client, Context from mitmproxy.proxy.protocol2.context import Client, Context
from mitmproxy.proxy.protocol2.context import Connection from mitmproxy.proxy.protocol2.context import Connection
@ -20,11 +21,12 @@ StreamIO = collections.namedtuple('StreamIO', ['r', 'w'])
class ConnectionHandler: class ConnectionHandler:
def __init__(self, reader, writer): def __init__(self, event_queue, reader, writer):
addr = writer.get_extra_info('peername') addr = writer.get_extra_info('peername')
self.client = Client(addr) self.client = Client(addr)
self.context = Context(self.client) self.context = Context(self.client)
self.event_queue = event_queue
# self.layer = ReverseProxy(self.context, ("localhost", 443)) # self.layer = ReverseProxy(self.context, ("localhost", 443))
self.layer = ReverseProxy(self.context, ("localhost", 80)) self.layer = ReverseProxy(self.context, ("localhost", 80))
@ -35,6 +37,10 @@ class ConnectionHandler:
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
@classmethod
async def handle(cls, reader, writer):
await cls(reader, writer).handle_client()
async def handle_client(self): async def handle_client(self):
await self.server_event(events.Start()) await self.server_event(events.Start())
await self.handle_connection(self.client) await self.handle_connection(self.client)
@ -86,7 +92,16 @@ class ConnectionHandler:
await self.server_event(events.OpenConnectionReply(command, None)) await self.server_event(events.OpenConnectionReply(command, None))
await self.handle_connection(command.connection) await self.handle_connection(command.connection)
async def server_event(self, event: events.Event): async def handle_hook(self, hook: commands.Hook) -> None:
# TODO: temporary glue code - let's see how many years it survives.
hook.data.reply = controller.Reply(hook.data)
q = asyncio.Queue()
hook.data.reply.q = q
self.event_queue.put((hook.name, hook.data))
reply = await q.get()
await self.server_event(events.HookReply(hook, reply))
async def server_event(self, event: events.Event) -> None:
print("*", type(event).__name__) print("*", type(event).__name__)
async with self.lock: async with self.lock:
print("<#", event) print("<#", event)
@ -98,10 +113,9 @@ class ConnectionHandler:
elif isinstance(command, commands.SendData): elif isinstance(command, commands.SendData):
self.transports[command.connection].w.write(command.data) self.transports[command.connection].w.write(command.data)
elif isinstance(command, commands.Hook): elif isinstance(command, commands.Hook):
# TODO: pass to master here.
print(f"~ {command.name}: {command.data}") print(f"~ {command.name}: {command.data}")
asyncio.ensure_future( asyncio.ensure_future(
self.server_event(events.HookReply(command, None)) self.handle_hook(command)
) )
elif isinstance(command, commands.CloseConnection): elif isinstance(command, commands.CloseConnection):
asyncio.ensure_future( asyncio.ensure_future(
@ -111,16 +125,10 @@ class ConnectionHandler:
raise NotImplementedError(f"Unexpected event: {command}") raise NotImplementedError(f"Unexpected event: {command}")
print("#>") print("#>")
if __name__ == "__main__": if __name__ == "__main__":
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
coro = asyncio.start_server(ConnectionHandler.handle, '127.0.0.1', 8080, loop=loop)
async def handle(reader, writer):
await ConnectionHandler(reader, writer).handle_client()
coro = asyncio.start_server(handle, '127.0.0.1', 8080, loop=loop)
server = loop.run_until_complete(coro) server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed # Serve requests until Ctrl+C is pressed