Utils reorganisation: add netlib.strutils

Extract a number of string and format-related functions to netlib.strutils.
This commit is contained in:
Aldo Cortesi 2016-06-02 12:31:41 +12:00
parent eaa3b308f7
commit cccdc98426
17 changed files with 193 additions and 182 deletions

View File

@ -8,7 +8,7 @@ tcp_message Inline Script Hook API Demonstration
example cmdline invocation: example cmdline invocation:
mitmdump -T --host --tcp ".*" -q -s examples/tcp_message.py mitmdump -T --host --tcp ".*" -q -s examples/tcp_message.py
''' '''
from netlib.utils import clean_bin from netlib import strutils
def tcp_message(ctx, tcp_msg): def tcp_message(ctx, tcp_msg):
@ -22,4 +22,4 @@ def tcp_message(ctx, tcp_msg):
"client" if tcp_msg.sender == tcp_msg.client_conn else "server", "client" if tcp_msg.sender == tcp_msg.client_conn else "server",
tcp_msg.sender.address, tcp_msg.sender.address,
"server" if tcp_msg.receiver == tcp_msg.server_conn else "client", "server" if tcp_msg.receiver == tcp_msg.server_conn else "client",
tcp_msg.receiver.address, clean_bin(tcp_msg.message))) tcp_msg.receiver.address, strutils.clean_bin(tcp_msg.message)))

View File

@ -36,6 +36,7 @@ from netlib import encoding
from netlib import http from netlib import http
from netlib import odict from netlib import odict
from netlib.http import url from netlib.http import url
from netlib import strutils
import netlib.utils import netlib.utils
try: try:
@ -581,9 +582,9 @@ def safe_to_print(lines, encoding="utf8"):
clean_line = [] clean_line = []
for (style, text) in line: for (style, text) in line:
try: try:
text = netlib.utils.clean_bin(text.decode(encoding, "strict")) text = strutils.clean_bin(text.decode(encoding, "strict"))
except UnicodeDecodeError: except UnicodeDecodeError:
text = netlib.utils.clean_bin(text).decode(encoding, "strict") text = strutils.clean_bin(text).decode(encoding, "strict")
clean_line.append((style, text)) clean_line.append((style, text))
yield clean_line yield clean_line

View File

@ -13,7 +13,7 @@ from mitmproxy import filt
from mitmproxy import flow from mitmproxy import flow
from netlib import human from netlib import human
from netlib import tcp from netlib import tcp
from netlib import utils from netlib import strutils
class DumpError(Exception): class DumpError(Exception):
@ -181,8 +181,8 @@ class DumpMaster(flow.FlowMaster):
if self.o.flow_detail >= 2: if self.o.flow_detail >= 2:
headers = "\r\n".join( headers = "\r\n".join(
"{}: {}".format( "{}: {}".format(
click.style(utils.bytes_to_escaped_str(k), fg="blue", bold=True), click.style(strutils.bytes_to_escaped_str(k), fg="blue", bold=True),
click.style(utils.bytes_to_escaped_str(v), fg="blue")) click.style(strutils.bytes_to_escaped_str(v), fg="blue"))
for k, v in message.headers.fields for k, v in message.headers.fields
) )
self.echo(headers, indent=4) self.echo(headers, indent=4)
@ -244,7 +244,7 @@ class DumpMaster(flow.FlowMaster):
stickycookie = "" stickycookie = ""
if flow.client_conn: if flow.client_conn:
client = click.style(utils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True) client = click.style(strutils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True)
else: else:
client = click.style("[replay]", fg="yellow", bold=True) client = click.style("[replay]", fg="yellow", bold=True)
@ -253,12 +253,12 @@ class DumpMaster(flow.FlowMaster):
GET="green", GET="green",
DELETE="red" DELETE="red"
).get(method.upper(), "magenta") ).get(method.upper(), "magenta")
method = click.style(utils.bytes_to_escaped_str(method), fg=method_color, bold=True) method = click.style(strutils.bytes_to_escaped_str(method), fg=method_color, bold=True)
if self.showhost: if self.showhost:
url = flow.request.pretty_url url = flow.request.pretty_url
else: else:
url = flow.request.url url = flow.request.url
url = click.style(utils.bytes_to_escaped_str(url), bold=True) url = click.style(strutils.bytes_to_escaped_str(url), bold=True)
httpversion = "" httpversion = ""
if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"): if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"):
@ -288,7 +288,7 @@ class DumpMaster(flow.FlowMaster):
elif 400 <= code < 600: elif 400 <= code < 600:
code_color = "red" code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418)) code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
reason = click.style(utils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True) reason = click.style(strutils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True)
if flow.response.content is None: if flow.response.content is None:
size = "(content missing)" size = "(content missing)"

