Add utility functions to search and replace strings in flows

This is a common task in pentesting scenarios. This commit adds the following
functions:

utils.Headers.replace
proxy.Request.replace
proxy.Response.replace
flow.Flow.replace
This commit is contained in:
Aldo Cortesi 2011-07-22 17:48:42 +12:00
parent 9c24401b18
commit 1b961fc4ad
6 changed files with 102 additions and 1 deletions

View File

@ -323,6 +323,18 @@ class Flow:
self.response.ack()
self.intercepting = False
def replace(self, pattern, repl, count=0, flags=0):
"""
Replaces a regular expression pattern with repl in all parts of the
flow . Returns the number of replacements made.
"""
c = self.request.replace(pattern, repl, count, flags)
if self.response:
c += self.response.replace(pattern, repl, count, flags)
if self.error:
c += self.error.replace(pattern, repl, count, flags)
return c
class State:
def __init__(self):

View File

@ -280,6 +280,16 @@ class Request(controller.Msg):
else:
return self.FMT_PROXY % (self.method, self.scheme, self.host, self.port, self.path, str(headers), content)
def replace(self, pattern, repl, count=0, flags=0):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, count, flags)
c += self.headers.replace(pattern, repl, count, flags)
return c
class Response(controller.Msg):
FMT = '%s\r\n%s\r\n%s'
@ -406,6 +416,16 @@ class Response(controller.Msg):
data = (proto, str(headers), content)
return self.FMT%data
def replace(self, pattern, repl, count=0, flags=0):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the response. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, count, flags)
c += self.headers.replace(pattern, repl, count, flags)
return c
class ClientDisconnect(controller.Msg):
def __init__(self, client_conn):
@ -473,6 +493,15 @@ class Error(controller.Msg):
def __eq__(self, other):
return self.get_state() == other.get_state()
def replace(self, pattern, repl, count=0, flags=0):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.msg, c = re.subn(pattern, repl, self.msg, count, flags)
return c
class FileLike:
def __init__(self, o):

View File

@ -279,6 +279,21 @@ class Headers:
ret.append([name, value])
self.lst = ret
def replace(self, pattern, repl, count=0, flags=0):
"""
Replaces a regular expression pattern with repl in both header keys
and values. Returns the number of replacements made.
"""
nlst, count = [], 0
for i in self.lst:
k, c = re.subn(pattern, repl, i[0], count, flags)
count += c
v, c = re.subn(pattern, repl, i[1], count, flags)
count += c
nlst.append([k, v])
self.lst = nlst
return count
def pretty_size(size):
suffixes = [

View File

@ -219,6 +219,21 @@ class uFlow(libpry.AutoTree):
f = flow.Flow(None)
f.request = tutils.treq()
def test_replace(self):
f = tutils.tflow_full()
f.request.headers["foo"] = ["foo"]
f.request.content = "afoob"
f.response.headers["foo"] = ["foo"]
f.response.content = "afoob"
assert f.replace("foo", "bar") == 6
assert f.request.headers["bar"] == ["bar"]
assert f.request.content == "abarb"
assert f.response.headers["bar"] == ["bar"]
assert f.response.content == "abarb"
class uState(libpry.AutoTree):
def test_backup(self):

View File

@ -1,4 +1,4 @@
import cStringIO, time
import cStringIO, time, re
import libpry
from libmproxy import proxy, controller, utils, dump, script
import email.utils
@ -128,6 +128,14 @@ class uRequest(libpry.AutoTree):
r.load_state(r2.get_state())
assert not r.client_conn
def test_replace(self):
r = tutils.treq()
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo", "boo", flags=re.I) == 3
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
class uResponse(libpry.AutoTree):
def test_simple(self):
@ -185,6 +193,14 @@ class uResponse(libpry.AutoTree):
resp.load_state(resp2.get_state())
assert resp == resp2
def test_replace(self):
r = tutils.tresp()
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo", "boo", flags=re.I) == 3
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
class uError(libpry.AutoTree):
def test_getset_state(self):
@ -203,6 +219,11 @@ class uError(libpry.AutoTree):
e3 = e.copy()
assert e3 == e
def test_replace(self):
e = proxy.Error(None, "amoop")
e.replace("moo", "bar")
assert e.msg == "abarp"
class uProxyError(libpry.AutoTree):
def test_simple(self):

View File

@ -137,6 +137,15 @@ class uHeaders(libpry.AutoTree):
del self.hd["foo"]
assert len(self.hd.lst) == 1
def test_replace(self):
self.hd.add("one", "two")
self.hd.add("two", "one")
assert self.hd.replace("one", "vun") == 2
assert self.hd.lst == [
["vun", "two"],
["two", "vun"],
]
class uisStringLike(libpry.AutoTree):
def test_all(self):