mitmproxy/libmproxy/flow.py
2011-08-03 23:02:33 +12:00

1201 lines
36 KiB
Python

"""
This module provides more sophisticated flow tracking. These match requests
with their responses, and provide filtering and interception facilities.
"""
import subprocess, sys, json, hashlib, Cookie, cookielib, base64, copy, re
import time
import netstring, filt, script, utils, encoding, proxy
from email.utils import parsedate_tz, formatdate, mktime_tz
import controller, version
class RunException(Exception):
def __init__(self, msg, returncode, errout):
Exception.__init__(self, msg)
self.returncode = returncode
self.errout = errout
class Headers:
def __init__(self, lst=None):
if lst:
self.lst = lst
else:
self.lst = []
def _kconv(self, s):
return s.lower()
def __eq__(self, other):
return self.lst == other.lst
def __getitem__(self, k):
ret = []
k = self._kconv(k)
for i in self.lst:
if self._kconv(i[0]) == k:
ret.append(i[1])
return ret
def _filter_lst(self, k, lst):
new = []
for i in lst:
if self._kconv(i[0]) != k:
new.append(i)
return new
def __setitem__(self, k, hdrs):
k = self._kconv(k)
new = self._filter_lst(k, self.lst)
for i in hdrs:
new.append((k, i))
self.lst = new
def __delitem__(self, k):
self.lst = self._filter_lst(k, self.lst)
def __contains__(self, k):
for i in self.lst:
if self._kconv(i[0]) == k:
return True
return False
def add(self, key, value):
self.lst.append([key, str(value)])
def _get_state(self):
return [tuple(i) for i in self.lst]
@classmethod
def _from_state(klass, state):
return klass([list(i) for i in state])
def copy(self):
lst = copy.deepcopy(self.lst)
return Headers(lst)
def __repr__(self):
"""
Returns a string containing a formatted header string.
"""
headerElements = []
for itm in self.lst:
headerElements.append(itm[0] + ": " + itm[1])
headerElements.append("")
return "\r\n".join(headerElements)
def match_re(self, expr):
"""
Match the regular expression against each header. For each (key,
value) pair a string of the following format is matched against:
"key: value"
"""
for k, v in self.lst:
s = "%s: %s"%(k, v)
if re.search(expr, s):
return True
return False
def read(self, fp):
"""
Read a set of headers from a file pointer. Stop once a blank line
is reached.
"""
ret = []
name = ''
while 1:
line = fp.readline()
if not line or line == '\r\n' or line == '\n':
break
if line[0] in ' \t':
# continued header
ret[-1][1] = ret[-1][1] + '\r\n ' + line.strip()
else:
i = line.find(':')
# We're being liberal in what we accept, here.
if i > 0:
name = line[:i]
value = line[i+1:].strip()
ret.append([name, value])
self.lst = ret
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both header keys
and values. Returns the number of replacements made.
"""
nlst, count = [], 0
for i in self.lst:
k, c = re.subn(pattern, repl, i[0], *args, **kwargs)
count += c
v, c = re.subn(pattern, repl, i[1], *args, **kwargs)
count += c
nlst.append([k, v])
self.lst = nlst
return count
class HTTPMsg(controller.Msg):
def decode(self):
"""
Decodes content based on the current Content-Encoding header, then
removes the header. If there is no Content-Encoding header, no
action is taken.
"""
ce = self.headers["content-encoding"]
if not ce or ce[0] not in encoding.ENCODINGS:
return
self.content = encoding.decode(
ce[0],
self.content
)
del self.headers["content-encoding"]
def encode(self, e):
"""
Encodes content with the encoding e, where e is "gzip", "deflate"
or "identity".
"""
# FIXME: Error if there's an existing encoding header?
self.content = encoding.encode(e, self.content)
self.headers["content-encoding"] = [e]
class Request(HTTPMsg):
FMT = '%s %s HTTP/1.1\r\n%s\r\n%s'
FMT_PROXY = '%s %s://%s:%s%s HTTP/1.1\r\n%s\r\n%s'
def __init__(self, client_conn, host, port, scheme, method, path, headers, content, timestamp=None):
self.client_conn = client_conn
self.host, self.port, self.scheme = host, port, scheme
self.method, self.path, self.headers, self.content = method, path, headers, content
self.timestamp = timestamp or utils.timestamp()
self.close = False
controller.Msg.__init__(self)
# Have this request's cookies been modified by sticky cookies or auth?
self.stickycookie = False
self.stickyauth = False
def anticache(self):
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
del self.headers[i]
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = ["identity"]
def constrain_encoding(self):
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
if self.headers["accept-encoding"]:
self.headers["accept-encoding"] = [', '.join([
e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0]
])]
def set_replay(self):
self.client_conn = None
def is_replay(self):
if self.client_conn:
return False
else:
return True
def _load_state(self, state):
if state["client_conn"]:
if self.client_conn:
self.client_conn._load_state(state["client_conn"])
else:
self.client_conn = ClientConnect._from_state(state["client_conn"])
else:
self.client_conn = None
self.host = state["host"]
self.port = state["port"]
self.scheme = state["scheme"]
self.method = state["method"]
self.path = state["path"]
self.headers = Headers._from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def _get_state(self):
return dict(
client_conn = self.client_conn._get_state() if self.client_conn else None,
host = self.host,
port = self.port,
scheme = self.scheme,
method = self.method,
path = self.path,
headers = self.headers._get_state(),
content = base64.encodestring(self.content),
timestamp = self.timestamp,
)
@classmethod
def _from_state(klass, state):
return klass(
ClientConnect._from_state(state["client_conn"]),
str(state["host"]),
state["port"],
str(state["scheme"]),
str(state["method"]),
str(state["path"]),
Headers._from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"]
)
def __hash__(self):
return id(self)
def __eq__(self, other):
return self._get_state() == other._get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def hostport(self):
if (self.port, self.scheme) in [(80, "http"), (443, "https")]:
host = self.host
else:
host = "%s:%s"%(self.host, self.port)
return host
def url(self):
return "%s://%s%s"%(self.scheme, self.hostport(), self.path)
def set_url(self, url):
parts = utils.parse_url(url)
if not parts:
return False
self.scheme, self.host, self.port, self.path = parts
return True
def is_response(self):
return False
def _assemble(self, _proxy = False):
"""
Assembles the request for transmission to the server. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
[
'proxy-connection',
'keep-alive',
'connection',
'content-length',
'transfer-encoding'
]
)
if not 'host' in headers:
headers["host"] = [self.hostport()]
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.close:
headers["connection"] = ["close"]
if not _proxy:
return self.FMT % (self.method, self.path, str(headers), content)
else:
return self.FMT_PROXY % (self.method, self.scheme, self.host, self.port, self.path, str(headers), content)
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
self.path, pc = re.subn(pattern, repl, self.path, *args, **kwargs)
c += pc
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class Response(HTTPMsg):
FMT = '%s\r\n%s\r\n%s'
def __init__(self, request, code, msg, headers, content, timestamp=None):
self.request = request
self.code, self.msg = code, msg
self.headers, self.content = headers, content
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
self.replay = False
def _refresh_cookie(self, c, delta):
"""
Takes a cookie string c and a time delta in seconds, and returns
a refreshed cookie string.
"""
c = Cookie.SimpleCookie(str(c))
for i in c.values():
if "expires" in i:
d = parsedate_tz(i["expires"])
if d:
d = mktime_tz(d) + delta
i["expires"] = formatdate(d)
else:
# This can happen when the expires tag is invalid.
# reddit.com sends a an expires tag like this: "Thu, 31 Dec
# 2037 23:59:59 GMT", which is valid RFC 1123, but not
# strictly correct according tot he cookie spec. Browsers
# appear to parse this tolerantly - maybe we should too.
# For now, we just ignore this.
del i["expires"]
return c.output(header="").strip()
def refresh(self, now=None):
"""
This fairly complex and heuristic function refreshes a server
response for replay.
- It adjusts date, expires and last-modified headers.
- It adjusts cookie expiration.
"""
if not now:
now = time.time()
delta = now - self.timestamp
refresh_headers = [
"date",
"expires",
"last-modified",
]
for i in refresh_headers:
if i in self.headers:
d = parsedate_tz(self.headers[i][0])
if d:
new = mktime_tz(d) + delta
self.headers[i] = [formatdate(new)]
c = []
for i in self.headers["set-cookie"]:
c.append(self._refresh_cookie(i, delta))
if c:
self.headers["set-cookie"] = c
def set_replay(self):
self.replay = True
def is_replay(self):
return self.replay
def _load_state(self, state):
self.code = state["code"]
self.msg = state["msg"]
self.headers = Headers._from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def _get_state(self):
return dict(
code = self.code,
msg = self.msg,
headers = self.headers._get_state(),
timestamp = self.timestamp,
content = base64.encodestring(self.content)
)
@classmethod
def _from_state(klass, request, state):
return klass(
request,
state["code"],
str(state["msg"]),
Headers._from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"],
)
def __eq__(self, other):
return self._get_state() == other._get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def is_response(self):
return True
def _assemble(self):
"""
Assembles the response for transmission to the client. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
['proxy-connection', 'connection', 'keep-alive', 'transfer-encoding']
)
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.request.client_conn.close:
headers["connection"] = ["close"]
proto = "HTTP/1.1 %s %s"%(self.code, str(self.msg))
data = (proto, str(headers), content)
return self.FMT%data
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the response. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class ClientDisconnect(controller.Msg):
def __init__(self, client_conn):
controller.Msg.__init__(self)
self.client_conn = client_conn
class ClientConnect(controller.Msg):
def __init__(self, address):
"""
address is an (address, port) tuple, or None if this connection has
been replayed from within mitmproxy.
"""
self.address = address
self.close = False
self.requestcount = 0
self.connection_error = None
controller.Msg.__init__(self)
def __eq__(self, other):
return self._get_state() == other._get_state()
def _load_state(self, state):
self.address = state
def _get_state(self):
return list(self.address) if self.address else None
@classmethod
def _from_state(klass, state):
if state:
return klass(state)
else:
return None
def copy(self):
return copy.copy(self)
class Error(controller.Msg):
def __init__(self, request, msg, timestamp=None):
self.request, self.msg = request, msg
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
def _load_state(self, state):
self.msg = state["msg"]
self.timestamp = state["timestamp"]
def copy(self):
return copy.copy(self)
def _get_state(self):
return dict(
msg = self.msg,
timestamp = self.timestamp,
)
@classmethod
def _from_state(klass, state):
return klass(
None,
state["msg"],
state["timestamp"],
)
def __eq__(self, other):
return self._get_state() == other._get_state()
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.msg, c = re.subn(pattern, repl, self.msg, *args, **kwargs)
return c
class ClientPlaybackState:
def __init__(self, flows, exit):
self.flows, self.exit = flows, exit
self.current = None
def count(self):
return len(self.flows)
def done(self):
if len(self.flows) == 0 and not self.current:
return True
return False
def clear(self, flow):
"""
A request has returned in some way - if this is the one we're
servicing, go to the next flow.
"""
if flow is self.current:
self.current = None
def tick(self, master, testing=False):
"""
testing: Disables actual replay for testing.
"""
if self.flows and not self.current:
n = self.flows.pop(0)
n.request.client_conn = None
self.current = master.handle_request(n.request)
if not testing and not self.current.response:
#begin nocover
master.replay_request(self.current)
#end nocover
elif self.current.response:
master.handle_response(self.current.response)
class ServerPlaybackState:
def __init__(self, headers, flows, exit):
"""
headers: A case-insensitive list of request headers that should be
included in request-response matching.
"""
self.headers, self.exit = headers, exit
self.fmap = {}
for i in flows:
if i.response:
l = self.fmap.setdefault(self._hash(i), [])
l.append(i)
def count(self):
return sum([len(i) for i in self.fmap.values()])
def _hash(self, flow):
"""
Calculates a loose hash of the flow request.
"""
r = flow.request
key = [
str(r.host),
str(r.port),
str(r.scheme),
str(r.method),
str(r.path),
str(r.content),
]
if self.headers:
hdrs = []
for i in self.headers:
v = r.headers[i]
# Slightly subtle: we need to convert everything to strings
# to prevent a mismatch between unicode/non-unicode.
v = [str(x) for x in v]
hdrs.append((i, v))
key.append(repr(hdrs))
return hashlib.sha256(repr(key)).digest()
def next_flow(self, request):
"""
Returns the next flow object, or None if no matching flow was
found.
"""
l = self.fmap.get(self._hash(request))
if not l:
return None
return l.pop(0)
class StickyCookieState:
def __init__(self, flt):
"""
flt: A compiled filter.
"""
self.jar = {}
self.flt = flt
def ckey(self, m, f):
"""
Returns a (domain, port, path) tuple.
"""
return (
m["domain"] or f.request.host,
f.request.port,
m["path"] or "/"
)
def handle_response(self, f):
for i in f.response.headers["set-cookie"]:
# FIXME: We now know that Cookie.py screws up some cookies with
# valid RFC 822/1123 datetime specifications for expiry. Sigh.
c = Cookie.SimpleCookie(str(i))
m = c.values()[0]
k = self.ckey(m, f)
if cookielib.domain_match(f.request.host, k[0]):
self.jar[self.ckey(m, f)] = m
def handle_request(self, f):
if f.match(self.flt):
for i in self.jar.keys():
match = [
cookielib.domain_match(i[0], f.request.host),
f.request.port == i[1],
f.request.path.startswith(i[2])
]
if all(match):
l = f.request.headers["cookie"]
f.request.stickycookie = True
l.append(self.jar[i].output(header="").strip())
f.request.headers["cookie"] = l
class StickyAuthState:
def __init__(self, flt):
"""
flt: A compiled filter.
"""
self.flt = flt
self.hosts = {}
def handle_request(self, f):
if "authorization" in f.request.headers:
self.hosts[f.request.host] = f.request.headers["authorization"]
elif f.match(self.flt):
if f.request.host in self.hosts:
f.request.headers["authorization"] = self.hosts[f.request.host]
class Flow:
def __init__(self, request):
self.request = request
self.response, self.error = None, None
self.intercepting = False
self._backup = None
@classmethod
def _from_state(klass, state):
f = klass(None)
f._load_state(state)
return f
@classmethod
def script_deserialize(klass, data):
try:
data = json.loads(data)
except Exception:
return None
return klass._from_state(data)
def _get_state(self, nobackup=False):
d = dict(
request = self.request._get_state() if self.request else None,
response = self.response._get_state() if self.response else None,
error = self.error._get_state() if self.error else None,
version = version.IVERSION
)
if nobackup:
d["backup"] = None
else:
d["backup"] = self._backup
return d
def _load_state(self, state):
self._backup = state["backup"]
if self.request:
self.request._load_state(state["request"])
else:
self.request = Request._from_state(state["request"])
if state["response"]:
if self.response:
self.response._load_state(state["response"])
else:
self.response = Response._from_state(self.request, state["response"])
else:
self.response = None
if state["error"]:
if self.error:
self.error._load_state(state["error"])
else:
self.error = Error._from_state(state["error"])
else:
self.error = None
def modified(self):
# FIXME: Save a serialization in backup, compare current with
# backup to detect if flow has _really_ been modified.
if self._backup:
return True
else:
return False
def backup(self):
self._backup = self._get_state(nobackup=True)
def revert(self):
if self._backup:
self._load_state(self._backup)
self._backup = None
def match(self, pattern):
if pattern:
if self.response:
return pattern(self.response)
elif self.request:
return pattern(self.request)
else:
return True
def kill(self, master):
self.error = Error(self.request, "Connection killed")
if self.request and not self.request.acked:
self.request._ack(None)
elif self.response and not self.response.acked:
self.response._ack(None)
master.handle_error(self.error)
self.intercepting = False
def intercept(self):
self.intercepting = True
def accept_intercept(self):
if self.request:
if not self.request.acked:
self.request._ack()
elif self.response and not self.response.acked:
self.response._ack()
self.intercepting = False
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in all parts of the
flow . Returns the number of replacements made.
"""
c = self.request.replace(pattern, repl, *args, **kwargs)
if self.response:
c += self.response.replace(pattern, repl, *args, **kwargs)
if self.error:
c += self.error.replace(pattern, repl, *args, **kwargs)
return c
class State(object):
def __init__(self):
self._flow_map = {}
self._flow_list = []
self.view = []
# These are compiled filt expressions:
self._limit = None
self.intercept = None
self._limit_txt = None
@property
def limit_txt(self):
return self._limit_txt
def flow_count(self):
return len(self._flow_map)
def active_flow_count(self):
c = 0
for i in self._flow_list:
if not i.response and not i.error:
c += 1
return c
def add_request(self, req):
"""
Add a request to the state. Returns the matching flow.
"""
f = Flow(req)
self._flow_list.append(f)
self._flow_map[req] = f
if f.match(self._limit):
self.view.append(f)
return f
def add_response(self, resp):
"""
Add a response to the state. Returns the matching flow.
"""
f = self._flow_map.get(resp.request)
if not f:
return False
f.response = resp
if f.match(self._limit) and not f in self.view:
self.view.append(f)
return f
def add_error(self, err):
"""
Add an error response to the state. Returns the matching flow, or
None if there isn't one.
"""
if err.request:
f = self._flow_map.get(err.request)
if not f:
return None
else:
return None
f.error = err
if f.match(self._limit) and not f in self.view:
self.view.append(f)
return f
def load_flows(self, flows):
self._flow_list.extend(flows)
for i in flows:
self._flow_map[i.request] = i
self.recalculate_view()
def set_limit(self, txt):
if txt:
f = filt.parse(txt)
if not f:
return "Invalid filter expression."
self._limit = f
self._limit_txt = txt
else:
self._limit = None
self._limit_txt = None
self.recalculate_view()
def set_intercept(self, txt):
if txt:
f = filt.parse(txt)
if not f:
return "Invalid filter expression."
self.intercept = f
self.intercept_txt = txt
else:
self.intercept = None
self.intercept_txt = None
def recalculate_view(self):
if self._limit:
self.view = [i for i in self._flow_list if i.match(self._limit)]
else:
self.view = self._flow_list[:]
def delete_flow(self, f):
if f.request in self._flow_map:
del self._flow_map[f.request]
self._flow_list.remove(f)
if f.match(self._limit):
self.view.remove(f)
return True
def clear(self):
for i in self._flow_list[:]:
self.delete_flow(i)
def accept_all(self):
for i in self._flow_list[:]:
i.accept_intercept()
def revert(self, f):
f.revert()
def killall(self, master):
for i in self._flow_list:
i.kill(master)
class FlowMaster(controller.Master):
def __init__(self, server, state):
controller.Master.__init__(self, server)
self.state = state
self.server_playback = None
self.client_playback = None
self.kill_nonreplay = False
self.script = None
self.stickycookie_state = False
self.stickycookie_txt = None
self.stickyauth_state = False
self.stickyauth_txt = None
self.anticache = False
self.anticomp = False
self.autodecode = False
self.refresh_server_playback = False
def add_event(self, e, level="info"):
"""
level: info, error
"""
pass
def get_script(self, path):
"""
Returns an (error, script) tuple.
"""
s = script.Script(path, self)
try:
s.load()
except script.ScriptError, v:
return (v.args[0], None)
ret = s.run("start")
if not ret[0] and ret[1]:
return ("Error in script start:\n\n" + ret[1][1], None)
return (None, s)
def load_script(self, path):
"""
Loads a script. Returns an error description if something went
wrong.
"""
r = self.get_script(path)
if r[0]:
return r[0]
else:
self.script = r[1]
def set_stickycookie(self, txt):
if txt:
flt = filt.parse(txt)
if not flt:
return "Invalid filter expression."
self.stickycookie_state = StickyCookieState(flt)
self.stickycookie_txt = txt
else:
self.stickycookie_state = None
self.stickycookie_txt = None
def set_stickyauth(self, txt):
if txt:
flt = filt.parse(txt)
if not flt:
return "Invalid filter expression."
self.stickyauth_state = StickyAuthState(flt)
self.stickyauth_txt = txt
else:
self.stickyauth_state = None
self.stickyauth_txt = None
def start_client_playback(self, flows, exit):
"""
flows: A list of flows.
"""
self.client_playback = ClientPlaybackState(flows, exit)
def stop_client_playback(self):
self.client_playback = None
def start_server_playback(self, flows, kill, headers, exit):
"""
flows: A list of flows.
kill: Boolean, should we kill requests not part of the replay?
"""
self.server_playback = ServerPlaybackState(headers, flows, exit)
self.kill_nonreplay = kill
def stop_server_playback(self):
self.server_playback = None
def do_server_playback(self, flow):
"""
This method should be called by child classes in the handle_request
handler. Returns True if playback has taken place, None if not.
"""
if self.server_playback:
rflow = self.server_playback.next_flow(flow)
if not rflow:
return None
response = Response._from_state(flow.request, rflow.response._get_state())
response.set_replay()
flow.response = response
if self.refresh_server_playback:
response.refresh()
flow.request._ack(response)
return True
return None
def tick(self, q):
if self.client_playback:
e = [
self.client_playback.done(),
self.client_playback.exit,
self.state.active_flow_count() == 0
]
if all(e):
self.shutdown()
self.client_playback.tick(self)
if self.server_playback:
if self.server_playback.exit and self.server_playback.count() == 0:
self.shutdown()
return controller.Master.tick(self, q)
def load_flows(self, fr):
"""
Load flows from a FlowReader object.
"""
for i in fr.stream():
if i.request:
self.handle_request(i.request)
if i.response:
self.handle_response(i.response)
if i.error:
self.handle_error(i.error)
def process_new_request(self, f):
if self.stickycookie_state:
self.stickycookie_state.handle_request(f)
if self.stickyauth_state:
self.stickyauth_state.handle_request(f)
if self.anticache:
f.request.anticache()
if self.anticomp:
f.request.anticomp()
if self.server_playback:
pb = self.do_server_playback(f)
if not pb:
if self.kill_nonreplay:
f.kill(self)
else:
f.request._ack()
def process_new_response(self, f):
if self.stickycookie_state:
self.stickycookie_state.handle_response(f)
def replay_request(self, f):
"""
Returns None if successful, or error message if not.
"""
#begin nocover
if f.intercepting:
return "Can't replay while intercepting..."
if f.request:
f.request.set_replay()
if f.request.content:
f.request.headers["content-length"] = [str(len(f.request.content))]
f.response = None
f.error = None
self.process_new_request(f)
rt = proxy.RequestReplayThread(f, self.masterq)
rt.start()
#end nocover
def run_script(self, name, *args, **kwargs):
if self.script:
ret = self.script.run(name, *args, **kwargs)
if not ret[0] and ret[1]:
e = "Script error:\n" + ret[1][1]
self.add_event(e, "error")
def handle_clientconnect(self, cc):
self.run_script("clientconnect", cc)
self.add_event("Connect from: %s:%s"%cc.address)
cc._ack()
def handle_clientdisconnect(self, r):
self.run_script("clientdisconnect", r)
s = "Disconnect from: %s:%s"%r.client_conn.address
self.add_event(s)
if r.client_conn.requestcount:
s = " -> handled %s requests"%r.client_conn.requestcount
self.add_event(s)
if r.client_conn.connection_error:
self.add_event(
" -> error: %s"%r.client_conn.connection_error, "error"
)
r._ack()
def handle_error(self, r):
f = self.state.add_error(r)
if f:
self.run_script("error", f)
if self.client_playback:
self.client_playback.clear(f)
r._ack()
return f
def handle_request(self, r):
f = self.state.add_request(r)
self.run_script("request", f)
self.process_new_request(f)
return f
def handle_response(self, r):
f = self.state.add_response(r)
if f:
self.run_script("response", f)
if self.client_playback:
self.client_playback.clear(f)
if not f:
r._ack()
self.process_new_response(f)
return f
class FlowWriter:
def __init__(self, fo):
self.fo = fo
self.ns = netstring.FileEncoder(fo)
def add(self, flow):
d = flow._get_state()
s = json.dumps(d)
self.ns.write(s)
class FlowReadError(Exception):
@property
def strerror(self):
return self.args[0]
class FlowReader:
def __init__(self, fo):
self.fo = fo
self.ns = netstring.decode_file(fo)
def stream(self):
"""
Yields Flow objects from the dump.
"""
try:
for i in self.ns:
data = json.loads(i)
yield Flow._from_state(data)
except netstring.DecoderError:
raise FlowReadError("Invalid data format.")