View File

@ -16,7 +16,7 @@ from mitmproxy.flow import modules
from mitmproxy.onboarding import app from mitmproxy.onboarding import app
from mitmproxy.protocol import http_replay from mitmproxy.protocol import http_replay
from mitmproxy.proxy.config import HostMatcher from mitmproxy.proxy.config import HostMatcher
from netlib import utils from netlib import strutils
class FlowMaster(controller.Master): class FlowMaster(controller.Master):
@ -499,7 +499,7 @@ class FlowMaster(controller.Master):
server=repr(flow.server_conn.address), server=repr(flow.server_conn.address),
direction=direction, direction=direction,
), "info") ), "info")
self.add_event(utils.clean_bin(message.content), "debug") self.add_event(strutils.clean_bin(message.content), "debug")
@controller.handler @controller.handler
def tcp_error(self, flow): def tcp_error(self, flow):

View File

@ -4,7 +4,7 @@ import re
import six import six
from netlib import multidict from netlib import multidict
from netlib import utils from netlib import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/ # See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
@ -20,7 +20,7 @@ else:
return x.decode("utf-8", "surrogateescape") return x.decode("utf-8", "surrogateescape")
def _always_bytes(x): def _always_bytes(x):
return utils.always_bytes(x, "utf-8", "surrogateescape") return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict): class Headers(multidict.MultiDict):

View File

@ -4,7 +4,7 @@ import warnings
import six import six
from netlib import encoding, utils, basetypes from netlib import encoding, strutils, basetypes
from netlib.http import headers from netlib.http import headers
if six.PY2: # pragma: no cover if six.PY2: # pragma: no cover
@ -19,7 +19,7 @@ else:
return x.decode("utf-8", "surrogateescape") return x.decode("utf-8", "surrogateescape")
def _always_bytes(x): def _always_bytes(x):
return utils.always_bytes(x, "utf-8", "surrogateescape") return strutils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(basetypes.Serializable): class MessageData(basetypes.Serializable):
@ -200,7 +200,7 @@ class Message(basetypes.Serializable):
replacements = 0 replacements = 0
if self.content: if self.content:
with decoded(self): with decoded(self):
self.content, replacements = utils.safe_subn( self.content, replacements = strutils.safe_subn(
pattern, repl, self.content, flags=flags pattern, repl, self.content, flags=flags
) )
replacements += self.headers.replace(pattern, repl, flags) replacements += self.headers.replace(pattern, repl, flags)

View File

@ -7,7 +7,7 @@ from six.moves import urllib
from netlib import encoding from netlib import encoding
from netlib import multidict from netlib import multidict
from netlib import utils from netlib import strutils
from netlib.http import multipart from netlib.http import multipart
from netlib.http import cookies from netlib.http import cookies
from netlib.http import headers as nheaders from netlib.http import headers as nheaders
@ -67,7 +67,7 @@ class Request(message.Message):
""" """
# TODO: Proper distinction between text and bytes. # TODO: Proper distinction between text and bytes.
c = super(Request, self).replace(pattern, repl, flags) c = super(Request, self).replace(pattern, repl, flags)
self.path, pc = utils.safe_subn( self.path, pc = strutils.safe_subn(
pattern, repl, self.path, flags=flags pattern, repl, self.path, flags=flags
) )
c += pc c += pc

