mitmproxy/examples/contrib/mitmproxywrapper.py

166 lines
5.6 KiB
Python
Raw Permalink Normal View History

2013-05-15 22:24:18 +00:00
#!/usr/bin/env python
#
# Helper tool to enable/disable OS X proxy and wrap mitmproxy
#
# Get usage information with:
#
# mitmproxywrapper.py -h
#
import subprocess
import re
import argparse
import contextlib
import os
import sys
2013-05-15 22:24:18 +00:00
2015-05-30 00:03:28 +00:00
2016-10-17 04:29:45 +00:00
class Wrapper:
    """Enable/disable the OS X web proxy settings and run mitmproxy behind them.

    The proxy configuration is read and written with ``sudo networksetup``;
    the active network services are discovered with ``/usr/sbin/scutil``.
    All helper methods exchange ``str`` (not ``bytes``) with their callers.
    """

    def __init__(self, port, extra_arguments=None):
        """Store the proxy port and any extra command-line arguments.

        :param port: port on 127.0.0.1 that the system proxy should point to.
        :param extra_arguments: optional list of arguments appended to the
            ``mitmproxy`` command line by :meth:`wrap_mitmproxy`.
        """
        self.port = port
        self.extra_arguments = extra_arguments

    def run_networksetup_command(self, *arguments):
        """Run ``sudo networksetup <arguments>`` and return its output as text.

        :raises subprocess.CalledProcessError: if the command exits non-zero.
        """
        output = subprocess.check_output(
            ['sudo', 'networksetup'] + list(arguments))
        # check_output() returns bytes on Python 3, while every caller applies
        # str regular expressions to the result, so decode once here.
        return output.decode()

    def proxy_state_for_service(self, service):
        """Return the ``-getwebproxy`` output for *service* as a dict.

        Each ``key: value`` line becomes one entry. Lines that do not have
        that shape (for example blank lines) are ignored.
        """
        output = self.run_networksetup_command('-getwebproxy', service)
        state = {}
        for line in output.splitlines():
            match = re.search(r'([^:]+): (.*)', line)
            if match is None:
                # Previously a non-matching line raised IndexError.
                continue
            key, value = match.groups()
            state[key] = value
        return state

    def enable_proxy_for_service(self, service):
        """Point the HTTP and HTTPS proxy of *service* at 127.0.0.1:port."""
        print(f'Enabling proxy on {service}...')
        for subcommand in ['-setwebproxy', '-setsecurewebproxy']:
            self.run_networksetup_command(
                subcommand, service, '127.0.0.1', str(self.port))

    def disable_proxy_for_service(self, service):
        """Switch the HTTP and HTTPS proxy of *service* off."""
        print(f'Disabling proxy on {service}...')
        for subcommand in ['-setwebproxystate', '-setsecurewebproxystate']:
            self.run_networksetup_command(subcommand, service, 'Off')

    def interface_name_to_service_name_map(self):
        """Return a dict mapping device names to network service names.

        Parsed from ``networksetup -listnetworkserviceorder``, whose entries
        are a ``(N) <service>`` line followed by a ``(... Device: <dev>)``
        line.
        """
        order = self.run_networksetup_command('-listnetworkserviceorder')
        mapping = re.findall(
            r'\(\d+\)\s(.*)$\n\(.*Device: (.+)\)$',
            order,
            re.MULTILINE)
        return {device: service for (service, device) in mapping}

    def run_command_with_input(self, command, input):
        """Run *command*, feed *input* to its stdin and return its stdout.

        :param command: program (or argument list) passed to
            ``subprocess.Popen``.
        :param input: text to write to the process; ``bytes`` are also
            accepted and passed through unchanged.
        :returns: the captured standard output decoded to ``str``.
        """
        # The pipes are binary on Python 3: encode on the way in and decode
        # on the way out so callers can work with str throughout.
        data = input.encode() if isinstance(input, str) else input
        popen = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE)
        stdout, _ = popen.communicate(data)
        return stdout.decode()

    def primary_interface_name(self):
        """Return the ``PrimaryInterface`` reported by scutil for IPv4.

        :raises ValueError: if the scutil output does not contain exactly
            one ``PrimaryInterface`` entry.
        """
        scutil_script = 'get State:/Network/Global/IPv4\nd.show\n'
        stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script)
        interface, = re.findall(r'PrimaryInterface\s*:\s*(.+)', stdout)
        return interface

    # Original (misspelled) name, kept so existing callers continue to work.
    primary_interace_name = primary_interface_name

    def primary_service_name(self):
        """Return the network service name of the primary interface.

        :raises KeyError: if the primary interface has no entry in
            :meth:`interface_name_to_service_name_map`.
        """
        services_by_interface = self.interface_name_to_service_name_map()
        return services_by_interface[self.primary_interface_name()]

    def proxy_enabled_for_service(self, service):
        """Return True if ``-getwebproxy`` reports ``Enabled: Yes``."""
        return self.proxy_state_for_service(service).get('Enabled') == 'Yes'

    def toggle_proxy(self):
        """Flip the proxy state of all connected services.

        The target state is the opposite of the primary service's current
        state; only services that are not already in the target state are
        changed.
        """
        new_state = not self.proxy_enabled_for_service(
            self.primary_service_name())
        for service_name in self.connected_service_names():
            # Query once per service: every query spawns `sudo networksetup`.
            currently_enabled = self.proxy_enabled_for_service(service_name)
            if currently_enabled == new_state:
                continue
            if new_state:
                self.enable_proxy_for_service(service_name)
            else:
                self.disable_proxy_for_service(service_name)

    def connected_service_names(self):
        """Return the user-defined names of services listed with IPv4 state.

        Service ids are taken from ``scutil list`` entries of the form
        ``State:/Network/Service/<id>/IPv4``; each id is then resolved via
        ``show Setup:/Network/Service/<id>``.

        :raises ValueError: if a service does not have exactly one
            ``UserDefinedName`` entry.
        """
        scutil_script = 'list\n'
        stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script)
        service_ids = re.findall(r'State:/Network/Service/(.+)/IPv4', stdout)

        service_names = []
        for service_id in service_ids:
            scutil_script = 'show Setup:/Network/Service/{}\n'.format(
                service_id)
            stdout = self.run_command_with_input(
                '/usr/sbin/scutil',
                scutil_script)
            service_name, = re.findall(r'UserDefinedName\s*:\s*(.+)', stdout)
            service_names.append(service_name)

        return service_names

    def wrap_mitmproxy(self):
        """Run ``mitmproxy -p <port> [extra arguments]`` with the proxy enabled.

        :raises subprocess.CalledProcessError: if mitmproxy exits non-zero
            (the proxy settings are still restored first).
        """
        with self.wrap_proxy():
            cmd = ['mitmproxy', '-p', str(self.port)]
            if self.extra_arguments:
                cmd.extend(self.extra_arguments)
            subprocess.check_call(cmd)

    def wrap_honeyproxy(self):
        """Run ``honeyproxy.sh`` with the proxy enabled.

        A KeyboardInterrupt while waiting terminates the child process.
        """
        with self.wrap_proxy():
            popen = subprocess.Popen('honeyproxy.sh')
            try:
                popen.wait()
            except KeyboardInterrupt:
                popen.terminate()

    @contextlib.contextmanager
    def wrap_proxy(self):
        """Context manager: enable the proxy on entry, disable it on exit.

        On entry the proxy is enabled on every connected service that does
        not have it enabled yet. On exit it is disabled on every one of
        those services that currently has it enabled.
        """
        connected_service_names = self.connected_service_names()
        for service_name in connected_service_names:
            if not self.proxy_enabled_for_service(service_name):
                self.enable_proxy_for_service(service_name)

        try:
            yield
        finally:
            # Must run even when the wrapped command fails or is interrupted;
            # otherwise the system is left pointing at a dead proxy.
            for service_name in connected_service_names:
                if self.proxy_enabled_for_service(service_name):
                    self.disable_proxy_for_service(service_name)

    @classmethod
    def ensure_superuser(cls):
        """Re-execute the script through ``/usr/bin/sudo`` unless uid is 0.

        ``os.execv`` replaces the current process, so this call does not
        return when a relaunch happens.
        """
        if os.getuid() != 0:
            print('Relaunching with sudo...')
            os.execv('/usr/bin/sudo', ['/usr/bin/sudo'] + sys.argv)

    @classmethod
    def main(cls):
        """Parse the command line and either toggle the proxy or run mitmproxy.

        Arguments not recognised by the parser are collected and forwarded
        to mitmproxy.
        """
        parser = argparse.ArgumentParser(
            description='Helper tool for OS X proxy configuration and mitmproxy.',
            epilog='Any additional arguments will be passed on unchanged to mitmproxy.')
        parser.add_argument(
            '-t',
            '--toggle',
            action='store_true',
            help='just toggle the proxy configuration')
        # parser.add_argument('--honeyproxy', action='store_true', help='run honeyproxy instead of mitmproxy')
        parser.add_argument(
            '-p',
            '--port',
            type=int,
            help='override the default port of 8080',
            default=8080)
        args, extra_arguments = parser.parse_known_args()

        wrapper = cls(port=args.port, extra_arguments=extra_arguments)

        if args.toggle:
            wrapper.toggle_proxy()
        # elif args.honeyproxy:
        #     wrapper.wrap_honeyproxy()
        else:
            wrapper.wrap_mitmproxy()
2013-05-15 22:24:18 +00:00
if __name__ == '__main__':
    # Script entry point. First make sure we run as root (this re-executes
    # the script through sudo when the current uid is not 0), then hand
    # over to the command-line front end.
    Wrapper.ensure_superuser()
    Wrapper.main()