mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 23:09:44 +00:00
Feedback from PR #832
This commit is contained in:
parent
d3feaa3bc6
commit
e72a9a62a1
66
examples/custom_contentviews.py
Normal file
66
examples/custom_contentviews.py
Normal file
@ -0,0 +1,66 @@
|
||||
import string
|
||||
from libmproxy import script, flow, utils
|
||||
import libmproxy.contentviews as cv
|
||||
from netlib.http import Headers
|
||||
import lxml.html
|
||||
import lxml.etree
|
||||
|
||||
|
||||
class ViewPigLatin(cv.View):
|
||||
name = "pig_latin_HTML"
|
||||
prompt = ("pig latin HTML", "l")
|
||||
content_types = ["text/html"]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
if utils.isXML(data):
|
||||
parser = lxml.etree.HTMLParser(
|
||||
strip_cdata=True,
|
||||
remove_blank_text=True
|
||||
)
|
||||
d = lxml.html.fromstring(data, parser=parser)
|
||||
docinfo = d.getroottree().docinfo
|
||||
|
||||
def piglify(src):
|
||||
words = string.split(src)
|
||||
ret = ''
|
||||
for word in words:
|
||||
idx = -1
|
||||
while word[idx] in string.punctuation and (idx * -1) != len(word): idx -= 1
|
||||
if word[0].lower() in 'aeiou':
|
||||
if idx == -1: ret += word[0:] + "hay"
|
||||
else: ret += word[0:len(word)+idx+1] + "hay" + word[idx+1:]
|
||||
else:
|
||||
if idx == -1: ret += word[1:] + word[0] + "ay"
|
||||
else: ret += word[1:len(word)+idx+1] + word[0] + "ay" + word[idx+1:]
|
||||
ret += ' '
|
||||
return ret.strip()
|
||||
|
||||
def recurse(root):
|
||||
if hasattr(root, 'text') and root.text:
|
||||
root.text = piglify(root.text)
|
||||
if hasattr(root, 'tail') and root.tail:
|
||||
root.tail = piglify(root.tail)
|
||||
|
||||
if len(root):
|
||||
for child in root:
|
||||
recurse(child)
|
||||
|
||||
recurse(d)
|
||||
|
||||
s = lxml.etree.tostring(
|
||||
d,
|
||||
pretty_print=True,
|
||||
doctype=docinfo.doctype
|
||||
)
|
||||
return "HTML", cv.format_text(s)
|
||||
|
||||
|
||||
pig_view = ViewPigLatin()
|
||||
|
||||
|
||||
def start(context, argv):
|
||||
context.add_contentview(pig_view)
|
||||
|
||||
|
||||
def stop(context):
|
||||
context.remove_contentview(pig_view)
|
@ -479,34 +479,9 @@ class ViewWBXML(View):
|
||||
return None
|
||||
|
||||
|
||||
views = [
|
||||
ViewAuto(),
|
||||
ViewRaw(),
|
||||
ViewHex(),
|
||||
ViewJSON(),
|
||||
ViewXML(),
|
||||
ViewWBXML(),
|
||||
ViewHTML(),
|
||||
ViewHTMLOutline(),
|
||||
ViewJavaScript(),
|
||||
ViewCSS(),
|
||||
ViewURLEncoded(),
|
||||
ViewMultipart(),
|
||||
ViewImage(),
|
||||
]
|
||||
if pyamf:
|
||||
views.append(ViewAMF())
|
||||
|
||||
if ViewProtobuf.is_available():
|
||||
views.append(ViewProtobuf())
|
||||
|
||||
views = []
|
||||
content_types_map = {}
|
||||
for i in views:
|
||||
for ct in i.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.append(i)
|
||||
|
||||
view_prompts = [i.prompt for i in views]
|
||||
view_prompts = []
|
||||
|
||||
|
||||
def get_by_shortcut(c):
|
||||
@ -515,24 +490,58 @@ def get_by_shortcut(c):
|
||||
return i
|
||||
|
||||
|
||||
def add(obj):
|
||||
def add(view):
|
||||
# TODO: auto-select a different name (append an integer?)
|
||||
for i in views:
|
||||
if i.name == obj.name:
|
||||
raise ContentViewException("Duplicate view: " + obj.name)
|
||||
if i.name == view.name:
|
||||
raise ContentViewException("Duplicate view: " + view.name)
|
||||
|
||||
# TODO: the UI should auto-prompt for a replacement shortcut
|
||||
for prompt in view_prompts:
|
||||
if prompt[1] == obj.prompt[1]:
|
||||
raise ContentViewException("Duplicate view shortcut: " + obj.prompt[1])
|
||||
if prompt[1] == view.prompt[1]:
|
||||
raise ContentViewException("Duplicate view shortcut: " + view.prompt[1])
|
||||
|
||||
views.append(obj)
|
||||
views.append(view)
|
||||
|
||||
for ct in obj.content_types:
|
||||
for ct in view.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.append(obj)
|
||||
l.append(view)
|
||||
|
||||
view_prompts.append(obj.prompt)
|
||||
view_prompts.append(view.prompt)
|
||||
|
||||
|
||||
def remove(view):
|
||||
for ct in view.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.remove(view)
|
||||
|
||||
if not len(l):
|
||||
del content_types_map[ct]
|
||||
|
||||
view_prompts.remove(view.prompt)
|
||||
views.remove(view)
|
||||
|
||||
|
||||
add(ViewAuto())
|
||||
add(ViewRaw())
|
||||
add(ViewHex())
|
||||
add(ViewJSON())
|
||||
add(ViewXML())
|
||||
add(ViewWBXML())
|
||||
add(ViewHTML())
|
||||
add(ViewHTMLOutline())
|
||||
add(ViewJavaScript())
|
||||
add(ViewCSS())
|
||||
add(ViewURLEncoded())
|
||||
add(ViewMultipart())
|
||||
add(ViewImage())
|
||||
|
||||
if pyamf:
|
||||
add(ViewAMF())
|
||||
|
||||
if ViewProtobuf.is_available():
|
||||
add(ViewProtobuf())
|
||||
|
||||
def get(name):
|
||||
for i in views:
|
||||
if i.name == name:
|
||||
|
@ -24,10 +24,6 @@ from .models import ClientConnection, ServerConnection, HTTPResponse, HTTPFlow,
|
||||
from . import contentviews as cv
|
||||
|
||||
|
||||
class PluginError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AppRegistry:
|
||||
def __init__(self):
|
||||
self.apps = {}
|
||||
@ -619,43 +615,6 @@ class State(object):
|
||||
self.flows.kill_all(master)
|
||||
|
||||
|
||||
class Plugins(object):
|
||||
def __init__(self):
|
||||
self._view_plugins = {}
|
||||
|
||||
def __iter__(self):
|
||||
for plugin_type in ('view_plugins',):
|
||||
yield (plugin_type, getattr(self, '_' + plugin_type))
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key in ('view_plugins',):
|
||||
return getattr(self, '_' + key)
|
||||
else:
|
||||
return None
|
||||
|
||||
def register_view(self, id, **kwargs):
|
||||
if self._view_plugins.get(id):
|
||||
raise PluginError("Duplicate view registration for %s" % (id, ))
|
||||
|
||||
if not kwargs.get('class_ref') or not \
|
||||
callable(kwargs['class_ref']) or not \
|
||||
isinstance(kwargs['class_ref'], type):
|
||||
raise PluginError("No custom content view class passed for view %s" % (id, ))
|
||||
|
||||
script_path = inspect.stack()[1][1]
|
||||
|
||||
view_plugin = {
|
||||
'title': kwargs.get('title') or id,
|
||||
'class_ref': kwargs['class_ref'],
|
||||
'script_path': script_path,
|
||||
}
|
||||
self._view_plugins[id] = view_plugin
|
||||
|
||||
cv.add(kwargs['class_ref']())
|
||||
|
||||
print("Registered view plugin %s from script %s" % (kwargs['title'], script_path))
|
||||
|
||||
|
||||
class FlowMaster(controller.Master):
|
||||
def __init__(self, server, state):
|
||||
controller.Master.__init__(self, server)
|
||||
@ -685,8 +644,6 @@ class FlowMaster(controller.Master):
|
||||
self.stream = None
|
||||
self.apps = AppRegistry()
|
||||
|
||||
self.plugins = Plugins()
|
||||
|
||||
def start_app(self, host, port):
|
||||
self.apps.add(
|
||||
app.mapp,
|
||||
|
@ -5,6 +5,8 @@ import threading
|
||||
import shlex
|
||||
import sys
|
||||
|
||||
from . import contentviews as cv
|
||||
|
||||
|
||||
class ScriptError(Exception):
|
||||
pass
|
||||
@ -56,12 +58,11 @@ class ScriptContext:
|
||||
def app_registry(self):
|
||||
return self._master.apps
|
||||
|
||||
@property
|
||||
def plugins(self):
|
||||
if hasattr(self._master, 'plugins'):
|
||||
return self._master.plugins
|
||||
def add_contentview(self, view_obj):
|
||||
cv.add(view_obj)
|
||||
|
||||
return None
|
||||
def remove_contentview(self, view_obj):
|
||||
cv.remove(view_obj)
|
||||
|
||||
|
||||
class Script:
|
||||
|
@ -213,6 +213,7 @@ Larry
|
||||
def test_add_cv(self):
|
||||
class TestContentView(cv.View):
|
||||
name = "test"
|
||||
prompt = ("t", "test")
|
||||
|
||||
tcv = TestContentView()
|
||||
cv.add(tcv)
|
||||
|
@ -4,14 +4,6 @@ from netlib.http import Headers
|
||||
|
||||
|
||||
def test_custom_views():
|
||||
plugins = flow.Plugins()
|
||||
|
||||
# two types: view and action
|
||||
assert 'view_plugins' in dict(plugins).keys()
|
||||
|
||||
view_plugins = plugins['view_plugins']
|
||||
assert len(view_plugins) == 0
|
||||
|
||||
class ViewNoop(cv.View):
|
||||
name = "noop"
|
||||
prompt = ("noop", "n")
|
||||
@ -20,12 +12,10 @@ def test_custom_views():
|
||||
def __call__(self, data, **metadata):
|
||||
return "noop", cv.format_text(data)
|
||||
|
||||
plugins.register_view('noop',
|
||||
title='Noop View Plugin',
|
||||
class_ref=ViewNoop)
|
||||
|
||||
assert len(view_plugins) == 1
|
||||
assert view_plugins['noop']['title'] == 'Noop View Plugin'
|
||||
view_obj = ViewNoop()
|
||||
|
||||
cv.add(view_obj)
|
||||
|
||||
assert cv.get("noop")
|
||||
|
||||
@ -47,3 +37,16 @@ def test_custom_views():
|
||||
)
|
||||
)
|
||||
assert "noop" in r[0]
|
||||
|
||||
# now try removing the custom view
|
||||
cv.remove(view_obj)
|
||||
r = cv.get_content_view(
|
||||
cv.get("Auto"),
|
||||
"[1, 2, 3]",
|
||||
headers=Headers(
|
||||
content_type="text/none"
|
||||
)
|
||||
)
|
||||
assert "noop" not in r[0]
|
||||
|
||||
|
||||
|
@ -128,12 +128,3 @@ def test_command_parsing():
|
||||
s = script.Script(absfilepath, fm)
|
||||
assert os.path.isfile(s.args[0])
|
||||
|
||||
|
||||
def test_script_plugins():
|
||||
s = flow.State()
|
||||
fm = flow.FlowMaster(None, s)
|
||||
sp = tutils.test_data.path("scripts/a.py")
|
||||
p = script.Script("%s --var 40" % sp, fm)
|
||||
|
||||
assert hasattr(p.ctx, 'plugins')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user