View File

@ -3,7 +3,7 @@ import copy
import six import six
from netlib import basetypes, utils from netlib import basetypes, strutils
class ODict(basetypes.Serializable): class ODict(basetypes.Serializable):
@ -139,9 +139,9 @@ class ODict(basetypes.Serializable):
""" """
new, count = [], 0 new, count = [], 0
for k, v in self.lst: for k, v in self.lst:
k, c = utils.safe_subn(pattern, repl, k, *args, **kwargs) k, c = strutils.safe_subn(pattern, repl, k, *args, **kwargs)
count += c count += c
v, c = utils.safe_subn(pattern, repl, v, *args, **kwargs) v, c = strutils.safe_subn(pattern, repl, v, *args, **kwargs)
count += c count += c
new.append([k, v]) new.append([k, v])
self.lst = new self.lst = new

103
netlib/strutils.py Normal file
View File

@ -0,0 +1,103 @@
import re
import unicodedata
import codecs
import six
def always_bytes(unicode_or_bytes, *encode_args):
if isinstance(unicode_or_bytes, six.text_type):
return unicode_or_bytes.encode(*encode_args)
return unicode_or_bytes
def native(s, *encoding_opts):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using latin1 encoding if conversion is necessary.
https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
"""
if not isinstance(s, (six.binary_type, six.text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if six.PY3:
if isinstance(s, six.binary_type):
return s.decode(*encoding_opts)
else:
if isinstance(s, six.text_type):
return s.encode(*encoding_opts)
return s
def clean_bin(s, keep_spacing=True):
"""
Cleans binary data to make it safe to display.
Args:
keep_spacing: If False, tabs and newlines will also be replaced.
"""
if isinstance(s, six.text_type):
if keep_spacing:
keep = u" \n\r\t"
else:
keep = u" "
return u"".join(
ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
for ch in s
)
else:
if keep_spacing:
keep = (9, 10, 13) # \t, \n, \r,
else:
keep = ()
return b"".join(
six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
for ch in six.iterbytes(s)
)
def safe_subn(pattern, repl, target, *args, **kwargs):
"""
There are Unicode conversion problems with re.subn. We try to smooth
that over by casting the pattern and replacement to strings. We really
need a better solution that is aware of the actual content ecoding.
"""
return re.subn(str(pattern), str(repl), target, *args, **kwargs)
def bytes_to_escaped_str(data):
"""
Take bytes and return a safe string that can be displayed to the user.
Single quotes are always escaped, double quotes are never escaped:
"'" + bytes_to_escaped_str(...) + "'"
gives a valid Python string.
"""
# TODO: We may want to support multi-byte characters without escaping them.
# One way to do would be calling .decode("utf8", "backslashreplace") first
# and then escaping UTF8 control chars (see clean_bin).
if not isinstance(data, bytes):
raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
# We always insert a double-quote here so that we get a single-quoted string back
# https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
return repr(b'"' + data).lstrip("b")[2:-1]
def escaped_str_to_bytes(data):
"""
Take an escaped string and return the unescaped bytes equivalent.
"""
if not isinstance(data, six.string_types):
if six.PY2:
raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
if six.PY2:
if isinstance(data, unicode):
data = data.encode("utf8")
return data.decode("string-escape")
# This one is difficult - we use an undocumented Python API here
# as per http://stackoverflow.com/a/23151714/934719
return codecs.escape_decode(data)[0]

View File

@ -1,63 +1,12 @@
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
import os.path import os.path
import re import re
import codecs
import unicodedata
import importlib import importlib
import inspect import inspect
import six import six
from netlib import strutils
def always_bytes(unicode_or_bytes, *encode_args):
if isinstance(unicode_or_bytes, six.text_type):
return unicode_or_bytes.encode(*encode_args)
return unicode_or_bytes
def native(s, *encoding_opts):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using latin1 encoding if conversion is necessary.
https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
"""
if not isinstance(s, (six.binary_type, six.text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if six.PY3:
if isinstance(s, six.binary_type):
return s.decode(*encoding_opts)
else:
if isinstance(s, six.text_type):
return s.encode(*encoding_opts)
return s
def clean_bin(s, keep_spacing=True):
"""
Cleans binary data to make it safe to display.
Args:
keep_spacing: If False, tabs and newlines will also be replaced.
"""
if isinstance(s, six.text_type):
if keep_spacing:
keep = u" \n\r\t"
else:
keep = u" "
return u"".join(
ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
for ch in s
)
else:
if keep_spacing:
keep = (9, 10, 13) # \t, \n, \r,
else:
keep = ()
return b"".join(
six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
for ch in six.iterbytes(s)
)
def hexdump(s): def hexdump(s):
@ -70,7 +19,7 @@ def hexdump(s):
part = s[i:i + 16] part = s[i:i + 16]
x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part)) x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
x = x.ljust(47) # 16*2 + 15 x = x.ljust(47) # 16*2 + 15
yield (offset, x, clean_bin(part, False)) yield (offset, x, strutils.clean_bin(part, False))
def setbit(byte, offset, value): def setbit(byte, offset, value):
@ -173,50 +122,3 @@ def hostport(scheme, host, port):
return b"%s:%d" % (host, port) return b"%s:%d" % (host, port)
else: else:
return "%s:%d" % (host, port) return "%s:%d" % (host, port)
def safe_subn(pattern, repl, target, *args, **kwargs):
"""
There are Unicode conversion problems with re.subn. We try to smooth
that over by casting the pattern and replacement to strings. We really
need a better solution that is aware of the actual content ecoding.
"""
return re.subn(str(pattern), str(repl), target, *args, **kwargs)
def bytes_to_escaped_str(data):
"""
Take bytes and return a safe string that can be displayed to the user.
Single quotes are always escaped, double quotes are never escaped:
"'" + bytes_to_escaped_str(...) + "'"
gives a valid Python string.
"""
# TODO: We may want to support multi-byte characters without escaping them.
# One way to do would be calling .decode("utf8", "backslashreplace") first
# and then escaping UTF8 control chars (see clean_bin).
if not isinstance(data, bytes):
raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
# We always insert a double-quote here so that we get a single-quoted string back
# https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
return repr(b'"' + data).lstrip("b")[2:-1]
def escaped_str_to_bytes(data):
"""
Take an escaped string and return the unescaped bytes equivalent.
"""
if not isinstance(data, six.string_types):
if six.PY2:
raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
if six.PY2:
if isinstance(data, unicode):
data = data.encode("utf8")
return data.decode("string-escape")
# This one is difficult - we use an undocumented Python API here
# as per http://stackoverflow.com/a/23151714/934719
return codecs.escape_decode(data)[0]

