mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
add Client/Server.__str__
This commit is contained in:
parent
cca242a581
commit
9186c1aeb5
@ -8,6 +8,7 @@ from mitmproxy import certs
|
|||||||
from mitmproxy.coretypes import serializable
|
from mitmproxy.coretypes import serializable
|
||||||
from mitmproxy.net import server_spec
|
from mitmproxy.net import server_spec
|
||||||
from mitmproxy.options import Options
|
from mitmproxy.options import Options
|
||||||
|
from mitmproxy.utils import human
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
import mitmproxy.proxy.layer
|
import mitmproxy.proxy.layer
|
||||||
@ -90,7 +91,10 @@ class Connection(serializable.Serializable, metaclass=ABCMeta):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
attrs = repr({
|
attrs = repr({
|
||||||
k: {"cipher_list": lambda: f"<{len(v)} ciphers>"}.get(k, lambda: v)()
|
k: {
|
||||||
|
"cipher_list": lambda: f"<{len(v)} ciphers>",
|
||||||
|
"id": lambda: f"…{v[-6:]}"
|
||||||
|
}.get(k, lambda: v)()
|
||||||
for k, v in self.__dict__.items()
|
for k, v in self.__dict__.items()
|
||||||
})
|
})
|
||||||
return f"{type(self).__name__}({attrs})"
|
return f"{type(self).__name__}({attrs})"
|
||||||
@ -118,6 +122,15 @@ class Client(Connection):
|
|||||||
self.sockname = sockname
|
self.sockname = sockname
|
||||||
self.timestamp_start = timestamp_start
|
self.timestamp_start = timestamp_start
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
if self.alpn:
|
||||||
|
tls_state = f", alpn={self.alpn.decode(errors='replace')}"
|
||||||
|
elif self.tls_established:
|
||||||
|
tls_state = ", tls"
|
||||||
|
else:
|
||||||
|
tls_state = ""
|
||||||
|
return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})"
|
||||||
|
|
||||||
def get_state(self):
|
def get_state(self):
|
||||||
# Important: Retain full compatibility with old proxy core for now!
|
# Important: Retain full compatibility with old proxy core for now!
|
||||||
# This means we need to add all new fields to the old implementation.
|
# This means we need to add all new fields to the old implementation.
|
||||||
@ -191,7 +204,8 @@ class Client(Connection):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover
|
def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover
|
||||||
warnings.warn("Client.clientcert is deprecated, use Client.certificate_list instead.", DeprecationWarning, stacklevel=2)
|
warnings.warn("Client.clientcert is deprecated, use Client.certificate_list instead.", DeprecationWarning,
|
||||||
|
stacklevel=2)
|
||||||
if self.certificate_list:
|
if self.certificate_list:
|
||||||
return self.certificate_list[0]
|
return self.certificate_list[0]
|
||||||
else:
|
else:
|
||||||
@ -225,6 +239,19 @@ class Server(Connection):
|
|||||||
self.id = str(uuid.uuid4())
|
self.id = str(uuid.uuid4())
|
||||||
self.address = address
|
self.address = address
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
if self.alpn:
|
||||||
|
tls_state = f", alpn={self.alpn.decode(errors='replace')}"
|
||||||
|
elif self.tls_established:
|
||||||
|
tls_state = ", tls"
|
||||||
|
else:
|
||||||
|
tls_state = ""
|
||||||
|
if self.sockname:
|
||||||
|
local_port = f", src_port={self.sockname[1]}"
|
||||||
|
else:
|
||||||
|
local_port = ""
|
||||||
|
return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})"
|
||||||
|
|
||||||
def get_state(self):
|
def get_state(self):
|
||||||
return {
|
return {
|
||||||
'address': self.address,
|
'address': self.address,
|
||||||
@ -285,7 +312,8 @@ class Server(Connection):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def cert(self) -> Optional[certs.Cert]: # pragma: no cover
|
def cert(self) -> Optional[certs.Cert]: # pragma: no cover
|
||||||
warnings.warn("Server.cert is deprecated, use Server.certificate_list instead.", DeprecationWarning, stacklevel=2)
|
warnings.warn("Server.cert is deprecated, use Server.certificate_list instead.", DeprecationWarning,
|
||||||
|
stacklevel=2)
|
||||||
if self.certificate_list:
|
if self.certificate_list:
|
||||||
return self.certificate_list[0]
|
return self.certificate_list[0]
|
||||||
else:
|
else:
|
||||||
@ -293,7 +321,8 @@ class Server(Connection):
|
|||||||
|
|
||||||
@cert.setter
|
@cert.setter
|
||||||
def cert(self, val): # pragma: no cover
|
def cert(self, val): # pragma: no cover
|
||||||
warnings.warn("Server.cert is deprecated, use Server.certificate_list instead.", DeprecationWarning, stacklevel=2)
|
warnings.warn("Server.cert is deprecated, use Server.certificate_list instead.", DeprecationWarning,
|
||||||
|
stacklevel=2)
|
||||||
if val:
|
if val:
|
||||||
self.certificate_list = [val]
|
self.certificate_list = [val]
|
||||||
else:
|
else:
|
||||||
|
@ -36,6 +36,11 @@ class TestClient:
|
|||||||
1607780791
|
1607780791
|
||||||
)
|
)
|
||||||
assert repr(c)
|
assert repr(c)
|
||||||
|
assert str(c)
|
||||||
|
c.timestamp_tls_setup = 1607780791
|
||||||
|
assert str(c)
|
||||||
|
c.alpn = b"foo"
|
||||||
|
assert str(c) == "Client(127.0.0.1:52314, state=open, alpn=foo)"
|
||||||
|
|
||||||
def test_state(self):
|
def test_state(self):
|
||||||
c = tflow.tclient_conn()
|
c = tflow.tclient_conn()
|
||||||
@ -58,6 +63,12 @@ class TestServer:
|
|||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
s = context.Server(("address", 22))
|
s = context.Server(("address", 22))
|
||||||
assert repr(s)
|
assert repr(s)
|
||||||
|
assert str(s)
|
||||||
|
s.timestamp_tls_setup = 1607780791
|
||||||
|
assert str(s)
|
||||||
|
s.alpn = b"foo"
|
||||||
|
s.sockname = ("127.0.0.1", 54321)
|
||||||
|
assert str(s) == "Server(address:22, state=closed, alpn=foo, src_port=54321)"
|
||||||
|
|
||||||
def test_state(self):
|
def test_state(self):
|
||||||
c = tflow.tserver_conn()
|
c = tflow.tserver_conn()
|
||||||
|
Loading…
Reference in New Issue
Block a user