From 605950bfdf221854b2b0fc4b875777891d90f34b Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sun, 15 Dec 2013 02:43:16 +0100 Subject: [PATCH] add @concurrent decorator for inline scripts, fixes #176 --- doc-src/scripting/inlinescripts.html | 7 +++++ examples/nonblocking.py | 8 +++++ libmproxy/script.py | 26 +++++++++++++++- test/scripts/concurrent_decorator.py | 31 +++++++++++++++++++ test/test_script.py | 46 ++++++++++++++++++++++++++++ test/tutils.py | 19 +++++++----- 6 files changed, 128 insertions(+), 9 deletions(-) create mode 100644 examples/nonblocking.py create mode 100644 test/scripts/concurrent_decorator.py diff --git a/doc-src/scripting/inlinescripts.html b/doc-src/scripting/inlinescripts.html index c9e188fcb..19ae89a1e 100644 --- a/doc-src/scripting/inlinescripts.html +++ b/doc-src/scripting/inlinescripts.html @@ -123,6 +123,13 @@ using pydoc (which is installed with Python by default), like this: +## Running scripts in parallel + +We have a single flow primitive, so when a script is handling something, other requests block. +While that's a very desirable behaviour under some circumstances, scripts can be run threaded by using the libmproxy.script.concurrent decorator. + +$!example("examples/nonblocking.py")!$ + ## Running scripts on saved flows Sometimes, we want to run a script on __Flow__ objects that are already diff --git a/examples/nonblocking.py b/examples/nonblocking.py new file mode 100644 index 000000000..ed8cc7c95 --- /dev/null +++ b/examples/nonblocking.py @@ -0,0 +1,8 @@ +import time +from libmproxy.script import concurrent + +@concurrent +def request(context, flow): + print "handle request: %s%s" % (flow.request.host, flow.request.path) + time.sleep(5) + print "start request: %s%s" % (flow.request.host, flow.request.path) \ No newline at end of file diff --git a/libmproxy/script.py b/libmproxy/script.py index 623f2b928..d1b714dbb 100644 --- a/libmproxy/script.py +++ b/libmproxy/script.py @@ -1,4 +1,5 @@ -import os, traceback +import os, traceback, threading +import controller class ScriptError(Exception): pass @@ -59,3 +60,26 @@ class Script: return (False, (v, traceback.format_exc(v))) else: return (False, None) + + +def _handle_concurrent_reply(fn, o, args=[], kwargs={}): + reply = o.reply + o.reply = controller.DummyReply() + + def run(): + fn(*args, **kwargs) + reply(o) + threading.Thread(target=run).start() + + +def concurrent(fn): + if fn.func_name in ["request", "response", "error"]: + def _concurrent(ctx, flow): + r = getattr(flow, fn.func_name) + _handle_concurrent_reply(fn, r, [ctx, flow]) + return _concurrent + elif fn.func_name in ["clientconnect", "clientdisconnect", "serverconnect"]: + def _concurrent(ctx, conn): + _handle_concurrent_reply(fn, conn, [ctx, conn]) + return _concurrent + raise NotImplementedError("Concurrent decorator not supported for this method.") \ No newline at end of file diff --git a/test/scripts/concurrent_decorator.py b/test/scripts/concurrent_decorator.py new file mode 100644 index 000000000..c1c2651e8 --- /dev/null +++ b/test/scripts/concurrent_decorator.py @@ -0,0 +1,31 @@ +import time +from libmproxy.script import concurrent + +@concurrent +def request(context, flow): + time.sleep(0.1) + + +@concurrent +def response(context, flow): + context.log("response") + + +@concurrent +def error(context, err): + context.log("error") + + +@concurrent +def clientconnect(context, cc): + context.log("clientconnect") + + +@concurrent +def clientdisconnect(context, dc): + context.log("clientdisconnect") + + +@concurrent +def serverconnect(context, sc): + context.log("serverconnect") \ No newline at end of file diff --git a/test/test_script.py b/test/test_script.py index 9033c4fca..296ec828b 100644 --- a/test/test_script.py +++ b/test/test_script.py @@ -2,6 +2,19 @@ from libmproxy import script, flow import tutils import shlex import os +import time + + +class TCounter: + count = 0 + + def __call__(self, *args, **kwargs): + self.count += 1 + + +class TScriptContext(TCounter): + def log(self, msg): + self.__call__() class TestScript: def test_simple(self): @@ -64,3 +77,36 @@ class TestScript: s.load ) + def test_concurrent(self): + s = flow.State() + fm = flow.FlowMaster(None, s) + fm.load_script([tutils.test_data.path("scripts/concurrent_decorator.py")]) + + reply = TCounter() + r1, r2 = tutils.treq(), tutils.treq() + r1.reply, r2.reply = reply, reply + t_start = time.time() + fm.handle_request(r1) + r1.reply() + fm.handle_request(r2) + r2.reply() + assert reply.count < 2 + assert (time.time() - t_start) < 0.09 + time.sleep(0.2) + assert reply.count == 2 + + def test_concurrent2(self): + ctx = TScriptContext() + s = script.Script(["scripts/concurrent_decorator.py"], ctx) + s.load() + f = tutils.tflow_full() + f.error = tutils.terr(f.request) + f.reply = f.request.reply + + print s.run("response", f) + print s.run("error", f) + print s.run("clientconnect", f) + print s.run("clientdisconnect", f) + print s.run("serverconnect", f) + time.sleep(0.1) + assert ctx.count == 5 \ No newline at end of file diff --git a/test/tutils.py b/test/tutils.py index e42256edf..4cd7b7f81 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -33,6 +33,13 @@ def tresp(req=None): resp.reply = controller.DummyReply() return resp +def terr(req=None): + if not req: + req = treq() + err = flow.Error(req, "error") + err.reply = controller.DummyReply() + return err + def tflow(): r = treq() @@ -40,21 +47,17 @@ def tflow(): def tflow_full(): - r = treq() - f = flow.Flow(r) - f.response = tresp(r) + f = tflow() + f.response = tresp(f.request) return f def tflow_err(): - r = treq() - f = flow.Flow(r) - f.error = flow.Error(r, "error") - f.error.reply = controller.DummyReply() + f = tflow() + f.error = terr(f.request) return f - @contextmanager def tmpdir(*args, **kwargs): orig_workdir = os.getcwd()