mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
commit
695cc23696
@ -1,4 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
|
import urllib
|
||||||
|
|
||||||
from mitmproxy.test import tutils
|
from mitmproxy.test import tutils
|
||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.test import taddons
|
from mitmproxy.test import taddons
|
||||||
@ -22,8 +24,13 @@ def test_config():
|
|||||||
with taddons.context() as tctx:
|
with taddons.context() as tctx:
|
||||||
fpath = os.path.join(p, "flows")
|
fpath = os.path.join(p, "flows")
|
||||||
tdump(fpath, [tflow.tflow(resp=True)])
|
tdump(fpath, [tflow.tflow(resp=True)])
|
||||||
tctx.configure(s, server_replay = [fpath])
|
tctx.configure(s, server_replay=[fpath])
|
||||||
tutils.raises(exceptions.OptionsError, tctx.configure, s, server_replay = [p])
|
tutils.raises(
|
||||||
|
exceptions.OptionsError,
|
||||||
|
tctx.configure,
|
||||||
|
s,
|
||||||
|
server_replay=[p]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_tick():
|
def test_tick():
|
||||||
@ -246,7 +253,7 @@ def test_ignore_params():
|
|||||||
assert not s._hash(r) == s._hash(r2)
|
assert not s._hash(r) == s._hash(r2)
|
||||||
|
|
||||||
|
|
||||||
def test_ignore_payload_params():
|
def thash(r, r2, setter):
|
||||||
s = serverplayback.ServerPlayback()
|
s = serverplayback.ServerPlayback()
|
||||||
s.configure(
|
s.configure(
|
||||||
options.Options(
|
options.Options(
|
||||||
@ -255,31 +262,59 @@ def test_ignore_payload_params():
|
|||||||
[]
|
[]
|
||||||
)
|
)
|
||||||
|
|
||||||
r = tflow.tflow(resp=True)
|
setter(r, paramx="x", param1="1")
|
||||||
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
|
||||||
r.request.content = b"paramx=x¶m1=1"
|
setter(r2, paramx="x", param1="1")
|
||||||
r2 = tflow.tflow(resp=True)
|
|
||||||
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
|
||||||
r2.request.content = b"paramx=x¶m1=1"
|
|
||||||
# same parameters
|
# same parameters
|
||||||
assert s._hash(r) == s._hash(r2)
|
assert s._hash(r) == s._hash(r2)
|
||||||
# ignored parameters !=
|
# ignored parameters !=
|
||||||
r2.request.content = b"paramx=x¶m1=2"
|
setter(r2, paramx="x", param1="2")
|
||||||
assert s._hash(r) == s._hash(r2)
|
assert s._hash(r) == s._hash(r2)
|
||||||
# missing parameter
|
# missing parameter
|
||||||
r2.request.content = b"paramx=x"
|
setter(r2, paramx="x")
|
||||||
assert s._hash(r) == s._hash(r2)
|
assert s._hash(r) == s._hash(r2)
|
||||||
# ignorable parameter added
|
# ignorable parameter added
|
||||||
r2.request.content = b"paramx=x¶m1=2"
|
setter(r2, paramx="x", param1="2")
|
||||||
assert s._hash(r) == s._hash(r2)
|
assert s._hash(r) == s._hash(r2)
|
||||||
# not ignorable parameter changed
|
# not ignorable parameter changed
|
||||||
r2.request.content = b"paramx=y¶m1=1"
|
setter(r2, paramx="y", param1="1")
|
||||||
assert not s._hash(r) == s._hash(r2)
|
assert not s._hash(r) == s._hash(r2)
|
||||||
# not ignorable parameter missing
|
# not ignorable parameter missing
|
||||||
|
setter(r2, param1="1")
|
||||||
r2.request.content = b"param1=1"
|
r2.request.content = b"param1=1"
|
||||||
assert not s._hash(r) == s._hash(r2)
|
assert not s._hash(r) == s._hash(r2)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ignore_payload_params():
|
||||||
|
def urlencode_setter(r, **kwargs):
|
||||||
|
r.request.content = urllib.parse.urlencode(kwargs).encode()
|
||||||
|
|
||||||
|
r = tflow.tflow(resp=True)
|
||||||
|
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||||
|
r2 = tflow.tflow(resp=True)
|
||||||
|
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||||
|
thash(r, r2, urlencode_setter)
|
||||||
|
|
||||||
|
boundary = 'somefancyboundary'
|
||||||
|
|
||||||
|
def multipart_setter(r, **kwargs):
|
||||||
|
b = "--{0}\n".format(boundary)
|
||||||
|
parts = []
|
||||||
|
for k, v in kwargs.items():
|
||||||
|
parts.append(
|
||||||
|
"Content-Disposition: form-data; name=\"%s\"\n\n"
|
||||||
|
"%s\n" % (k, v)
|
||||||
|
)
|
||||||
|
c = b + b.join(parts) + b
|
||||||
|
r.request.content = c.encode()
|
||||||
|
r.request.headers["content-type"] = 'multipart/form-data; boundary=' +\
|
||||||
|
boundary
|
||||||
|
|
||||||
|
r = tflow.tflow(resp=True)
|
||||||
|
r2 = tflow.tflow(resp=True)
|
||||||
|
thash(r, r2, multipart_setter)
|
||||||
|
|
||||||
|
|
||||||
def test_server_playback_full():
|
def test_server_playback_full():
|
||||||
s = serverplayback.ServerPlayback()
|
s = serverplayback.ServerPlayback()
|
||||||
with taddons.context() as tctx:
|
with taddons.context() as tctx:
|
||||||
|
@ -10,7 +10,15 @@ def test_configure():
|
|||||||
r = stickyauth.StickyAuth()
|
r = stickyauth.StickyAuth()
|
||||||
with taddons.context() as tctx:
|
with taddons.context() as tctx:
|
||||||
tctx.configure(r, stickyauth="~s")
|
tctx.configure(r, stickyauth="~s")
|
||||||
tutils.raises(exceptions.OptionsError, tctx.configure, r, stickyauth="~~")
|
tutils.raises(
|
||||||
|
exceptions.OptionsError,
|
||||||
|
tctx.configure,
|
||||||
|
r,
|
||||||
|
stickyauth="~~"
|
||||||
|
)
|
||||||
|
|
||||||
|
tctx.configure(r, stickyauth=None)
|
||||||
|
assert not r.flt
|
||||||
|
|
||||||
|
|
||||||
def test_simple():
|
def test_simple():
|
||||||
|
@ -3,7 +3,6 @@ from mitmproxy.test import tutils
|
|||||||
from mitmproxy.test import taddons
|
from mitmproxy.test import taddons
|
||||||
|
|
||||||
from mitmproxy.addons import stickycookie
|
from mitmproxy.addons import stickycookie
|
||||||
from mitmproxy import options
|
|
||||||
from mitmproxy.test import tutils as ntutils
|
from mitmproxy.test import tutils as ntutils
|
||||||
|
|
||||||
|
|
||||||
@ -15,11 +14,15 @@ def test_domain_match():
|
|||||||
class TestStickyCookie:
|
class TestStickyCookie:
|
||||||
def test_config(self):
|
def test_config(self):
|
||||||
sc = stickycookie.StickyCookie()
|
sc = stickycookie.StickyCookie()
|
||||||
o = options.Options(stickycookie = "~b")
|
with taddons.context() as tctx:
|
||||||
tutils.raises(
|
tutils.raises(
|
||||||
"invalid filter",
|
"invalid filter", tctx.configure, sc, stickycookie="~b"
|
||||||
sc.configure, o, o.keys()
|
)
|
||||||
)
|
|
||||||
|
tctx.configure(sc, stickycookie="foo")
|
||||||
|
assert sc.flt
|
||||||
|
tctx.configure(sc, stickycookie=None)
|
||||||
|
assert not sc.flt
|
||||||
|
|
||||||
def test_simple(self):
|
def test_simple(self):
|
||||||
sc = stickycookie.StickyCookie()
|
sc = stickycookie.StickyCookie()
|
||||||
|
@ -22,6 +22,10 @@ def test_configure():
|
|||||||
"invalid filter",
|
"invalid filter",
|
||||||
tctx.configure, sa, streamfile=p, filtstr="~~"
|
tctx.configure, sa, streamfile=p, filtstr="~~"
|
||||||
)
|
)
|
||||||
|
tctx.configure(sa, filtstr="foo")
|
||||||
|
assert sa.filt
|
||||||
|
tctx.configure(sa, filtstr=None)
|
||||||
|
assert not sa.filt
|
||||||
|
|
||||||
|
|
||||||
def rd(p):
|
def rd(p):
|
||||||
|
@ -73,12 +73,15 @@ def test_simple():
|
|||||||
assert v.store_count() == 0
|
assert v.store_count() == 0
|
||||||
v.request(f)
|
v.request(f)
|
||||||
assert list(v) == [f]
|
assert list(v) == [f]
|
||||||
|
assert v.get_by_id(f.id)
|
||||||
|
assert not v.get_by_id("nonexistent")
|
||||||
|
|
||||||
# These all just call udpate
|
# These all just call udpate
|
||||||
v.error(f)
|
v.error(f)
|
||||||
v.response(f)
|
v.response(f)
|
||||||
v.intercept(f)
|
v.intercept(f)
|
||||||
v.resume(f)
|
v.resume(f)
|
||||||
|
v.kill(f)
|
||||||
assert list(v) == [f]
|
assert list(v) == [f]
|
||||||
|
|
||||||
v.request(f)
|
v.request(f)
|
||||||
|
Loading…
Reference in New Issue
Block a user