mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
Merge pull request #1656 from mhils/improve-export-2
Improve Flow Export
This commit is contained in:
commit
ef4e9b2b85
@ -1,9 +1,11 @@
|
|||||||
|
import io
|
||||||
import json
|
import json
|
||||||
|
import pprint
|
||||||
import re
|
import re
|
||||||
import textwrap
|
import textwrap
|
||||||
import urllib
|
from typing import Any
|
||||||
|
|
||||||
import mitmproxy.net.http
|
from mitmproxy import http
|
||||||
|
|
||||||
|
|
||||||
def _native(s):
|
def _native(s):
|
||||||
@ -12,14 +14,14 @@ def _native(s):
|
|||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
def dictstr(items, indent):
|
def dictstr(items, indent: str) -> str:
|
||||||
lines = []
|
lines = []
|
||||||
for k, v in items:
|
for k, v in items:
|
||||||
lines.append(indent + "%s: %s,\n" % (repr(_native(k)), repr(_native(v))))
|
lines.append(indent + "%s: %s,\n" % (repr(_native(k)), repr(_native(v))))
|
||||||
return "{\n%s}\n" % "".join(lines)
|
return "{\n%s}\n" % "".join(lines)
|
||||||
|
|
||||||
|
|
||||||
def curl_command(flow):
|
def curl_command(flow: http.HTTPFlow) -> str:
|
||||||
data = "curl "
|
data = "curl "
|
||||||
|
|
||||||
request = flow.request.copy()
|
request = flow.request.copy()
|
||||||
@ -31,8 +33,7 @@ def curl_command(flow):
|
|||||||
if request.method != "GET":
|
if request.method != "GET":
|
||||||
data += "-X %s " % request.method
|
data += "-X %s " % request.method
|
||||||
|
|
||||||
full_url = request.scheme + "://" + request.host + request.path
|
data += "'%s'" % request.url
|
||||||
data += "'%s'" % full_url
|
|
||||||
|
|
||||||
if request.content:
|
if request.content:
|
||||||
data += " --data-binary '%s'" % _native(request.content)
|
data += " --data-binary '%s'" % _native(request.content)
|
||||||
@ -40,64 +41,54 @@ def curl_command(flow):
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def python_code(flow):
|
def python_arg(arg: str, val: Any) -> str:
|
||||||
code = textwrap.dedent("""
|
if not val:
|
||||||
import requests
|
return ""
|
||||||
|
if arg:
|
||||||
url = '{url}'
|
arg += "="
|
||||||
{headers}{params}{data}
|
arg_str = "{}{},\n".format(
|
||||||
response = requests.request(
|
arg,
|
||||||
method='{method}',
|
pprint.pformat(val, 79 - len(arg))
|
||||||
url=url,{args}
|
|
||||||
)
|
|
||||||
|
|
||||||
print(response.text)
|
|
||||||
""").strip()
|
|
||||||
|
|
||||||
components = [urllib.parse.quote(c, safe="") for c in flow.request.path_components]
|
|
||||||
url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components)
|
|
||||||
|
|
||||||
args = ""
|
|
||||||
headers = ""
|
|
||||||
if flow.request.headers:
|
|
||||||
headers += "\nheaders = %s\n" % dictstr(flow.request.headers.fields, " ")
|
|
||||||
args += "\n headers=headers,"
|
|
||||||
|
|
||||||
params = ""
|
|
||||||
if flow.request.query:
|
|
||||||
params = "\nparams = %s\n" % dictstr(flow.request.query.collect(), " ")
|
|
||||||
args += "\n params=params,"
|
|
||||||
|
|
||||||
data = ""
|
|
||||||
if flow.request.body:
|
|
||||||
json_obj = is_json(flow.request.headers, flow.request.content)
|
|
||||||
if json_obj:
|
|
||||||
data = "\njson = %s\n" % dictstr(sorted(json_obj.items()), " ")
|
|
||||||
args += "\n json=json,"
|
|
||||||
else:
|
|
||||||
data = "\ndata = '''%s'''\n" % _native(flow.request.content)
|
|
||||||
args += "\n data=data,"
|
|
||||||
|
|
||||||
code = code.format(
|
|
||||||
url=url,
|
|
||||||
headers=headers,
|
|
||||||
params=params,
|
|
||||||
data=data,
|
|
||||||
method=flow.request.method,
|
|
||||||
args=args,
|
|
||||||
)
|
)
|
||||||
return code
|
return textwrap.indent(arg_str, " " * 4)
|
||||||
|
|
||||||
|
|
||||||
def is_json(headers: mitmproxy.net.http.Headers, content: bytes) -> bool:
|
def python_code(flow: http.HTTPFlow):
|
||||||
if headers:
|
code = io.StringIO()
|
||||||
ct = mitmproxy.net.http.parse_content_type(headers.get("content-type", ""))
|
|
||||||
if ct and "%s/%s" % (ct[0], ct[1]) == "application/json":
|
def writearg(arg, val):
|
||||||
try:
|
code.write(python_arg(arg, val))
|
||||||
return json.loads(content.decode("utf8", "surrogateescape"))
|
|
||||||
except ValueError:
|
code.write("import requests\n")
|
||||||
return False
|
code.write("\n")
|
||||||
return False
|
if flow.request.method.lower() in ("get", "post", "put", "head", "delete", "patch"):
|
||||||
|
code.write("response = requests.{}(\n".format(flow.request.method.lower()))
|
||||||
|
else:
|
||||||
|
code.write("response = requests.request(\n")
|
||||||
|
writearg("", flow.request.method)
|
||||||
|
url_without_query = flow.request.url.split("?", 1)[0]
|
||||||
|
writearg("", url_without_query)
|
||||||
|
|
||||||
|
writearg("params", list(flow.request.query.fields))
|
||||||
|
|
||||||
|
headers = flow.request.headers.copy()
|
||||||
|
# requests adds those by default.
|
||||||
|
for x in ("host", "content-length"):
|
||||||
|
headers.pop(x, None)
|
||||||
|
writearg("headers", dict(headers))
|
||||||
|
try:
|
||||||
|
if "json" not in flow.request.headers.get("content-type", ""):
|
||||||
|
raise ValueError()
|
||||||
|
writearg("json", json.loads(flow.request.text))
|
||||||
|
except ValueError:
|
||||||
|
writearg("data", flow.request.content)
|
||||||
|
|
||||||
|
code.seek(code.tell() - 2) # remove last comma
|
||||||
|
code.write("\n)\n")
|
||||||
|
code.write("\n")
|
||||||
|
code.write("print(response.text)")
|
||||||
|
|
||||||
|
return code.getvalue()
|
||||||
|
|
||||||
|
|
||||||
def locust_code(flow):
|
def locust_code(flow):
|
||||||
@ -111,7 +102,7 @@ def locust_code(flow):
|
|||||||
|
|
||||||
@task()
|
@task()
|
||||||
def {name}(self):
|
def {name}(self):
|
||||||
url = '{url}'
|
url = self.locust.host + '{path}'
|
||||||
{headers}{params}{data}
|
{headers}{params}{data}
|
||||||
self.response = self.client.request(
|
self.response = self.client.request(
|
||||||
method='{method}',
|
method='{method}',
|
||||||
@ -127,13 +118,12 @@ def locust_code(flow):
|
|||||||
max_wait = 3000
|
max_wait = 3000
|
||||||
""").strip()
|
""").strip()
|
||||||
|
|
||||||
components = [urllib.parse.quote(c, safe="") for c in flow.request.path_components]
|
name = re.sub('\W|^(?=\d)', '_', flow.request.path.strip("/").split("?", 1)[0])
|
||||||
name = re.sub('\W|^(?=\d)', '_', "_".join(components))
|
if not name:
|
||||||
if name == "" or name is None:
|
|
||||||
new_name = "_".join([str(flow.request.host), str(flow.request.timestamp_start)])
|
new_name = "_".join([str(flow.request.host), str(flow.request.timestamp_start)])
|
||||||
name = re.sub('\W|^(?=\d)', '_', new_name)
|
name = re.sub('\W|^(?=\d)', '_', new_name)
|
||||||
|
|
||||||
url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components)
|
path_without_query = flow.request.path.split("?")[0]
|
||||||
|
|
||||||
args = ""
|
args = ""
|
||||||
headers = ""
|
headers = ""
|
||||||
@ -148,7 +138,11 @@ def locust_code(flow):
|
|||||||
|
|
||||||
params = ""
|
params = ""
|
||||||
if flow.request.query:
|
if flow.request.query:
|
||||||
lines = [" %s: %s,\n" % (repr(k), repr(v)) for k, v in flow.request.query.collect()]
|
lines = [
|
||||||
|
" %s: %s,\n" % (repr(k), repr(v))
|
||||||
|
for k, v in
|
||||||
|
flow.request.query.collect()
|
||||||
|
]
|
||||||
params = "\n params = {\n%s }\n" % "".join(lines)
|
params = "\n params = {\n%s }\n" % "".join(lines)
|
||||||
args += "\n params=params,"
|
args += "\n params=params,"
|
||||||
|
|
||||||
@ -159,7 +153,7 @@ def locust_code(flow):
|
|||||||
|
|
||||||
code = code.format(
|
code = code.format(
|
||||||
name=name,
|
name=name,
|
||||||
url=url,
|
path=path_without_query,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
params=params,
|
params=params,
|
||||||
data=data,
|
data=data,
|
||||||
@ -167,12 +161,6 @@ def locust_code(flow):
|
|||||||
args=args,
|
args=args,
|
||||||
)
|
)
|
||||||
|
|
||||||
host = flow.request.scheme + "://" + flow.request.host
|
|
||||||
code = code.replace(host, "' + self.locust.host + '")
|
|
||||||
code = code.replace(urllib.parse.quote_plus(host), "' + quote_plus(self.locust.host) + '")
|
|
||||||
code = code.replace(urllib.parse.quote(host), "' + quote(self.locust.host) + '")
|
|
||||||
code = code.replace("'' + ", "")
|
|
||||||
|
|
||||||
return code
|
return code
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
import cgi
|
import cgi
|
||||||
|
|
||||||
from mitmproxy import flow
|
from mitmproxy import flow
|
||||||
|
|
||||||
from mitmproxy.net import http
|
from mitmproxy.net import http
|
||||||
from mitmproxy import version
|
from mitmproxy import version
|
||||||
from mitmproxy.net import tcp
|
from mitmproxy.net import tcp
|
||||||
|
from mitmproxy import connections # noqa
|
||||||
|
|
||||||
|
|
||||||
class HTTPRequest(http.Request):
|
class HTTPRequest(http.Request):
|
||||||
@ -155,22 +157,22 @@ class HTTPFlow(flow.Flow):
|
|||||||
def __init__(self, client_conn, server_conn, live=None):
|
def __init__(self, client_conn, server_conn, live=None):
|
||||||
super().__init__("http", client_conn, server_conn, live)
|
super().__init__("http", client_conn, server_conn, live)
|
||||||
|
|
||||||
self.request = None
|
self.request = None # type: HTTPRequest
|
||||||
""" :py:class:`HTTPRequest` object """
|
""" :py:class:`HTTPRequest` object """
|
||||||
self.response = None
|
self.response = None # type: HTTPResponse
|
||||||
""" :py:class:`HTTPResponse` object """
|
""" :py:class:`HTTPResponse` object """
|
||||||
self.error = None
|
self.error = None # type: flow.Error
|
||||||
""" :py:class:`Error` object
|
""" :py:class:`Error` object
|
||||||
|
|
||||||
Note that it's possible for a Flow to have both a response and an error
|
Note that it's possible for a Flow to have both a response and an error
|
||||||
object. This might happen, for instance, when a response was received
|
object. This might happen, for instance, when a response was received
|
||||||
from the server, but there was an error sending it back to the client.
|
from the server, but there was an error sending it back to the client.
|
||||||
"""
|
"""
|
||||||
self.server_conn = server_conn
|
self.server_conn = server_conn # type: connections.ServerConnection
|
||||||
""" :py:class:`ServerConnection` object """
|
""" :py:class:`ServerConnection` object """
|
||||||
self.client_conn = client_conn
|
self.client_conn = client_conn # type: connections.ClientConnection
|
||||||
""":py:class:`ClientConnection` object """
|
""":py:class:`ClientConnection` object """
|
||||||
self.intercepted = False
|
self.intercepted = False # type: bool
|
||||||
""" Is this flow currently being intercepted? """
|
""" Is this flow currently being intercepted? """
|
||||||
|
|
||||||
_stateobject_attributes = flow.Flow._stateobject_attributes.copy()
|
_stateobject_attributes = flow.Flow._stateobject_attributes.copy()
|
||||||
|
@ -1,22 +1,9 @@
|
|||||||
import requests
|
import requests
|
||||||
|
|
||||||
url = 'http://address/path'
|
response = requests.get(
|
||||||
|
'http://address:22/path',
|
||||||
headers = {
|
params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')],
|
||||||
'header': 'qvalue',
|
headers={'header': 'qvalue'}
|
||||||
'content-length': '7',
|
|
||||||
}
|
|
||||||
|
|
||||||
params = {
|
|
||||||
'a': ['foo', 'bar'],
|
|
||||||
'b': 'baz',
|
|
||||||
}
|
|
||||||
|
|
||||||
response = requests.request(
|
|
||||||
method='GET',
|
|
||||||
url=url,
|
|
||||||
headers=headers,
|
|
||||||
params=params,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print(response.text)
|
print(response.text)
|
@ -1,24 +1,10 @@
|
|||||||
import requests
|
import requests
|
||||||
|
|
||||||
url = 'http://address/path'
|
response = requests.patch(
|
||||||
|
'http://address:22/path',
|
||||||
headers = {
|
params=[('query', 'param')],
|
||||||
'header': 'qvalue',
|
headers={'header': 'qvalue'},
|
||||||
'content-length': '7',
|
data=b'content'
|
||||||
}
|
|
||||||
|
|
||||||
params = {
|
|
||||||
'query': 'param',
|
|
||||||
}
|
|
||||||
|
|
||||||
data = '''content'''
|
|
||||||
|
|
||||||
response = requests.request(
|
|
||||||
method='PATCH',
|
|
||||||
url=url,
|
|
||||||
headers=headers,
|
|
||||||
params=params,
|
|
||||||
data=data,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print(response.text)
|
print(response.text)
|
@ -1,13 +1,8 @@
|
|||||||
import requests
|
import requests
|
||||||
|
|
||||||
url = 'http://address/path'
|
response = requests.post(
|
||||||
|
'http://address:22/path',
|
||||||
data = '''content'''
|
data=b'content'
|
||||||
|
|
||||||
response = requests.request(
|
|
||||||
method='POST',
|
|
||||||
url=url,
|
|
||||||
data=data,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print(response.text)
|
print(response.text)
|
||||||
|
@ -1,23 +1,9 @@
|
|||||||
import requests
|
import requests
|
||||||
|
|
||||||
url = 'http://address/path'
|
response = requests.post(
|
||||||
|
'http://address:22/path',
|
||||||
headers = {
|
headers={'content-type': 'application/json'},
|
||||||
'content-type': 'application/json',
|
json={'email': 'example@example.com', 'name': 'example'}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
json = {
|
|
||||||
'email': 'example@example.com',
|
|
||||||
'name': 'example',
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
response = requests.request(
|
|
||||||
method='POST',
|
|
||||||
url=url,
|
|
||||||
headers=headers,
|
|
||||||
json=json,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print(response.text)
|
print(response.text)
|
@ -34,17 +34,17 @@ def req_patch():
|
|||||||
class TestExportCurlCommand:
|
class TestExportCurlCommand:
|
||||||
def test_get(self):
|
def test_get(self):
|
||||||
flow = tutils.tflow(req=req_get())
|
flow = tutils.tflow(req=req_get())
|
||||||
result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address/path?a=foo&a=bar&b=baz'"""
|
result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
|
||||||
assert export.curl_command(flow) == result
|
assert export.curl_command(flow) == result
|
||||||
|
|
||||||
def test_post(self):
|
def test_post(self):
|
||||||
flow = tutils.tflow(req=req_post())
|
flow = tutils.tflow(req=req_post())
|
||||||
result = """curl -X POST 'http://address/path' --data-binary 'content'"""
|
result = """curl -X POST 'http://address:22/path' --data-binary 'content'"""
|
||||||
assert export.curl_command(flow) == result
|
assert export.curl_command(flow) == result
|
||||||
|
|
||||||
def test_patch(self):
|
def test_patch(self):
|
||||||
flow = tutils.tflow(req=req_patch())
|
flow = tutils.tflow(req=req_patch())
|
||||||
result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address/path?query=param' --data-binary 'content'"""
|
result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
|
||||||
assert export.curl_command(flow) == result
|
assert export.curl_command(flow) == result
|
||||||
|
|
||||||
|
|
||||||
@ -100,25 +100,6 @@ class TestExportLocustTask:
|
|||||||
python_equals("data/test_flow_export/locust_task_patch.py", export.locust_task(flow))
|
python_equals("data/test_flow_export/locust_task_patch.py", export.locust_task(flow))
|
||||||
|
|
||||||
|
|
||||||
class TestIsJson:
|
|
||||||
def test_empty(self):
|
|
||||||
assert export.is_json(None, None) is False
|
|
||||||
|
|
||||||
def test_json_type(self):
|
|
||||||
headers = Headers(content_type="application/json")
|
|
||||||
assert export.is_json(headers, b"foobar") is False
|
|
||||||
|
|
||||||
def test_valid(self):
|
|
||||||
headers = Headers(content_type="application/foobar")
|
|
||||||
j = export.is_json(headers, b'{"name": "example", "email": "example@example.com"}')
|
|
||||||
assert j is False
|
|
||||||
|
|
||||||
def test_valid2(self):
|
|
||||||
headers = Headers(content_type="application/json")
|
|
||||||
j = export.is_json(headers, b'{"name": "example", "email": "example@example.com"}')
|
|
||||||
assert isinstance(j, dict)
|
|
||||||
|
|
||||||
|
|
||||||
class TestURL:
|
class TestURL:
|
||||||
def test_url(self):
|
def test_url(self):
|
||||||
flow = tutils.tflow()
|
flow = tutils.tflow()
|
||||||
|
Loading…
Reference in New Issue
Block a user