mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
Use ~bq / ~bs for messages sent from client / server
This commit is contained in:
parent
5f7d61f864
commit
262a420553
@ -219,18 +219,14 @@ class FBod(_Rex):
|
|||||||
|
|
||||||
@only(HTTPFlow, TCPFlow)
|
@only(HTTPFlow, TCPFlow)
|
||||||
def __call__(self, f):
|
def __call__(self, f):
|
||||||
|
if isinstance(f, HTTPFlow):
|
||||||
# HTTPFlow
|
|
||||||
if hasattr(f, 'request'):
|
|
||||||
if f.request and f.request.content:
|
if f.request and f.request.content:
|
||||||
if self.re.search(f.request.get_decoded_content()):
|
if self.re.search(f.request.get_decoded_content()):
|
||||||
return True
|
return True
|
||||||
if f.response and f.response.content:
|
if f.response and f.response.content:
|
||||||
if self.re.search(f.response.get_decoded_content()):
|
if self.re.search(f.response.get_decoded_content()):
|
||||||
return True
|
return True
|
||||||
|
elif isinstance(f, TCPFlow):
|
||||||
# TCPFlow
|
|
||||||
elif hasattr(f, 'messages'):
|
|
||||||
for msg in f.messages:
|
for msg in f.messages:
|
||||||
if self.re.search(msg.content):
|
if self.re.search(msg.content):
|
||||||
return True
|
return True
|
||||||
@ -242,22 +238,32 @@ class FBodRequest(_Rex):
|
|||||||
code = "bq"
|
code = "bq"
|
||||||
help = "Request body"
|
help = "Request body"
|
||||||
|
|
||||||
@only(HTTPFlow)
|
@only(HTTPFlow, TCPFlow)
|
||||||
def __call__(self, f):
|
def __call__(self, f):
|
||||||
if f.request and f.request.content:
|
if isinstance(f, HTTPFlow):
|
||||||
if self.re.search(f.request.get_decoded_content()):
|
if f.request and f.request.content:
|
||||||
return True
|
if self.re.search(f.request.get_decoded_content()):
|
||||||
|
return True
|
||||||
|
elif isinstance(f, TCPFlow):
|
||||||
|
for msg in f.messages:
|
||||||
|
if msg.from_client and self.re.search(msg.content):
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class FBodResponse(_Rex):
|
class FBodResponse(_Rex):
|
||||||
code = "bs"
|
code = "bs"
|
||||||
help = "Response body"
|
help = "Response body"
|
||||||
|
|
||||||
@only(HTTPFlow)
|
@only(HTTPFlow, TCPFlow)
|
||||||
def __call__(self, f):
|
def __call__(self, f):
|
||||||
if f.response and f.response.content:
|
if isinstance(f, HTTPFlow):
|
||||||
if self.re.search(f.response.get_decoded_content()):
|
if f.response and f.response.content:
|
||||||
return True
|
if self.re.search(f.response.get_decoded_content()):
|
||||||
|
return True
|
||||||
|
elif isinstance(f, TCPFlow):
|
||||||
|
for msg in f.messages:
|
||||||
|
if not msg.from_client and self.re.search(msg.content):
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class FMethod(_Rex):
|
class FMethod(_Rex):
|
||||||
|
@ -264,15 +264,21 @@ class TestMatchingTCPFlow:
|
|||||||
|
|
||||||
def test_body(self):
|
def test_body(self):
|
||||||
f = self.flow()
|
f = self.flow()
|
||||||
assert not self.q("~b nonexistent", f)
|
|
||||||
|
# Messages sent by client or server
|
||||||
assert self.q("~b hello", f)
|
assert self.q("~b hello", f)
|
||||||
assert self.q("~b me", f)
|
assert self.q("~b me", f)
|
||||||
|
assert not self.q("~b nonexistent", f)
|
||||||
|
|
||||||
# Request Body
|
# Messages sent by client
|
||||||
assert not self.q("~bq whatever", f)
|
assert self.q("~bq hello", f)
|
||||||
|
assert not self.q("~bq me", f)
|
||||||
|
assert not self.q("~bq nonexistent", f)
|
||||||
|
|
||||||
# Response Body
|
# Messages sent by server
|
||||||
assert not self.q("~bs whatever", f)
|
assert self.q("~bs me", f)
|
||||||
|
assert not self.q("~bs hello", f)
|
||||||
|
assert not self.q("~bs nonexistent", f)
|
||||||
|
|
||||||
def test_src(self):
|
def test_src(self):
|
||||||
f = self.flow()
|
f = self.flow()
|
||||||
|
Loading…
Reference in New Issue
Block a user