Merge pull request #1191 from cortesi/utils

Utils reorganisation
This commit is contained in:
Aldo Cortesi 2016-06-02 13:14:56 +12:00
commit 07f7905f91
23 changed files with 281 additions and 298 deletions

View File

@ -1,7 +1,8 @@
import string
import lxml.html
import lxml.etree
from mitmproxy import utils, contentviews
from mitmproxy import contentviews
from netlib import strutils
class ViewPigLatin(contentviews.View):
@ -10,7 +11,7 @@ class ViewPigLatin(contentviews.View):
content_types = ["text/html"]
def __call__(self, data, **metadata):
if utils.isXML(data):
if strutils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True

View File

@ -8,7 +8,7 @@ tcp_message Inline Script Hook API Demonstration
example cmdline invocation:
mitmdump -T --host --tcp ".*" -q -s examples/tcp_message.py
'''
from netlib.utils import clean_bin
from netlib import strutils
def tcp_message(ctx, tcp_msg):
@ -22,4 +22,4 @@ def tcp_message(ctx, tcp_msg):
"client" if tcp_msg.sender == tcp_msg.client_conn else "server",
tcp_msg.sender.address,
"server" if tcp_msg.receiver == tcp_msg.server_conn else "client",
tcp_msg.receiver.address, clean_bin(tcp_msg.message)))
tcp_msg.receiver.address, strutils.clean_bin(tcp_msg.message)))

View File

@ -36,7 +36,7 @@ from netlib import encoding
from netlib import http
from netlib import odict
from netlib.http import url
import netlib.utils
from netlib import strutils
try:
import pyamf
@ -129,11 +129,11 @@ class ViewAuto(View):
ct = "%s/%s" % (ct[0], ct[1])
if ct in content_types_map:
return content_types_map[ct][0](data, **metadata)
elif mitmproxy.utils.isXML(data):
elif strutils.isXML(data):
return get("XML")(data, **metadata)
if metadata.get("query"):
return get("Query")(data, **metadata)
if data and mitmproxy.utils.isMostlyBin(data):
if data and strutils.isMostlyBin(data):
return get("Hex")(data)
if not data:
return "No content", []
@ -156,7 +156,7 @@ class ViewHex(View):
@staticmethod
def _format(data):
for offset, hexa, s in netlib.utils.hexdump(data):
for offset, hexa, s in strutils.hexdump(data):
yield [
("offset", offset + " "),
("text", hexa + " "),
@ -226,7 +226,7 @@ class ViewHTML(View):
content_types = ["text/html"]
def __call__(self, data, **metadata):
if mitmproxy.utils.isXML(data):
if strutils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True
@ -581,9 +581,9 @@ def safe_to_print(lines, encoding="utf8"):
clean_line = []
for (style, text) in line:
try:
text = netlib.utils.clean_bin(text.decode(encoding, "strict"))
text = strutils.clean_bin(text.decode(encoding, "strict"))
except UnicodeDecodeError:
text = netlib.utils.clean_bin(text).decode(encoding, "strict")
text = strutils.clean_bin(text).decode(encoding, "strict")
clean_line.append((style, text))
yield clean_line

View File

@ -13,7 +13,7 @@ from mitmproxy import filt
from mitmproxy import flow
from netlib import human
from netlib import tcp
from netlib import utils
from netlib import strutils
class DumpError(Exception):
@ -181,8 +181,8 @@ class DumpMaster(flow.FlowMaster):
if self.o.flow_detail >= 2:
headers = "\r\n".join(
"{}: {}".format(
click.style(utils.bytes_to_escaped_str(k), fg="blue", bold=True),
click.style(utils.bytes_to_escaped_str(v), fg="blue"))
click.style(strutils.bytes_to_escaped_str(k), fg="blue", bold=True),
click.style(strutils.bytes_to_escaped_str(v), fg="blue"))
for k, v in message.headers.fields
)
self.echo(headers, indent=4)
@ -244,7 +244,7 @@ class DumpMaster(flow.FlowMaster):
stickycookie = ""
if flow.client_conn:
client = click.style(utils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True)
client = click.style(strutils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True)
else:
client = click.style("[replay]", fg="yellow", bold=True)
@ -253,12 +253,12 @@ class DumpMaster(flow.FlowMaster):
GET="green",
DELETE="red"
).get(method.upper(), "magenta")
method = click.style(utils.bytes_to_escaped_str(method), fg=method_color, bold=True)
method = click.style(strutils.bytes_to_escaped_str(method), fg=method_color, bold=True)
if self.showhost:
url = flow.request.pretty_url
else:
url = flow.request.url
url = click.style(utils.bytes_to_escaped_str(url), bold=True)
url = click.style(strutils.bytes_to_escaped_str(url), bold=True)
httpversion = ""
if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"):
@ -288,7 +288,7 @@ class DumpMaster(flow.FlowMaster):
elif 400 <= code < 600:
code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
reason = click.style(utils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True)
reason = click.style(strutils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True)
if flow.response.content is None:
size = "(content missing)"

View File

@ -16,7 +16,7 @@ from mitmproxy.flow import modules
from mitmproxy.onboarding import app
from mitmproxy.protocol import http_replay
from mitmproxy.proxy.config import HostMatcher
from netlib import utils
from netlib import strutils
class FlowMaster(controller.Master):
@ -499,7 +499,7 @@ class FlowMaster(controller.Master):
server=repr(flow.server_conn.address),
direction=direction,
), "info")
self.add_event(utils.clean_bin(message.content), "debug")
self.add_event(strutils.clean_bin(message.content), "debug")
@controller.handler
def tcp_error(self, flow):

View File

@ -25,32 +25,6 @@ def format_timestamp_with_milli(s):
return d.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def isBin(s):
"""
Does this string have any non-ASCII characters?
"""
for i in s:
i = ord(i)
if i < 9 or 13 < i < 32 or 126 < i:
return True
return False
def isMostlyBin(s):
s = s[:100]
return sum(isBin(ch) for ch in s) / len(s) > 0.3
def isXML(s):
for i in s:
if i in "\n \t":
continue
elif i == "<":
return True
else:
return False
def pretty_json(s):
try:
p = json.loads(s)
@ -92,15 +66,3 @@ class LRUCache:
d = self.cacheList.pop()
self.cache.pop(d)
return ret
def clean_hanging_newline(t):
"""
Many editors will silently add a newline to the final line of a
document (I'm looking at you, Vim). This function fixes this common
problem at the risk of removing a hanging newline in the rare cases
where the user actually intends it.
"""
if t and t[-1] == "\n":
return t[:-1]
return t

View File

@ -4,7 +4,7 @@ import re
import six
from netlib import multidict
from netlib import utils
from netlib import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
@ -20,7 +20,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
return utils.always_bytes(x, "utf-8", "surrogateescape")
return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):

View File

@ -4,7 +4,7 @@ import warnings
import six
from netlib import encoding, utils, basetypes
from netlib import encoding, strutils, basetypes
from netlib.http import headers
if six.PY2: # pragma: no cover
@ -19,7 +19,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
return utils.always_bytes(x, "utf-8", "surrogateescape")
return strutils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(basetypes.Serializable):
@ -200,7 +200,7 @@ class Message(basetypes.Serializable):
replacements = 0
if self.content:
with decoded(self):
self.content, replacements = utils.safe_subn(
self.content, replacements = strutils.safe_subn(
pattern, repl, self.content, flags=flags
)
replacements += self.headers.replace(pattern, repl, flags)

View File

@ -7,7 +7,7 @@ from six.moves import urllib
from netlib import encoding
from netlib import multidict
from netlib import utils
from netlib import strutils
from netlib.http import multipart
from netlib.http import cookies
from netlib.http import headers as nheaders
@ -67,7 +67,7 @@ class Request(message.Message):
"""
# TODO: Proper distinction between text and bytes.
c = super(Request, self).replace(pattern, repl, flags)
self.path, pc = utils.safe_subn(
self.path, pc = strutils.safe_subn(
pattern, repl, self.path, flags=flags
)
c += pc

View File

@ -3,7 +3,7 @@ import copy
import six
from netlib import basetypes, utils
from netlib import basetypes, strutils
class ODict(basetypes.Serializable):
@ -139,9 +139,9 @@ class ODict(basetypes.Serializable):
"""
new, count = [], 0
for k, v in self.lst:
k, c = utils.safe_subn(pattern, repl, k, *args, **kwargs)
k, c = strutils.safe_subn(pattern, repl, k, *args, **kwargs)
count += c
v, c = utils.safe_subn(pattern, repl, v, *args, **kwargs)
v, c = strutils.safe_subn(pattern, repl, v, *args, **kwargs)
count += c
new.append([k, v])
self.lst = new

154
netlib/strutils.py Normal file
View File

@ -0,0 +1,154 @@
import re
import unicodedata
import codecs
import six
def always_bytes(unicode_or_bytes, *encode_args):
if isinstance(unicode_or_bytes, six.text_type):
return unicode_or_bytes.encode(*encode_args)
return unicode_or_bytes
def native(s, *encoding_opts):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using latin1 encoding if conversion is necessary.
https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
"""
if not isinstance(s, (six.binary_type, six.text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if six.PY3:
if isinstance(s, six.binary_type):
return s.decode(*encoding_opts)
else:
if isinstance(s, six.text_type):
return s.encode(*encoding_opts)
return s
def clean_bin(s, keep_spacing=True):
"""
Cleans binary data to make it safe to display.
Args:
keep_spacing: If False, tabs and newlines will also be replaced.
"""
if isinstance(s, six.text_type):
if keep_spacing:
keep = u" \n\r\t"
else:
keep = u" "
return u"".join(
ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
for ch in s
)
else:
if keep_spacing:
keep = (9, 10, 13) # \t, \n, \r,
else:
keep = ()
return b"".join(
six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
for ch in six.iterbytes(s)
)
def safe_subn(pattern, repl, target, *args, **kwargs):
"""
There are Unicode conversion problems with re.subn. We try to smooth
that over by casting the pattern and replacement to strings. We really
need a better solution that is aware of the actual content ecoding.
"""
return re.subn(str(pattern), str(repl), target, *args, **kwargs)
def bytes_to_escaped_str(data):
"""
Take bytes and return a safe string that can be displayed to the user.
Single quotes are always escaped, double quotes are never escaped:
"'" + bytes_to_escaped_str(...) + "'"
gives a valid Python string.
"""
# TODO: We may want to support multi-byte characters without escaping them.
# One way to do would be calling .decode("utf8", "backslashreplace") first
# and then escaping UTF8 control chars (see clean_bin).
if not isinstance(data, bytes):
raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
# We always insert a double-quote here so that we get a single-quoted string back
# https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
return repr(b'"' + data).lstrip("b")[2:-1]
def escaped_str_to_bytes(data):
"""
Take an escaped string and return the unescaped bytes equivalent.
"""
if not isinstance(data, six.string_types):
if six.PY2:
raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
if six.PY2:
if isinstance(data, unicode):
data = data.encode("utf8")
return data.decode("string-escape")
# This one is difficult - we use an undocumented Python API here
# as per http://stackoverflow.com/a/23151714/934719
return codecs.escape_decode(data)[0]
def isBin(s):
"""
Does this string have any non-ASCII characters?
"""
for i in s:
i = ord(i)
if i < 9 or 13 < i < 32 or 126 < i:
return True
return False
def isMostlyBin(s):
s = s[:100]
return sum(isBin(ch) for ch in s) / len(s) > 0.3
def isXML(s):
for i in s:
if i in "\n \t":
continue
elif i == "<":
return True
else:
return False
def clean_hanging_newline(t):
"""
Many editors will silently add a newline to the final line of a
document (I'm looking at you, Vim). This function fixes this common
problem at the risk of removing a hanging newline in the rare cases
where the user actually intends it.
"""
if t and t[-1] == "\n":
return t[:-1]
return t
def hexdump(s):
"""
Returns:
A generator of (offset, hex, str) tuples
"""
for i in range(0, len(s), 16):
offset = "{:0=10x}".format(i).encode()
part = s[i:i + 16]
x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
x = x.ljust(47) # 16*2 + 15
yield (offset, x, clean_bin(part, False))

View File

@ -1,78 +1,12 @@
from __future__ import absolute_import, print_function, division
import os.path
import re
import codecs
import unicodedata
import importlib
import inspect
import six
def always_bytes(unicode_or_bytes, *encode_args):
if isinstance(unicode_or_bytes, six.text_type):
return unicode_or_bytes.encode(*encode_args)
return unicode_or_bytes
def native(s, *encoding_opts):
"""
Convert :py:class:`bytes` or :py:class:`unicode` to the native
:py:class:`str` type, using latin1 encoding if conversion is necessary.
https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
"""
if not isinstance(s, (six.binary_type, six.text_type)):
raise TypeError("%r is neither bytes nor unicode" % s)
if six.PY3:
if isinstance(s, six.binary_type):
return s.decode(*encoding_opts)
else:
if isinstance(s, six.text_type):
return s.encode(*encoding_opts)
return s
def clean_bin(s, keep_spacing=True):
"""
Cleans binary data to make it safe to display.
Args:
keep_spacing: If False, tabs and newlines will also be replaced.
"""
if isinstance(s, six.text_type):
if keep_spacing:
keep = u" \n\r\t"
else:
keep = u" "
return u"".join(
ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
for ch in s
)
else:
if keep_spacing:
keep = (9, 10, 13) # \t, \n, \r,
else:
keep = ()
return b"".join(
six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
for ch in six.iterbytes(s)
)
def hexdump(s):
"""
Returns:
A generator of (offset, hex, str) tuples
"""
for i in range(0, len(s), 16):
offset = "{:0=10x}".format(i).encode()
part = s[i:i + 16]
x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
x = x.ljust(47) # 16*2 + 15
yield (offset, x, clean_bin(part, False))
def setbit(byte, offset, value):
"""
Set a bit in a byte to 1 if value is truthy, 0 if not.
@ -173,50 +107,3 @@ def hostport(scheme, host, port):
return b"%s:%d" % (host, port)
else:
return "%s:%d" % (host, port)
def safe_subn(pattern, repl, target, *args, **kwargs):
"""
There are Unicode conversion problems with re.subn. We try to smooth
that over by casting the pattern and replacement to strings. We really
need a better solution that is aware of the actual content ecoding.
"""
return re.subn(str(pattern), str(repl), target, *args, **kwargs)
def bytes_to_escaped_str(data):
"""
Take bytes and return a safe string that can be displayed to the user.
Single quotes are always escaped, double quotes are never escaped:
"'" + bytes_to_escaped_str(...) + "'"
gives a valid Python string.
"""
# TODO: We may want to support multi-byte characters without escaping them.
# One way to do would be calling .decode("utf8", "backslashreplace") first
# and then escaping UTF8 control chars (see clean_bin).
if not isinstance(data, bytes):
raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
# We always insert a double-quote here so that we get a single-quoted string back
# https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
return repr(b'"' + data).lstrip("b")[2:-1]
def escaped_str_to_bytes(data):
"""
Take an escaped string and return the unescaped bytes equivalent.
"""
if not isinstance(data, six.string_types):
if six.PY2:
raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
if six.PY2:
if isinstance(data, unicode):
data = data.encode("utf8")
return data.decode("string-escape")
# This one is difficult - we use an undocumented Python API here
# as per http://stackoverflow.com/a/23151714/934719
return codecs.escape_decode(data)[0]

View File

@ -7,6 +7,7 @@ import warnings
import six
from netlib import tcp
from netlib import strutils
from netlib import utils
from netlib import human
from netlib.websockets import protocol
@ -254,7 +255,7 @@ class Frame(object):
def __repr__(self):
ret = repr(self.header)
if self.payload:
ret = ret + "\nPayload:\n" + utils.clean_bin(self.payload).decode("ascii")
ret = ret + "\nPayload:\n" + strutils.clean_bin(self.payload).decode("ascii")
return ret
def human_readable(self):

View File

@ -6,7 +6,7 @@ import six
from io import BytesIO
from six.moves import urllib
from netlib import http, tcp, utils
from netlib import http, tcp, strutils
class ClientConn(object):
@ -54,38 +54,38 @@ class WSGIAdaptor(object):
self.app, self.domain, self.port, self.sversion = app, domain, port, sversion
def make_environ(self, flow, errsoc, **extra):
path = utils.native(flow.request.path, "latin-1")
path = strutils.native(flow.request.path, "latin-1")
if '?' in path:
path_info, query = utils.native(path, "latin-1").split('?', 1)
path_info, query = strutils.native(path, "latin-1").split('?', 1)
else:
path_info = path
query = ''
environ = {
'wsgi.version': (1, 0),
'wsgi.url_scheme': utils.native(flow.request.scheme, "latin-1"),
'wsgi.url_scheme': strutils.native(flow.request.scheme, "latin-1"),
'wsgi.input': BytesIO(flow.request.content or b""),
'wsgi.errors': errsoc,
'wsgi.multithread': True,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
'SERVER_SOFTWARE': self.sversion,
'REQUEST_METHOD': utils.native(flow.request.method, "latin-1"),
'REQUEST_METHOD': strutils.native(flow.request.method, "latin-1"),
'SCRIPT_NAME': '',
'PATH_INFO': urllib.parse.unquote(path_info),
'QUERY_STRING': query,
'CONTENT_TYPE': utils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
'CONTENT_LENGTH': utils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
'CONTENT_TYPE': strutils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
'CONTENT_LENGTH': strutils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
'SERVER_NAME': self.domain,
'SERVER_PORT': str(self.port),
'SERVER_PROTOCOL': utils.native(flow.request.http_version, "latin-1"),
'SERVER_PROTOCOL': strutils.native(flow.request.http_version, "latin-1"),
}
environ.update(extra)
if flow.client_conn.address:
environ["REMOTE_ADDR"] = utils.native(flow.client_conn.address.host, "latin-1")
environ["REMOTE_ADDR"] = strutils.native(flow.client_conn.address.host, "latin-1")
environ["REMOTE_PORT"] = flow.client_conn.address.port
for key, value in flow.request.headers.items():
key = 'HTTP_' + utils.native(key, "latin-1").upper().replace('-', '_')
key = 'HTTP_' + strutils.native(key, "latin-1").upper().replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value
return environ
@ -139,7 +139,7 @@ class WSGIAdaptor(object):
elif state["status"]:
raise AssertionError('Response already started')
state["status"] = status
state["headers"] = http.Headers([[utils.always_bytes(k), utils.always_bytes(v)] for k, v in headers])
state["headers"] = http.Headers([[strutils.always_bytes(k), strutils.always_bytes(v)] for k, v in headers])
if exc_info:
self.error_page(soc, state["headers_sent"], traceback.format_tb(exc_info[2]))
state["headers_sent"] = True

View File

@ -3,8 +3,9 @@ import pprint
import io
import copy
from flask import Flask, jsonify, render_template, request, abort, make_response
from . import version, language, utils
from . import version, language
from netlib.http import user_agents
from netlib import strutils
logging.basicConfig(level="DEBUG")
EXAMPLE_HOST = "example.com"
@ -166,7 +167,7 @@ def make_app(noapi, debug):
settings.websocket_key = EXAMPLE_WEBSOCKET_KEY
language.serve(safe, s, settings)
args["output"] = utils.escape_unprintables(s.getvalue())
args["output"] = strutils.bytes_to_escaped_str(s.getvalue())
return render(template, False, **args)
@app.route('/response_preview')

View File

@ -5,7 +5,7 @@ import pyparsing as pp
import six
from six.moves import reduce
from netlib.utils import escaped_str_to_bytes, bytes_to_escaped_str
from netlib import strutils
from netlib import human
from . import generators, exceptions
@ -110,7 +110,7 @@ class Token(object):
class _TokValueLiteral(Token):
def __init__(self, val):
self.val = escaped_str_to_bytes(val)
self.val = strutils.escaped_str_to_bytes(val)
def get_generator(self, settings_):
return self.val
@ -135,7 +135,7 @@ class TokValueLiteral(_TokValueLiteral):
return v
def spec(self):
inner = bytes_to_escaped_str(self.val)
inner = strutils.bytes_to_escaped_str(self.val)
inner = inner.replace(r"\'", r"\x27")
return "'" + inner + "'"
@ -148,7 +148,7 @@ class TokValueNakedLiteral(_TokValueLiteral):
return e.setParseAction(lambda x: cls(*x))
def spec(self):
return bytes_to_escaped_str(self.val)
return strutils.bytes_to_escaped_str(self.val)
class TokValueGenerate(Token):
@ -166,7 +166,7 @@ class TokValueGenerate(Token):
def freeze(self, settings):
g = self.get_generator(settings)
return TokValueLiteral(bytes_to_escaped_str(g[:]))
return TokValueLiteral(strutils.bytes_to_escaped_str(g[:]))
@classmethod
def expr(cls):
@ -226,7 +226,7 @@ class TokValueFile(Token):
return generators.FileGenerator(s)
def spec(self):
return "<'%s'" % bytes_to_escaped_str(self.path)
return "<'%s'" % strutils.bytes_to_escaped_str(self.path)
TokValue = pp.MatchFirst(
@ -578,4 +578,4 @@ class NestedMessage(Token):
def freeze(self, settings):
f = self.parsed.freeze(settings).spec()
return self.__class__(TokValueLiteral(bytes_to_escaped_str(f)))
return self.__class__(TokValueLiteral(strutils.bytes_to_escaped_str(f)))

View File

@ -2,9 +2,7 @@ import datetime
import six
import netlib.utils
import netlib.tcp
import netlib.http
from netlib import strutils
TIMEFMT = '%d-%m-%y %H:%M:%S'
@ -62,10 +60,10 @@ class LogCtx(object):
def dump(self, data, hexdump):
if hexdump:
for line in netlib.utils.hexdump(data):
for line in strutils.hexdump(data):
self("\t%s %s %s" % line)
else:
for i in netlib.utils.clean_bin(data).split("\n"):
for i in strutils.clean_bin(data).split("\n"):
self("\t%s" % i)
def __call__(self, line):

View File

@ -18,14 +18,19 @@ from netlib.exceptions import HttpException, TcpDisconnect, TcpTimeout, TlsExcep
NetlibException
from netlib.http import http1, http2
from . import utils, log, language
from . import log, language
import logging
from netlib.tutils import treq
from netlib import strutils
logging.getLogger("hpack").setLevel(logging.WARNING)
def xrepr(s):
return repr(s)[1:-1]
class PathocError(Exception):
pass
@ -423,7 +428,7 @@ class Pathoc(tcp.TCPClient):
finally:
if resp:
lg("<< %s %s: %s bytes" % (
resp.status_code, utils.xrepr(resp.reason), len(resp.content)
resp.status_code, strutils.bytes_to_escaped_str(resp.reason), len(resp.content)
))
if resp.status_code in self.ignorecodes:
lg.suppress()

View File

@ -2,8 +2,6 @@ import os
import sys
import netlib.utils
from netlib.utils import bytes_to_escaped_str
class MemBool(object):
@ -28,22 +26,6 @@ def parse_anchor_spec(s):
return tuple(s.split("=", 1))
def xrepr(s):
return repr(s)[1:-1]
def escape_unprintables(s):
"""
Like inner_repr, but preserves line breaks.
"""
s = s.replace(b"\r\n", b"PATHOD_MARKER_RN")
s = s.replace(b"\n", b"PATHOD_MARKER_N")
s = bytes_to_escaped_str(s)
s = s.replace("PATHOD_MARKER_RN", "\n")
s = s.replace("PATHOD_MARKER_N", "\n")
return s
data = netlib.utils.Data(__name__)

View File

@ -13,25 +13,6 @@ def test_format_timestamp_with_milli():
assert utils.format_timestamp_with_milli(utils.timestamp())
def test_isBin():
assert not utils.isBin("testing\n\r")
assert utils.isBin("testing\x01")
assert utils.isBin("testing\x0e")
assert utils.isBin("testing\x7f")
def test_isXml():
assert not utils.isXML("foo")
assert utils.isXML("<foo")
assert utils.isXML(" \n<foo")
def test_clean_hanging_newline():
s = "foo\n"
assert utils.clean_hanging_newline(s) == "foo"
assert utils.clean_hanging_newline("foo") == "foo"
def test_pkg_data():
assert utils.pkg_data.path("console")
tutils.raises("does not exist", utils.pkg_data.path, "nonexistent")

View File

@ -0,0 +1,63 @@
# coding=utf-8
from netlib import strutils
def test_clean_bin():
assert strutils.clean_bin(b"one") == b"one"
assert strutils.clean_bin(b"\00ne") == b".ne"
assert strutils.clean_bin(b"\nne") == b"\nne"
assert strutils.clean_bin(b"\nne", False) == b".ne"
assert strutils.clean_bin(u"\u2605".encode("utf8")) == b"..."
assert strutils.clean_bin(u"one") == u"one"
assert strutils.clean_bin(u"\00ne") == u".ne"
assert strutils.clean_bin(u"\nne") == u"\nne"
assert strutils.clean_bin(u"\nne", False) == u".ne"
assert strutils.clean_bin(u"\u2605") == u"\u2605"
def test_safe_subn():
assert strutils.safe_subn("foo", u"bar", "\xc2foo")
def test_bytes_to_escaped_str():
assert strutils.bytes_to_escaped_str(b"foo") == "foo"
assert strutils.bytes_to_escaped_str(b"\b") == r"\x08"
assert strutils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
assert strutils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
assert strutils.bytes_to_escaped_str(b"'") == r"\'"
assert strutils.bytes_to_escaped_str(b'"') == r'"'
def test_escaped_str_to_bytes():
assert strutils.escaped_str_to_bytes("foo") == b"foo"
assert strutils.escaped_str_to_bytes("\x08") == b"\b"
assert strutils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
assert strutils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
assert strutils.escaped_str_to_bytes(u"\\x08") == b"\b"
assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
assert strutils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
def test_isBin():
assert not strutils.isBin("testing\n\r")
assert strutils.isBin("testing\x01")
assert strutils.isBin("testing\x0e")
assert strutils.isBin("testing\x7f")
def test_isXml():
assert not strutils.isXML("foo")
assert strutils.isXML("<foo")
assert strutils.isXML(" \n<foo")
def test_clean_hanging_newline():
s = "foo\n"
assert strutils.clean_hanging_newline(s) == "foo"
assert strutils.clean_hanging_newline("foo") == "foo"
def test_hexdump():
assert list(strutils.hexdump(b"one\0" * 10))

View File

@ -10,44 +10,3 @@ def test_bidi():
assert b.get_name(5) is None
tutils.raises(AttributeError, getattr, b, "c")
tutils.raises(ValueError, utils.BiDi, one=1, two=1)
def test_hexdump():
assert list(utils.hexdump(b"one\0" * 10))
def test_clean_bin():
assert utils.clean_bin(b"one") == b"one"
assert utils.clean_bin(b"\00ne") == b".ne"
assert utils.clean_bin(b"\nne") == b"\nne"
assert utils.clean_bin(b"\nne", False) == b".ne"
assert utils.clean_bin(u"\u2605".encode("utf8")) == b"..."
assert utils.clean_bin(u"one") == u"one"
assert utils.clean_bin(u"\00ne") == u".ne"
assert utils.clean_bin(u"\nne") == u"\nne"
assert utils.clean_bin(u"\nne", False) == u".ne"
assert utils.clean_bin(u"\u2605") == u"\u2605"
def test_safe_subn():
assert utils.safe_subn("foo", u"bar", "\xc2foo")
def test_bytes_to_escaped_str():
assert utils.bytes_to_escaped_str(b"foo") == "foo"
assert utils.bytes_to_escaped_str(b"\b") == r"\x08"
assert utils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
assert utils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
assert utils.bytes_to_escaped_str(b"'") == r"\'"
assert utils.bytes_to_escaped_str(b'"') == r'"'
def test_escaped_str_to_bytes():
assert utils.escaped_str_to_bytes("foo") == b"foo"
assert utils.escaped_str_to_bytes("\x08") == b"\b"
assert utils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
assert utils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
assert utils.escaped_str_to_bytes(u"\\x08") == b"\b"
assert utils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
assert utils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'

View File

@ -1,8 +1,6 @@
from pathod import utils
import tutils
import six
def test_membool():
m = utils.MemBool()
@ -20,12 +18,3 @@ def test_parse_anchor_spec():
def test_data_path():
tutils.raises(ValueError, utils.data.path, "nonexistent")
def test_escape_unprintables():
s = bytes(range(256))
if six.PY2:
s = "".join([chr(i) for i in range(255)])
e = utils.escape_unprintables(s)
assert e.encode('ascii')
assert "PATHOD_MARKER" not in e