2011-02-15 22:21:06 +00:00
|
|
|
from libmproxy import netstring
|
|
|
|
from cStringIO import StringIO
|
|
|
|
import libpry
|
|
|
|
|
|
|
|
|
|
|
|
class uNetstring(libpry.AutoTree):
    """Test tree exercising the encoder and decoders of the netstring module."""

    def setUp(self):
        """Store a sample sentence and the encoding expected for its words."""
        self.test_data = "Netstring module by Will McGugan"
        # One netstring per whitespace-separated word of test_data.
        self.encoded_data = "9:Netstring,6:module,2:by,4:Will,7:McGugan,"

    def test_header(self):
        """header() yields the "<length>:" prefix, including for empty input."""
        expectations = (
            ("netstring", "9:"),
            ("Will McGugan", "12:"),
            ("", "0:"),
        )
        for payload, expected in expectations:
            assert netstring.header(payload) == expected

    def test_file_encoder(self):
        """Words written through FileEncoder land in the file fully encoded."""
        sink = StringIO()
        writer = netstring.FileEncoder(sink)
        for word in self.test_data.split():
            writer.write(word)
        assert sink.getvalue() == self.encoded_data

    def test_decode_file(self):
        """decode_file() gives back the words for each tested buffer size."""
        expected = self.test_data.split()
        # Buffer sizes run from 1 up to (but not including) the encoded length.
        for buffer_size in range(1, len(self.encoded_data)):
            source = StringIO(self.encoded_data)
            decoded = list(
                netstring.decode_file(source, buffer_size=buffer_size)
            )
            assert decoded == expected

    def test_decoder(self):
        """Decoder.feed() reassembles the words however the input is chunked."""
        encoded = self.encoded_data
        expected = self.test_data.split()
        total = len(encoded)
        for step in range(1, total):
            # Cut the encoded stream into consecutive pieces of `step` chars;
            # the last piece may be shorter.
            chunks = [
                encoded[start:start + step]
                for start in range(0, total, step)
            ]
            decoder = netstring.Decoder()
            decoded = []
            for chunk in chunks:
                decoded.extend(decoder.feed(chunk))
            assert decoded == expected

    def test_errors(self):
        """Malformed or oversized input is reported with the expected message."""
        # (expected message, Decoder constructor args, input fed to it).
        # NOTE(review): the positional 5 is presumably the decoder's size
        # limit, given the "Maximum size" message -- confirm against
        # netstring.Decoder.
        cases = (
            ("Illegal digit", (), "1:foo"),
            ("Preceding zero", (), "01:f"),
            ("Maximum size", (5,), "500:f"),
            ("Illegal digit", (), ":f"),
        )
        for message, decoder_args, payload in cases:
            decoder = netstring.Decoder(*decoder_args)
            # feed() is called here and its result handed to `list` by
            # libpry.raises, so the error is expected while it is consumed.
            libpry.raises(message, list, decoder.feed(payload))
|
|
|
|
|
2011-02-15 22:21:06 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level list holding the test tree defined in this file.
# NOTE(review): presumably the name libpry's runner looks up -- confirm.
tests = [uNetstring()]
|
|
|
|
|