mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
make load_script pure
This commit is contained in:
parent
9ffd42edea
commit
3b8e3e4aa9
@ -3,6 +3,7 @@ import importlib.util
|
||||
import importlib.machinery
|
||||
import time
|
||||
import sys
|
||||
import types
|
||||
import typing
|
||||
|
||||
from mitmproxy import addonmanager
|
||||
@ -13,27 +14,22 @@ from mitmproxy import eventsequence
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
def load_script(actx, path):
|
||||
if not os.path.exists(path):
|
||||
ctx.log.info("No such file: %s" % path)
|
||||
return
|
||||
|
||||
def load_script(path: str) -> types.ModuleType:
|
||||
fullname = "__mitmproxy_script__.{}".format(
|
||||
os.path.splitext(os.path.basename(path))[0]
|
||||
)
|
||||
# the fullname is not unique among scripts, so if there already is an existing script with said
|
||||
# fullname, remove it.
|
||||
sys.modules.pop(fullname, None)
|
||||
try:
|
||||
oldpath = sys.path
|
||||
sys.path.insert(0, os.path.dirname(path))
|
||||
with addonmanager.safecall():
|
||||
try:
|
||||
loader = importlib.machinery.SourceFileLoader(fullname, path)
|
||||
spec = importlib.util.spec_from_loader(fullname, loader=loader)
|
||||
m = importlib.util.module_from_spec(spec)
|
||||
loader.exec_module(m)
|
||||
if not getattr(m, "name", None):
|
||||
m.name = path
|
||||
m.name = path # type: ignore
|
||||
return m
|
||||
finally:
|
||||
sys.path[:] = oldpath
|
||||
@ -65,7 +61,7 @@ class Script:
|
||||
try:
|
||||
mtime = os.stat(self.fullpath).st_mtime
|
||||
except FileNotFoundError:
|
||||
scripts = ctx.options.scripts
|
||||
scripts = list(ctx.options.scripts)
|
||||
scripts.remove(self.path)
|
||||
ctx.options.update(scripts=scripts)
|
||||
return
|
||||
@ -74,7 +70,9 @@ class Script:
|
||||
ctx.log.info("Loading script: %s" % self.path)
|
||||
if self.ns:
|
||||
ctx.master.addons.remove(self.ns)
|
||||
self.ns = load_script(ctx, self.fullpath)
|
||||
self.ns = None
|
||||
with addonmanager.safecall():
|
||||
self.ns = load_script(self.fullpath)
|
||||
if self.ns:
|
||||
# We're already running, so we have to explicitly register and
|
||||
# configure the addon
|
||||
|
@ -1,6 +1,5 @@
|
||||
import traceback
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import pytest
|
||||
|
||||
@ -14,20 +13,17 @@ from mitmproxy.addons import script
|
||||
|
||||
|
||||
def test_load_script():
|
||||
with taddons.context() as tctx:
|
||||
ns = script.load_script(
|
||||
tctx.ctx(),
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/recorder/recorder.py"
|
||||
)
|
||||
)
|
||||
assert ns.addons
|
||||
|
||||
ns = script.load_script(
|
||||
tctx.ctx(),
|
||||
with pytest.raises(FileNotFoundError):
|
||||
script.load_script(
|
||||
"nonexistent"
|
||||
)
|
||||
assert not ns
|
||||
|
||||
|
||||
def test_load_fullname():
|
||||
@ -36,16 +32,13 @@ def test_load_fullname():
|
||||
This only succeeds if they get assigned different basenames.
|
||||
|
||||
"""
|
||||
with taddons.context() as tctx:
|
||||
ns = script.load_script(
|
||||
tctx.ctx(),
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/addon.py"
|
||||
)
|
||||
)
|
||||
assert ns.addons
|
||||
ns2 = script.load_script(
|
||||
tctx.ctx(),
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/same_filename/addon.py"
|
||||
)
|
||||
@ -59,7 +52,6 @@ def test_script_print_stdout():
|
||||
with mock.patch('mitmproxy.ctx.log.warn') as mock_warn:
|
||||
with addonmanager.safecall():
|
||||
ns = script.load_script(
|
||||
tctx.ctx(),
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/print.py"
|
||||
)
|
||||
@ -103,11 +95,13 @@ class TestScript:
|
||||
sc = script.Script(str(f))
|
||||
tctx.configure(sc)
|
||||
sc.tick()
|
||||
for _ in range(3):
|
||||
assert tctx.master.has_log("Loading")
|
||||
tctx.master.clear()
|
||||
assert not tctx.master.has_log("Loading")
|
||||
|
||||
sc.last_load, sc.last_mtime = 0, 0
|
||||
sc.tick()
|
||||
time.sleep(0.1)
|
||||
tctx.master.has_log("Loading")
|
||||
assert tctx.master.has_log("Loading")
|
||||
|
||||
def test_exception(self):
|
||||
with taddons.context() as tctx:
|
||||
@ -121,8 +115,8 @@ class TestScript:
|
||||
f = tflow.tflow(resp=True)
|
||||
tctx.master.addons.trigger("request", f)
|
||||
|
||||
tctx.master.has_log("ValueError: Error!")
|
||||
tctx.master.has_log("error.py")
|
||||
assert tctx.master.has_log("ValueError: Error!")
|
||||
assert tctx.master.has_log("error.py")
|
||||
|
||||
def test_addon(self):
|
||||
with taddons.context() as tctx:
|
||||
|
@ -115,7 +115,7 @@ def test_simple():
|
||||
a.add(TAddon("one"))
|
||||
a.trigger("done")
|
||||
a.trigger("tick")
|
||||
tctx.master.has_log("not callable")
|
||||
assert tctx.master.has_log("not callable")
|
||||
|
||||
a.remove(a.get("one"))
|
||||
assert not a.get("one")
|
||||
|
Loading…
Reference in New Issue
Block a user