View File

@ -7,6 +7,7 @@ import warnings
import six import six
from netlib import tcp from netlib import tcp
from netlib import strutils
from netlib import utils from netlib import utils
from netlib import human from netlib import human
from netlib.websockets import protocol from netlib.websockets import protocol
@ -254,7 +255,7 @@ class Frame(object):
def __repr__(self): def __repr__(self):
ret = repr(self.header) ret = repr(self.header)
if self.payload: if self.payload:
ret = ret + "\nPayload:\n" + utils.clean_bin(self.payload).decode("ascii") ret = ret + "\nPayload:\n" + strutils.clean_bin(self.payload).decode("ascii")
return ret return ret
def human_readable(self): def human_readable(self):

View File

@ -6,7 +6,7 @@ import six
from io import BytesIO from io import BytesIO
from six.moves import urllib from six.moves import urllib
from netlib import http, tcp, utils from netlib import http, tcp, strutils
class ClientConn(object): class ClientConn(object):
@ -54,38 +54,38 @@ class WSGIAdaptor(object):
self.app, self.domain, self.port, self.sversion = app, domain, port, sversion self.app, self.domain, self.port, self.sversion = app, domain, port, sversion
def make_environ(self, flow, errsoc, **extra): def make_environ(self, flow, errsoc, **extra):
path = utils.native(flow.request.path, "latin-1") path = strutils.native(flow.request.path, "latin-1")
if '?' in path: if '?' in path:
path_info, query = utils.native(path, "latin-1").split('?', 1) path_info, query = strutils.native(path, "latin-1").split('?', 1)
else: else:
path_info = path path_info = path
query = '' query = ''
environ = { environ = {
'wsgi.version': (1, 0), 'wsgi.version': (1, 0),
'wsgi.url_scheme': utils.native(flow.request.scheme, "latin-1"), 'wsgi.url_scheme': strutils.native(flow.request.scheme, "latin-1"),
'wsgi.input': BytesIO(flow.request.content or b""), 'wsgi.input': BytesIO(flow.request.content or b""),
'wsgi.errors': errsoc, 'wsgi.errors': errsoc,
'wsgi.multithread': True, 'wsgi.multithread': True,
'wsgi.multiprocess': False, 'wsgi.multiprocess': False,
'wsgi.run_once': False, 'wsgi.run_once': False,
'SERVER_SOFTWARE': self.sversion, 'SERVER_SOFTWARE': self.sversion,
'REQUEST_METHOD': utils.native(flow.request.method, "latin-1"), 'REQUEST_METHOD': strutils.native(flow.request.method, "latin-1"),
'SCRIPT_NAME': '', 'SCRIPT_NAME': '',
'PATH_INFO': urllib.parse.unquote(path_info), 'PATH_INFO': urllib.parse.unquote(path_info),
'QUERY_STRING': query, 'QUERY_STRING': query,
'CONTENT_TYPE': utils.native(flow.request.headers.get('Content-Type', ''), "latin-1"), 'CONTENT_TYPE': strutils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
'CONTENT_LENGTH': utils.native(flow.request.headers.get('Content-Length', ''), "latin-1"), 'CONTENT_LENGTH': strutils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
'SERVER_NAME': self.domain, 'SERVER_NAME': self.domain,
'SERVER_PORT': str(self.port), 'SERVER_PORT': str(self.port),
'SERVER_PROTOCOL': utils.native(flow.request.http_version, "latin-1"), 'SERVER_PROTOCOL': strutils.native(flow.request.http_version, "latin-1"),
} }
environ.update(extra) environ.update(extra)
if flow.client_conn.address: if flow.client_conn.address:
environ["REMOTE_ADDR"] = utils.native(flow.client_conn.address.host, "latin-1") environ["REMOTE_ADDR"] = strutils.native(flow.client_conn.address.host, "latin-1")
environ["REMOTE_PORT"] = flow.client_conn.address.port environ["REMOTE_PORT"] = flow.client_conn.address.port
for key, value in flow.request.headers.items(): for key, value in flow.request.headers.items():
key = 'HTTP_' + utils.native(key, "latin-1").upper().replace('-', '_') key = 'HTTP_' + strutils.native(key, "latin-1").upper().replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'): if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value environ[key] = value
return environ return environ
@ -139,7 +139,7 @@ class WSGIAdaptor(object):
elif state["status"]: elif state["status"]:
raise AssertionError('Response already started') raise AssertionError('Response already started')
state["status"] = status state["status"] = status
state["headers"] = http.Headers([[utils.always_bytes(k), utils.always_bytes(v)] for k, v in headers]) state["headers"] = http.Headers([[strutils.always_bytes(k), strutils.always_bytes(v)] for k, v in headers])
if exc_info: if exc_info:
self.error_page(soc, state["headers_sent"], traceback.format_tb(exc_info[2])) self.error_page(soc, state["headers_sent"], traceback.format_tb(exc_info[2]))
state["headers_sent"] = True state["headers_sent"] = True

