intercept: don't fail on requests taken by other addons

This commit is contained in:
Maximilian Hils 2020-08-27 11:07:04 +02:00
parent 0203202b89
commit 28f04c93fd

View File

@ -1,20 +1,18 @@
import typing import typing
from mitmproxy import flowfilter from mitmproxy import flow, flowfilter
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import ctx from mitmproxy import ctx
class Intercept: class Intercept:
def __init__(self): filt: typing.Optional[flowfilter.TFilter] = None
self.filt = None
def load(self, loader): def load(self, loader):
loader.add_option( loader.add_option(
"intercept_active", bool, False, "intercept_active", bool, False,
"Intercept toggle" "Intercept toggle"
) )
loader.add_option( loader.add_option(
"intercept", typing.Optional[str], None, "intercept", typing.Optional[str], None,
"Intercept filter expression." "Intercept filter expression."
@ -22,24 +20,27 @@ class Intercept:
def configure(self, updated): def configure(self, updated):
if "intercept" in updated: if "intercept" in updated:
if not ctx.options.intercept: if ctx.options.intercept:
self.filt = None
ctx.options.intercept_active = False
return
self.filt = flowfilter.parse(ctx.options.intercept) self.filt = flowfilter.parse(ctx.options.intercept)
if not self.filt: if not self.filt:
raise exceptions.OptionsError( raise exceptions.OptionsError(f"Invalid interception filter: {ctx.options.intercept}")
"Invalid interception filter: %s" % ctx.options.intercept
)
ctx.options.intercept_active = True ctx.options.intercept_active = True
else:
self.filt = None
ctx.options.intercept_active = False
def process_flow(self, f): def should_intercept(self, f: flow.Flow) -> bool:
if self.filt: return (
should_intercept = all([ ctx.options.intercept_active
self.filt(f), and self.filt
not f.is_replay == "request", and self.filt(f)
]) and not f.is_replay
if should_intercept and ctx.options.intercept_active: )
def process_flow(self, f: flow.Flow) -> None:
if self.should_intercept(f):
if f.reply.state != "start":
return ctx.log.debug("Cannot intercept request that is already taken by another addon.")
f.intercept() f.intercept()
# Handlers # Handlers
@ -51,5 +52,4 @@ class Intercept:
self.process_flow(f) self.process_flow(f)
def tcp_message(self, f): def tcp_message(self, f):
if self.filt and ctx.options.intercept_active: self.process_flow(f)
f.intercept()