benchmark: some improvements - limit to queue size

madt1m 2018-07-23 21:12:53 +02:00
parent 8ab82ad9a3
commit e727446f14

test/bench/serialization-bm.py (48 changed lines, Executable file → Normal file)

@@ -1,6 +1,8 @@
+import tempfile
 import asyncio
+import typing
 import time
-import os
 
 from statistics import mean
 from mitmproxy import ctx
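
The new tempfile import backs the scratch database file created further down, replacing the hard-coded /tmp path. A rough standalone sketch of that pattern, with sqlite3 standing in for mitmproxy's DBHandler and assuming a POSIX system where an open NamedTemporaryFile can be reopened by name:

import sqlite3
import tempfile

# Named scratch file under the platform temp dir; deleted automatically on close.
temp = tempfile.NamedTemporaryFile()
# Another component opens the same file by path (on Windows this would need delete=False).
conn = sqlite3.connect(temp.name)
conn.execute("CREATE TABLE flows (id INTEGER, content BLOB)")
conn.close()
temp.close()  # removes the file, so no explicit os.remove() is needed
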
@@ -16,18 +18,20 @@ class StreamTester:
     """
 
     def __init__(self):
-        self.loop = asyncio.get_event_loop()
-        self.queue = asyncio.Queue(loop=self.loop)
         self.dbh = None
         self.streaming = False
         self.tf = None
         self.out = None
         self.hot_flows = []
         self.results = []
-        self._fflushes = 0
+        self._flushes = 0
         self._stream_period = 0.001
         self._flush_period = 3.0
         self._flush_rate = 150
+        self._target = 2000
+        self.loop = asyncio.get_event_loop()
+        self.queue = asyncio.Queue(maxsize=self._flush_rate * 3, loop=self.loop)
+        self.temp = tempfile.NamedTemporaryFile()
 
     def load(self, loader):
         loader.add_option(
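
The "limit to queue size" in the commit title is this maxsize argument: with a bounded queue, the producer's await queue.put() suspends once _flush_rate * 3 flows are buffered, so memory stays flat while the writer catches up. A minimal standalone sketch of that backpressure behaviour, using toy producer/consumer names rather than the addon's coroutines (newer Python versions also drop the loop= argument):

import asyncio


async def producer(queue: asyncio.Queue) -> None:
    for i in range(10):
        await queue.put(i)          # suspends while the queue is full
        print(f"produced {i} (queue size: {queue.qsize()})")


async def consumer(queue: asyncio.Queue) -> None:
    while True:
        await queue.get()
        await asyncio.sleep(0.01)   # simulate slow serialization/flush work
        queue.task_done()


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)  # bounded, like maxsize=_flush_rate * 3
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()              # wait until every item has been processed
    consumer_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())
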
@@ -38,21 +42,30 @@ class StreamTester:
         )
         loader.add_option(
             "benchmark_save_path",
-            str,
-            "/tmp/stats",
+            typing.Optional[str],
+            None,
             "Destination for the stats result file"
         )
 
+    def _log(self, msg):
+        if self.out:
+            self.out.write(msg + '\n')
+        else:
+            ctx.log(msg)
+
     def running(self):
         if not self.streaming:
             ctx.log("<== Serialization Benchmark Enabled ==>")
             self.tf = tflow.tflow()
             self.tf.request.content = b'A' * ctx.options.testflow_size
             ctx.log(f"With content size: {len(self.tf.request.content)} B")
-            self.dbh = db.DBHandler("/tmp/temp.sqlite", mode='write')
-            self.out = open(ctx.options.benchmark_save_path, "w")
+            if ctx.options.benchmark_save_path:
+                ctx.log(f"Storing results to {ctx.options.benchmark_save_path}")
+                self.out = open(ctx.options.benchmark_save_path, "w")
+            self.dbh = db.DBHandler(self.temp.name, mode='write')
             self.streaming = True
-            self.loop.create_task(asyncio.gather(self.writer(), self.stream(), self.stats()))
+            tasks = (self.stream, self.writer, self.stats)
+            self.loop.create_task(asyncio.gather(*(t() for t in tasks)))
 
     async def stream(self):
         while True:
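
The start-up line now builds one gathered task from the three coroutines instead of listing the calls inline. A small standalone sketch of that scheduling pattern, with stand-in coroutine bodies:

import asyncio


async def stream() -> None:
    await asyncio.sleep(0.1)   # stand-in for the flow-producing coroutine


async def writer() -> None:
    await asyncio.sleep(0.1)   # stand-in for the batching writer


async def stats() -> None:
    await asyncio.sleep(0.1)   # stand-in for the periodic stats reporter


async def main() -> None:
    loop = asyncio.get_running_loop()
    tasks = (stream, writer, stats)
    # One Task wrapping asyncio.gather() gives a single handle for all three coroutines.
    handle = loop.create_task(asyncio.gather(*(t() for t in tasks)))
    await handle


asyncio.run(main())
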
@@ -65,7 +78,7 @@ class StreamTester:
             count = 1
             f = await self.queue.get()
             self.hot_flows.append(f)
-            while not self.queue.empty() and count < self._flush_rate:
+            while count < self._flush_rate:
                 try:
                     self.hot_flows.append(self.queue.get_nowait())
                     count += 1
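
With the queue.empty() check dropped, the loop relies on get_nowait() raising QueueEmpty to end a batch early. A condensed sketch of that drain loop; the break on QueueEmpty is this sketch's choice, since the addon's except clause sits outside the hunk:

import asyncio


async def drain_batch(queue: asyncio.Queue, flush_rate: int) -> list:
    # Block until at least one flow is available...
    batch = [await queue.get()]
    # ...then take whatever else is already buffered, up to flush_rate items.
    while len(batch) < flush_rate:
        try:
            batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return batch


async def demo() -> None:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    print(await drain_batch(q, flush_rate=150))  # -> [0, 1, 2, 3, 4]


asyncio.run(demo())
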
@@ -74,20 +87,21 @@ class StreamTester:
             start = time.perf_counter()
             n = self._fflush()
             end = time.perf_counter()
-            self.out.write(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}\n")
+            self._log(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}")
             self.results.append(n / (end - start))
-            self._fflushes += 1
-            ctx.log(f"Flushes: {self._fflushes}")
+            self._flushes += n
+            self._log(f"Flows dumped: {self._flushes}")
+            ctx.log(f"Progress: {min(100.0, 100.0 * (self._flushes / self._target))}%")
 
     async def stats(self):
         while True:
             await asyncio.sleep(1.0)
-            if self._fflushes == 21:
-                self.out.write(f"AVG : {mean(self.results)}\n")
-                ctx.log(f"<== Benchmark Ended. Collect results at {ctx.options.benchmark_save_path} ==>")
-                self.out.close()
-                del self.dbh
-                os.remove("/tmp/temp.sqlite")
+            if self._flushes >= self._target:
+                self._log(f"AVG : {mean(self.results)}")
+                ctx.log(f"<== Benchmark Ended. Shutting down... ==>")
+                if self.out:
+                    self.out.close()
+                self.temp.close()
                 ctx.master.shutdown()
 
     def _fflush(self):
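
Taken together, the writer and stats changes measure flows dumped per second for each batch and stop once _target flows have been written, instead of stopping after a fixed number of flushes. A compact standalone sketch of that measurement and termination logic, where flush_batch is a stand-in for the addon's _fflush():

import time
from statistics import mean


def flush_batch(batch: list) -> int:
    time.sleep(0.001)          # pretend to serialize the batch to the database
    return len(batch)


def run_benchmark(batches: list, target: int) -> float:
    results = []
    flushed = 0
    for batch in batches:
        start = time.perf_counter()
        n = flush_batch(batch)
        end = time.perf_counter()
        results.append(n / (end - start))  # flows dumped per second for this batch
        flushed += n
        print(f"Progress: {min(100.0, 100.0 * (flushed / target))}%")
        if flushed >= target:
            break
    return mean(results)


# 14 batches of 150 dummy flows comfortably exceed a target of 2000.
print(f"AVG : {run_benchmark([[None] * 150 for _ in range(14)], target=2000):.1f}")
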