View File

@ -5,7 +5,7 @@ import pyparsing as pp
import six import six
from six.moves import reduce from six.moves import reduce
from netlib.utils import escaped_str_to_bytes, bytes_to_escaped_str from netlib import strutils
from netlib import human from netlib import human
from . import generators, exceptions from . import generators, exceptions
@ -110,7 +110,7 @@ class Token(object):
class _TokValueLiteral(Token): class _TokValueLiteral(Token):
def __init__(self, val): def __init__(self, val):
self.val = escaped_str_to_bytes(val) self.val = strutils.escaped_str_to_bytes(val)
def get_generator(self, settings_): def get_generator(self, settings_):
return self.val return self.val
@ -135,7 +135,7 @@ class TokValueLiteral(_TokValueLiteral):
return v return v
def spec(self): def spec(self):
inner = bytes_to_escaped_str(self.val) inner = strutils.bytes_to_escaped_str(self.val)
inner = inner.replace(r"\'", r"\x27") inner = inner.replace(r"\'", r"\x27")
return "'" + inner + "'" return "'" + inner + "'"
@ -148,7 +148,7 @@ class TokValueNakedLiteral(_TokValueLiteral):
return e.setParseAction(lambda x: cls(*x)) return e.setParseAction(lambda x: cls(*x))
def spec(self): def spec(self):
return bytes_to_escaped_str(self.val) return strutils.bytes_to_escaped_str(self.val)
class TokValueGenerate(Token): class TokValueGenerate(Token):
@ -166,7 +166,7 @@ class TokValueGenerate(Token):
def freeze(self, settings): def freeze(self, settings):
g = self.get_generator(settings) g = self.get_generator(settings)
return TokValueLiteral(bytes_to_escaped_str(g[:])) return TokValueLiteral(strutils.bytes_to_escaped_str(g[:]))
@classmethod @classmethod
def expr(cls): def expr(cls):
@ -226,7 +226,7 @@ class TokValueFile(Token):
return generators.FileGenerator(s) return generators.FileGenerator(s)
def spec(self): def spec(self):
return "<'%s'" % bytes_to_escaped_str(self.path) return "<'%s'" % strutils.bytes_to_escaped_str(self.path)
TokValue = pp.MatchFirst( TokValue = pp.MatchFirst(
@ -578,4 +578,4 @@ class NestedMessage(Token):
def freeze(self, settings): def freeze(self, settings):
f = self.parsed.freeze(settings).spec() f = self.parsed.freeze(settings).spec()
return self.__class__(TokValueLiteral(bytes_to_escaped_str(f))) return self.__class__(TokValueLiteral(strutils.bytes_to_escaped_str(f)))

View File

@ -5,6 +5,7 @@ import six
import netlib.utils import netlib.utils
import netlib.tcp import netlib.tcp
import netlib.http import netlib.http
from netlib import strutils
TIMEFMT = '%d-%m-%y %H:%M:%S' TIMEFMT = '%d-%m-%y %H:%M:%S'
@ -65,7 +66,7 @@ class LogCtx(object):
for line in netlib.utils.hexdump(data): for line in netlib.utils.hexdump(data):
self("\t%s %s %s" % line) self("\t%s %s %s" % line)
else: else:
for i in netlib.utils.clean_bin(data).split("\n"): for i in strutils.clean_bin(data).split("\n"):
self("\t%s" % i) self("\t%s" % i)
def __call__(self, line): def __call__(self, line):

View File

@ -2,7 +2,7 @@ import os
import sys import sys
import netlib.utils import netlib.utils
from netlib.utils import bytes_to_escaped_str from netlib import strutils
class MemBool(object): class MemBool(object):
@ -38,7 +38,7 @@ def escape_unprintables(s):
""" """
s = s.replace(b"\r\n", b"PATHOD_MARKER_RN") s = s.replace(b"\r\n", b"PATHOD_MARKER_RN")
s = s.replace(b"\n", b"PATHOD_MARKER_N") s = s.replace(b"\n", b"PATHOD_MARKER_N")
s = bytes_to_escaped_str(s) s = strutils.bytes_to_escaped_str(s)
s = s.replace("PATHOD_MARKER_RN", "\n") s = s.replace("PATHOD_MARKER_RN", "\n")
s = s.replace("PATHOD_MARKER_N", "\n") s = s.replace("PATHOD_MARKER_N", "\n")
return s return s

View File

@ -0,0 +1,40 @@
# coding=utf-8
from netlib import strutils
def test_clean_bin():
assert strutils.clean_bin(b"one") == b"one"
assert strutils.clean_bin(b"\00ne") == b".ne"
assert strutils.clean_bin(b"\nne") == b"\nne"
assert strutils.clean_bin(b"\nne", False) == b".ne"
assert strutils.clean_bin(u"\u2605".encode("utf8")) == b"..."
assert strutils.clean_bin(u"one") == u"one"
assert strutils.clean_bin(u"\00ne") == u".ne"
assert strutils.clean_bin(u"\nne") == u"\nne"
assert strutils.clean_bin(u"\nne", False) == u".ne"
assert strutils.clean_bin(u"\u2605") == u"\u2605"
def test_safe_subn():
assert strutils.safe_subn("foo", u"bar", "\xc2foo")
def test_bytes_to_escaped_str():
assert strutils.bytes_to_escaped_str(b"foo") == "foo"
assert strutils.bytes_to_escaped_str(b"\b") == r"\x08"
assert strutils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
assert strutils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
assert strutils.bytes_to_escaped_str(b"'") == r"\'"
assert strutils.bytes_to_escaped_str(b'"') == r'"'
def test_escaped_str_to_bytes():
assert strutils.escaped_str_to_bytes("foo") == b"foo"
assert strutils.escaped_str_to_bytes("\x08") == b"\b"
assert strutils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
assert strutils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
assert strutils.escaped_str_to_bytes(u"\\x08") == b"\b"
assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
assert strutils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'

View File

@ -14,40 +14,3 @@ def test_bidi():
def test_hexdump(): def test_hexdump():
assert list(utils.hexdump(b"one\0" * 10)) assert list(utils.hexdump(b"one\0" * 10))
def test_clean_bin():
assert utils.clean_bin(b"one") == b"one"
assert utils.clean_bin(b"\00ne") == b".ne"
assert utils.clean_bin(b"\nne") == b"\nne"
assert utils.clean_bin(b"\nne", False) == b".ne"
assert utils.clean_bin(u"\u2605".encode("utf8")) == b"..."
assert utils.clean_bin(u"one") == u"one"
assert utils.clean_bin(u"\00ne") == u".ne"
assert utils.clean_bin(u"\nne") == u"\nne"
assert utils.clean_bin(u"\nne", False) == u".ne"
assert utils.clean_bin(u"\u2605") == u"\u2605"
def test_safe_subn():
assert utils.safe_subn("foo", u"bar", "\xc2foo")
def test_bytes_to_escaped_str():
assert utils.bytes_to_escaped_str(b"foo") == "foo"
assert utils.bytes_to_escaped_str(b"\b") == r"\x08"
assert utils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
assert utils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
assert utils.bytes_to_escaped_str(b"'") == r"\'"
assert utils.bytes_to_escaped_str(b'"') == r'"'
def test_escaped_str_to_bytes():
assert utils.escaped_str_to_bytes("foo") == b"foo"
assert utils.escaped_str_to_bytes("\x08") == b"\b"
assert utils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
assert utils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
assert utils.escaped_str_to_bytes(u"\\x08") == b"\b"
assert utils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
assert utils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'