Merge pull request #4177 from mhils/intercept-taken

Intercept: don't fail on requests taken by other addons
This commit is contained in:
Maximilian Hils 2020-08-27 12:09:50 +02:00 committed by GitHub
commit 72dd567994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 25 deletions

View File

@ -1,20 +1,18 @@
import typing import typing
from mitmproxy import flowfilter from mitmproxy import flow, flowfilter
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import ctx from mitmproxy import ctx
class Intercept: class Intercept:
def __init__(self): filt: typing.Optional[flowfilter.TFilter] = None
self.filt = None
def load(self, loader): def load(self, loader):
loader.add_option( loader.add_option(
"intercept_active", bool, False, "intercept_active", bool, False,
"Intercept toggle" "Intercept toggle"
) )
loader.add_option( loader.add_option(
"intercept", typing.Optional[str], None, "intercept", typing.Optional[str], None,
"Intercept filter expression." "Intercept filter expression."
@ -22,24 +20,28 @@ class Intercept:
def configure(self, updated): def configure(self, updated):
if "intercept" in updated: if "intercept" in updated:
if not ctx.options.intercept: if ctx.options.intercept:
self.filt = None
ctx.options.intercept_active = False
return
self.filt = flowfilter.parse(ctx.options.intercept) self.filt = flowfilter.parse(ctx.options.intercept)
if not self.filt: if not self.filt:
raise exceptions.OptionsError( raise exceptions.OptionsError(f"Invalid interception filter: {ctx.options.intercept}")
"Invalid interception filter: %s" % ctx.options.intercept
)
ctx.options.intercept_active = True ctx.options.intercept_active = True
else:
self.filt = None
ctx.options.intercept_active = False
def process_flow(self, f): def should_intercept(self, f: flow.Flow) -> bool:
if self.filt: return bool(
should_intercept = all([ ctx.options.intercept_active
self.filt(f), and self.filt
not f.is_replay == "request", and self.filt(f)
]) and not f.is_replay
if should_intercept and ctx.options.intercept_active: )
def process_flow(self, f: flow.Flow) -> None:
if self.should_intercept(f):
assert f.reply
if f.reply.state != "start":
return ctx.log.debug("Cannot intercept request that is already taken by another addon.")
f.intercept() f.intercept()
# Handlers # Handlers
@ -51,5 +53,4 @@ class Intercept:
self.process_flow(f) self.process_flow(f)
def tcp_message(self, f): def tcp_message(self, f):
if self.filt and ctx.options.intercept_active: self.process_flow(f)
f.intercept()

View File

@ -43,12 +43,31 @@ def test_simple():
tctx.cycle(r, f) tctx.cycle(r, f)
assert f.intercepted assert f.intercepted
def test_tcp():
r = intercept.Intercept()
with taddons.context(r) as tctx:
tctx.configure(r, intercept="~tcp")
f = tflow.ttcpflow()
tctx.cycle(r, f)
assert f.intercepted
tctx.configure(r, intercept_active=False) tctx.configure(r, intercept_active=False)
f = tflow.ttcpflow() f = tflow.ttcpflow()
tctx.cycle(r, f) tctx.cycle(r, f)
assert not f.intercepted assert not f.intercepted
tctx.configure(r, intercept_active=True)
f = tflow.ttcpflow() def test_already_taken():
tctx.cycle(r, f) r = intercept.Intercept()
with taddons.context(r) as tctx:
tctx.configure(r, intercept="~q")
f = tflow.tflow()
tctx.invoke(r, "request", f)
assert f.intercepted assert f.intercepted
f = tflow.tflow()
f.reply.take()
tctx.invoke(r, "request", f)
assert not f.intercepted