mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
Merge pull request #3448 from cript0nauta/master
Fix command injection vulnerability when exporting to curl or httpie
This commit is contained in:
commit
5c0be1de4a
@ -1,56 +1,73 @@
|
|||||||
|
import shlex
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from mitmproxy import ctx
|
|
||||||
from mitmproxy import command
|
|
||||||
from mitmproxy import flow
|
|
||||||
from mitmproxy import exceptions
|
|
||||||
from mitmproxy.utils import strutils
|
|
||||||
from mitmproxy.net.http.http1 import assemble
|
|
||||||
import mitmproxy.types
|
|
||||||
|
|
||||||
import pyperclip
|
import pyperclip
|
||||||
|
|
||||||
|
import mitmproxy.types
|
||||||
|
from mitmproxy import command
|
||||||
|
from mitmproxy import ctx, http
|
||||||
|
from mitmproxy import exceptions
|
||||||
|
from mitmproxy import flow
|
||||||
|
from mitmproxy.net.http.http1 import assemble
|
||||||
|
from mitmproxy.utils import strutils
|
||||||
|
|
||||||
def cleanup_request(f: flow.Flow):
|
|
||||||
|
def cleanup_request(f: flow.Flow) -> http.HTTPRequest:
|
||||||
if not hasattr(f, "request"):
|
if not hasattr(f, "request"):
|
||||||
raise exceptions.CommandError("Can't export flow with no request.")
|
raise exceptions.CommandError("Can't export flow with no request.")
|
||||||
request = f.request.copy() # type: ignore
|
assert isinstance(f, http.HTTPFlow)
|
||||||
|
request = f.request.copy()
|
||||||
request.decode(strict=False)
|
request.decode(strict=False)
|
||||||
# a bit of clean-up
|
# a bit of clean-up - these headers should be automatically set by curl/httpie
|
||||||
if request.method == 'GET' and request.headers.get("content-length", None) == "0":
|
|
||||||
request.headers.pop('content-length')
|
request.headers.pop('content-length')
|
||||||
request.headers.pop(':authority', None)
|
if request.headers.get("host", "") == request.host:
|
||||||
|
request.headers.pop("host")
|
||||||
|
if request.headers.get(":authority", "") == request.host:
|
||||||
|
request.headers.pop(":authority")
|
||||||
return request
|
return request
|
||||||
|
|
||||||
|
|
||||||
def curl_command(f: flow.Flow) -> str:
|
def request_content_for_console(request: http.HTTPRequest) -> str:
|
||||||
data = "curl "
|
try:
|
||||||
request = cleanup_request(f)
|
text = request.get_text(strict=True)
|
||||||
for k, v in request.headers.items(multi=True):
|
assert text
|
||||||
data += "--compressed " if k == 'accept-encoding' else ""
|
except ValueError:
|
||||||
data += "-H '%s:%s' " % (k, v)
|
# shlex.quote doesn't support a bytes object
|
||||||
if request.method != "GET":
|
# see https://github.com/python/cpython/pull/10871
|
||||||
data += "-X %s " % request.method
|
raise exceptions.CommandError("Request content must be valid unicode")
|
||||||
data += "'%s'" % request.url
|
escape_control_chars = {chr(i): f"\\x{i:02x}" for i in range(32)}
|
||||||
if request.content:
|
return "".join(
|
||||||
data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
|
escape_control_chars.get(x, x)
|
||||||
request.content,
|
for x in text
|
||||||
escape_single_quotes=True
|
|
||||||
)
|
)
|
||||||
return data
|
|
||||||
|
|
||||||
|
def curl_command(f: flow.Flow) -> str:
|
||||||
|
request = cleanup_request(f)
|
||||||
|
args = ["curl"]
|
||||||
|
for k, v in request.headers.items(multi=True):
|
||||||
|
if k.lower() == "accept-encoding":
|
||||||
|
args.append("--compressed")
|
||||||
|
else:
|
||||||
|
args += ["-H", f"{k}: {v}"]
|
||||||
|
|
||||||
|
if request.method != "GET":
|
||||||
|
args += ["-X", request.method]
|
||||||
|
args.append(request.url)
|
||||||
|
if request.content:
|
||||||
|
args += ["-d", request_content_for_console(request)]
|
||||||
|
return ' '.join(shlex.quote(arg) for arg in args)
|
||||||
|
|
||||||
|
|
||||||
def httpie_command(f: flow.Flow) -> str:
|
def httpie_command(f: flow.Flow) -> str:
|
||||||
request = cleanup_request(f)
|
request = cleanup_request(f)
|
||||||
data = "http %s %s" % (request.method, request.url)
|
args = ["http", request.method, request.url]
|
||||||
for k, v in request.headers.items(multi=True):
|
for k, v in request.headers.items(multi=True):
|
||||||
data += " '%s:%s'" % (k, v)
|
args.append(f"{k}: {v}")
|
||||||
|
cmd = ' '.join(shlex.quote(arg) for arg in args)
|
||||||
if request.content:
|
if request.content:
|
||||||
data += " <<< '%s'" % strutils.bytes_to_escaped_str(
|
cmd += " <<< " + shlex.quote(request_content_for_console(request))
|
||||||
request.content,
|
return cmd
|
||||||
escape_single_quotes=True
|
|
||||||
)
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
def raw(f: flow.Flow) -> bytes:
|
def raw(f: flow.Flow) -> bytes:
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
|
import shlex
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import pyperclip
|
import pyperclip
|
||||||
@ -45,39 +46,83 @@ class TestExportCurlCommand:
|
|||||||
assert export.curl_command(get_request) == result
|
assert export.curl_command(get_request) == result
|
||||||
|
|
||||||
def test_post(self, post_request):
|
def test_post(self, post_request):
|
||||||
result = "curl -H 'content-length:256' -X POST 'http://address:22/path' --data-binary '{}'".format(
|
post_request.request.content = b'nobinarysupport'
|
||||||
str(bytes(range(256)))[2:-1]
|
result = "curl -X POST http://address:22/path -d nobinarysupport"
|
||||||
)
|
|
||||||
assert export.curl_command(post_request) == result
|
assert export.curl_command(post_request) == result
|
||||||
|
|
||||||
|
def test_fails_with_binary_data(self, post_request):
|
||||||
|
# shlex.quote doesn't support a bytes object
|
||||||
|
# see https://github.com/python/cpython/pull/10871
|
||||||
|
post_request.request.headers["Content-Type"] = "application/json; charset=utf-8"
|
||||||
|
with pytest.raises(exceptions.CommandError):
|
||||||
|
export.curl_command(post_request)
|
||||||
|
|
||||||
def test_patch(self, patch_request):
|
def test_patch(self, patch_request):
|
||||||
result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
|
result = """curl -H 'header: qvalue' -X PATCH 'http://address:22/path?query=param' -d content"""
|
||||||
assert export.curl_command(patch_request) == result
|
assert export.curl_command(patch_request) == result
|
||||||
|
|
||||||
def test_tcp(self, tcp_flow):
|
def test_tcp(self, tcp_flow):
|
||||||
with pytest.raises(exceptions.CommandError):
|
with pytest.raises(exceptions.CommandError):
|
||||||
export.curl_command(tcp_flow)
|
export.curl_command(tcp_flow)
|
||||||
|
|
||||||
|
def test_escape_single_quotes_in_body(self):
|
||||||
|
request = tflow.tflow(
|
||||||
|
req=tutils.treq(
|
||||||
|
method=b'POST',
|
||||||
|
headers=(),
|
||||||
|
content=b"'&#"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
command = export.curl_command(request)
|
||||||
|
assert shlex.split(command)[-2] == '-d'
|
||||||
|
assert shlex.split(command)[-1] == "'&#"
|
||||||
|
|
||||||
|
def test_strip_unnecessary(self, get_request):
|
||||||
|
get_request.request.headers.clear()
|
||||||
|
get_request.request.headers["host"] = "address"
|
||||||
|
get_request.request.headers[":authority"] = "address"
|
||||||
|
get_request.request.headers["accept-encoding"] = "br"
|
||||||
|
result = """curl --compressed 'http://address:22/path?a=foo&a=bar&b=baz'"""
|
||||||
|
assert export.curl_command(get_request) == result
|
||||||
|
|
||||||
|
|
||||||
class TestExportHttpieCommand:
|
class TestExportHttpieCommand:
|
||||||
def test_get(self, get_request):
|
def test_get(self, get_request):
|
||||||
result = """http GET http://address:22/path?a=foo&a=bar&b=baz 'header:qvalue'"""
|
result = """http GET 'http://address:22/path?a=foo&a=bar&b=baz' 'header: qvalue'"""
|
||||||
assert export.httpie_command(get_request) == result
|
assert export.httpie_command(get_request) == result
|
||||||
|
|
||||||
def test_post(self, post_request):
|
def test_post(self, post_request):
|
||||||
result = "http POST http://address:22/path 'content-length:256' <<< '{}'".format(
|
post_request.request.content = b'nobinarysupport'
|
||||||
str(bytes(range(256)))[2:-1]
|
result = "http POST http://address:22/path <<< nobinarysupport"
|
||||||
)
|
|
||||||
assert export.httpie_command(post_request) == result
|
assert export.httpie_command(post_request) == result
|
||||||
|
|
||||||
|
def test_fails_with_binary_data(self, post_request):
|
||||||
|
# shlex.quote doesn't support a bytes object
|
||||||
|
# see https://github.com/python/cpython/pull/10871
|
||||||
|
post_request.request.headers["Content-Type"] = "application/json; charset=utf-8"
|
||||||
|
with pytest.raises(exceptions.CommandError):
|
||||||
|
export.httpie_command(post_request)
|
||||||
|
|
||||||
def test_patch(self, patch_request):
|
def test_patch(self, patch_request):
|
||||||
result = """http PATCH http://address:22/path?query=param 'header:qvalue' 'content-length:7' <<< 'content'"""
|
result = """http PATCH 'http://address:22/path?query=param' 'header: qvalue' <<< content"""
|
||||||
assert export.httpie_command(patch_request) == result
|
assert export.httpie_command(patch_request) == result
|
||||||
|
|
||||||
def test_tcp(self, tcp_flow):
|
def test_tcp(self, tcp_flow):
|
||||||
with pytest.raises(exceptions.CommandError):
|
with pytest.raises(exceptions.CommandError):
|
||||||
export.httpie_command(tcp_flow)
|
export.httpie_command(tcp_flow)
|
||||||
|
|
||||||
|
def test_escape_single_quotes_in_body(self):
|
||||||
|
request = tflow.tflow(
|
||||||
|
req=tutils.treq(
|
||||||
|
method=b'POST',
|
||||||
|
headers=(),
|
||||||
|
content=b"'&#"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
command = export.httpie_command(request)
|
||||||
|
assert shlex.split(command)[-2] == '<<<'
|
||||||
|
assert shlex.split(command)[-1] == "'&#"
|
||||||
|
|
||||||
|
|
||||||
class TestRaw:
|
class TestRaw:
|
||||||
def test_get(self, get_request):
|
def test_get(self, get_request):
|
||||||
|
Loading…
Reference in New Issue
Block a user