websockets: more compact and legible human_readable

This commit is contained in:
Aldo Cortesi 2015-04-30 12:10:08 +12:00
parent 8086022920
commit 4dce7ee074
4 changed files with 55 additions and 28 deletions

View File

@ -70,11 +70,12 @@ def getbit(byte, offset):
class BiDi: class BiDi:
""" """
A wee utility class for keeping bi-directional mappings, like field A wee utility class for keeping bi-directional mappings, like field
constants in protocols: constants in protocols. Names are attributes on the object, dict-like
access maps values to names:
CONST = BiDi(a=1, b=2) CONST = BiDi(a=1, b=2)
assert CONST.a == 1 assert CONST.a == 1
assert CONST[1] == "a" assert CONST.get_name(1) == "a"
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.names = kwargs self.names = kwargs
@ -89,5 +90,21 @@ class BiDi:
return self.names[k] return self.names[k]
raise AttributeError("No such attribute: %s", k) raise AttributeError("No such attribute: %s", k)
def __getitem__(self, k): def get_name(self, n, default=None):
return self.values[k] return self.values.get(n, default)
def pretty_size(size):
suffixes = [
("B", 2**10),
("kB", 2**20),
("MB", 2**30),
]
for suf, lim in suffixes:
if size >= lim:
continue
else:
x = round(size/float(lim/2**10), 2)
if x == int(x):
x = int(x)
return str(x) + suf

View File

@ -162,16 +162,21 @@ class FrameHeader:
raise ValueError("Masking key must be 4 bytes.") raise ValueError("Masking key must be 4 bytes.")
def human_readable(self): def human_readable(self):
return "\n".join([ vals = [
("fin - " + str(self.fin)), "wf:",
("rsv1 - " + str(self.rsv1)), OPCODE.get_name(self.opcode, hex(self.opcode)).lower()
("rsv2 - " + str(self.rsv2)), ]
("rsv3 - " + str(self.rsv3)), flags = []
("opcode - " + str(self.opcode)), for i in ["fin", "rsv1", "rsv2", "rsv3", "mask"]:
("mask - " + str(self.mask)), if getattr(self, i):
("length_code - " + str(self.length_code)), flags.append(i)
("masking_key - " + repr(str(self.masking_key))), if flags:
]) vals.extend([":", "|".join(flags)])
if self.masking_key:
vals.append(":key=%s"%repr(self.masking_key))
if self.payload_length:
vals.append(" %s"%utils.pretty_size(self.payload_length))
return "".join(vals)
def to_bytes(self): def to_bytes(self):
first_byte = utils.setbit(0, 7, self.fin) first_byte = utils.setbit(0, 7, self.fin)
@ -308,17 +313,8 @@ class Frame(object):
return cls.from_file(io.BytesIO(bytestring)) return cls.from_file(io.BytesIO(bytestring))
def human_readable(self): def human_readable(self):
return "\n".join([ hdr = self.header.human_readable()
("fin - " + str(self.header.fin)), return hdr + "\n" + repr(self.payload)
("rsv1 - " + str(self.header.rsv1)),
("rsv2 - " + str(self.header.rsv2)),
("rsv3 - " + str(self.header.rsv3)),
("opcode - " + str(self.header.opcode)),
("mask - " + str(self.header.mask)),
("length_code - " + str(self.header.length_code)),
("masking_key - " + repr(str(self.header.masking_key))),
("payload - " + repr(str(self.payload))),
])
def to_bytes(self): def to_bytes(self):
""" """

View File

@ -5,9 +5,10 @@ import tutils
def test_bidi(): def test_bidi():
b = utils.BiDi(a=1, b=2) b = utils.BiDi(a=1, b=2)
assert b.a == 1 assert b.a == 1
assert b[1] == "a" assert b.get_name(1) == "a"
assert b.get_name(5) is None
tutils.raises(AttributeError, getattr, b, "c") tutils.raises(AttributeError, getattr, b, "c")
tutils.raises(KeyError, b.__getitem__, 5) tutils.raises(ValueError, utils.BiDi, one=1, two=1)
def test_hexdump(): def test_hexdump():
@ -19,3 +20,10 @@ def test_cleanBin():
assert utils.cleanBin("\00ne") == ".ne" assert utils.cleanBin("\00ne") == ".ne"
assert utils.cleanBin("\nne") == "\nne" assert utils.cleanBin("\nne") == "\nne"
assert utils.cleanBin("\nne", True) == ".ne" assert utils.cleanBin("\nne", True) == ".ne"
def test_pretty_size():
assert utils.pretty_size(100) == "100B"
assert utils.pretty_size(1024) == "1kB"
assert utils.pretty_size(1024 + (1024/2.0)) == "1.5kB"
assert utils.pretty_size(1024*1024) == "1MB"

View File

@ -185,7 +185,13 @@ class TestFrameHeader:
round(masking_key="test") round(masking_key="test")
def test_human_readable(self): def test_human_readable(self):
f = websockets.FrameHeader(masking_key="test", mask=False) f = websockets.FrameHeader(
masking_key="test",
fin=True,
payload_length=10
)
assert f.human_readable()
f = websockets.FrameHeader()
assert f.human_readable() assert f.human_readable()
def test_funky(self): def test_funky(self):