add @concurrent decorator for inline scripts, fixes #176

This commit is contained in:
Maximilian Hils 2013-12-15 02:43:16 +01:00
parent 224cd41dc2
commit 605950bfdf
6 changed files with 128 additions and 9 deletions

View File

@ -123,6 +123,13 @@ using pydoc (which is installed with Python by default), like this:
</pre> </pre>
## Running scripts in parallel
We have a single flow primitive, so when a script is handling something, other requests block.
While that's a very desirable behaviour under some circumstances, scripts can be run threaded by using the <code>libmproxy.script.concurrent</code> decorator.
$!example("examples/nonblocking.py")!$
## Running scripts on saved flows ## Running scripts on saved flows
Sometimes, we want to run a script on __Flow__ objects that are already Sometimes, we want to run a script on __Flow__ objects that are already

8
examples/nonblocking.py Normal file
View File

@ -0,0 +1,8 @@
import time
from libmproxy.script import concurrent
@concurrent
def request(context, flow):
print "handle request: %s%s" % (flow.request.host, flow.request.path)
time.sleep(5)
print "start request: %s%s" % (flow.request.host, flow.request.path)

View File

@ -1,4 +1,5 @@
import os, traceback import os, traceback, threading
import controller
class ScriptError(Exception): class ScriptError(Exception):
pass pass
@ -59,3 +60,26 @@ class Script:
return (False, (v, traceback.format_exc(v))) return (False, (v, traceback.format_exc(v)))
else: else:
return (False, None) return (False, None)
def _handle_concurrent_reply(fn, o, args=[], kwargs={}):
reply = o.reply
o.reply = controller.DummyReply()
def run():
fn(*args, **kwargs)
reply(o)
threading.Thread(target=run).start()
def concurrent(fn):
if fn.func_name in ["request", "response", "error"]:
def _concurrent(ctx, flow):
r = getattr(flow, fn.func_name)
_handle_concurrent_reply(fn, r, [ctx, flow])
return _concurrent
elif fn.func_name in ["clientconnect", "clientdisconnect", "serverconnect"]:
def _concurrent(ctx, conn):
_handle_concurrent_reply(fn, conn, [ctx, conn])
return _concurrent
raise NotImplementedError("Concurrent decorator not supported for this method.")

View File

@ -0,0 +1,31 @@
import time
from libmproxy.script import concurrent
@concurrent
def request(context, flow):
time.sleep(0.1)
@concurrent
def response(context, flow):
context.log("response")
@concurrent
def error(context, err):
context.log("error")
@concurrent
def clientconnect(context, cc):
context.log("clientconnect")
@concurrent
def clientdisconnect(context, dc):
context.log("clientdisconnect")
@concurrent
def serverconnect(context, sc):
context.log("serverconnect")

View File

@ -2,6 +2,19 @@ from libmproxy import script, flow
import tutils import tutils
import shlex import shlex
import os import os
import time
class TCounter:
count = 0
def __call__(self, *args, **kwargs):
self.count += 1
class TScriptContext(TCounter):
def log(self, msg):
self.__call__()
class TestScript: class TestScript:
def test_simple(self): def test_simple(self):
@ -64,3 +77,36 @@ class TestScript:
s.load s.load
) )
def test_concurrent(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_script([tutils.test_data.path("scripts/concurrent_decorator.py")])
reply = TCounter()
r1, r2 = tutils.treq(), tutils.treq()
r1.reply, r2.reply = reply, reply
t_start = time.time()
fm.handle_request(r1)
r1.reply()
fm.handle_request(r2)
r2.reply()
assert reply.count < 2
assert (time.time() - t_start) < 0.09
time.sleep(0.2)
assert reply.count == 2
def test_concurrent2(self):
ctx = TScriptContext()
s = script.Script(["scripts/concurrent_decorator.py"], ctx)
s.load()
f = tutils.tflow_full()
f.error = tutils.terr(f.request)
f.reply = f.request.reply
print s.run("response", f)
print s.run("error", f)
print s.run("clientconnect", f)
print s.run("clientdisconnect", f)
print s.run("serverconnect", f)
time.sleep(0.1)
assert ctx.count == 5

View File

@ -33,6 +33,13 @@ def tresp(req=None):
resp.reply = controller.DummyReply() resp.reply = controller.DummyReply()
return resp return resp
def terr(req=None):
if not req:
req = treq()
err = flow.Error(req, "error")
err.reply = controller.DummyReply()
return err
def tflow(): def tflow():
r = treq() r = treq()
@ -40,21 +47,17 @@ def tflow():
def tflow_full(): def tflow_full():
r = treq() f = tflow()
f = flow.Flow(r) f.response = tresp(f.request)
f.response = tresp(r)
return f return f
def tflow_err(): def tflow_err():
r = treq() f = tflow()
f = flow.Flow(r) f.error = terr(f.request)
f.error = flow.Error(r, "error")
f.error.reply = controller.DummyReply()
return f return f
@contextmanager @contextmanager
def tmpdir(*args, **kwargs): def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd() orig_workdir = os.getcwd()