#include "v2/NativeNetworkingImpl.h"

#include "p2p/base/basic_packet_socket_factory.h"
#include "p2p/client/basic_port_allocator.h"
#include "p2p/base/p2p_transport_channel.h"
#include "p2p/base/basic_async_resolver_factory.h"
#include "api/packet_socket_factory.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/rtc_certificate_generator.h"
#include "p2p/base/ice_credentials_iterator.h"
#include "api/jsep_ice_candidate.h"
#include "p2p/base/dtls_transport.h"
#include "p2p/base/dtls_transport_factory.h"
#include "pc/dtls_srtp_transport.h"
#include "pc/dtls_transport.h"

#include "StaticThreads.h"

namespace tgcalls {

// TURN customizer that tags every outgoing STUN message with a SOFTWARE
// attribute and never restricts channel data. Installed only when
// `enableStunMarking` is set (see resetDtlsSrtpTransport()).
class TurnCustomizerImpl : public webrtc::TurnCustomizer {
public:
    TurnCustomizerImpl() {
    }

    virtual ~TurnCustomizerImpl() {
    }

    // Appends a STUN_ATTR_SOFTWARE byte-string attribute to `message`.
    void MaybeModifyOutgoingStunMessage(cricket::PortInterface* port, cricket::StunMessage* message) override {
        message->AddAttribute(std::make_unique<cricket::StunByteStringAttribute>(cricket::STUN_ATTR_SOFTWARE, "Telegram "));
    }

    // Channel data is always allowed.
    bool AllowChannelData(cricket::PortInterface* port, const void *data, size_t size, bool payload) override {
        return true;
    }
};

// Owns one SCTP transport layered on a DTLS transport plus a single data
// channel (label "data", id 0) running over it. It acts both as the provider
// the data channel sends through and as the observer of that channel.
//
// Every method asserts that it runs on the network thread.
class SctpDataChannelProviderInterfaceImpl : public sigslot::has_slots<>, public webrtc::SctpDataChannelProviderInterface, public webrtc::DataChannelObserver {
public:
    // transportChannel:  DTLS transport the SCTP transport is created on; not owned.
    // isOutgoing:        selects the open-handshake role (opener when true, acker otherwise).
    // onStateChanged:    invoked with the new value whenever the "channel is open" flag flips.
    // onMessageReceived: invoked with the payload of every non-binary incoming message.
    // threads:           provides the network thread used for SCTP and the data channel.
    SctpDataChannelProviderInterfaceImpl(
        cricket::DtlsTransport *transportChannel,
        bool isOutgoing,
        std::function<void(bool)> onStateChanged,
        std::function<void(std::string const &)> onMessageReceived,
        std::shared_ptr<Threads> threads
    ) :
    _threads(std::move(threads)),
    _onStateChanged(onStateChanged),
    _onMessageReceived(onMessageReceived) {
        assert(_threads->getNetworkThread()->IsCurrent());

        _sctpTransportFactory.reset(new cricket::SctpTransportFactory(_threads->getNetworkThread()));

        _sctpTransport = _sctpTransportFactory->CreateSctpTransport(transportChannel);
        _sctpTransport->SignalReadyToSendData.connect(this, &SctpDataChannelProviderInterfaceImpl::sctpReadyToSendData);
        _sctpTransport->SignalDataReceived.connect(this, &SctpDataChannelProviderInterfaceImpl::sctpDataReceived);

        webrtc::InternalDataChannelInit dataChannelInit;
        dataChannelInit.id = 0;
        dataChannelInit.open_handshake_role = isOutgoing ? webrtc::InternalDataChannelInit::kOpener : webrtc::InternalDataChannelInit::kAcker;
        // The network thread is passed for both thread arguments of Create().
        _dataChannel = webrtc::SctpDataChannel::Create(
            this,
            "data",
            dataChannelInit,
            _threads->getNetworkThread(),
            _threads->getNetworkThread()
        );

        _dataChannel->RegisterObserver(this);
    }

    // Detaches the observer and closes the channel before dropping the SCTP
    // transport and its factory.
    virtual ~SctpDataChannelProviderInterfaceImpl() {
        assert(_threads->getNetworkThread()->IsCurrent());

        _dataChannel->UnregisterObserver();
        _dataChannel->Close();
        _dataChannel = nullptr;

        _sctpTransport = nullptr;
        _sctpTransportFactory.reset();
    }

    // Sends `message` over the data channel if it is currently open;
    // otherwise only logs that the message could not be sent.
    void sendDataChannelMessage(std::string const &message) {
        assert(_threads->getNetworkThread()->IsCurrent());

        if (_isDataChannelOpen) {
            RTC_LOG(LS_INFO) << "Outgoing DataChannel message: " << message;

            webrtc::DataBuffer buffer(message);
            _dataChannel->Send(buffer);
        } else {
            RTC_LOG(LS_INFO) << "Could not send an outgoing DataChannel message: the channel is not open";
        }
    }

    // DataChannelObserver: tracks whether the channel is in the kOpen state
    // and reports only actual transitions to `_onStateChanged`.
    virtual void OnStateChange() override {
        assert(_threads->getNetworkThread()->IsCurrent());

        auto state = _dataChannel->state();
        bool isDataChannelOpen = state == webrtc::DataChannelInterface::DataState::kOpen;
        if (_isDataChannelOpen != isDataChannelOpen) {
            _isDataChannelOpen = isDataChannelOpen;
            _onStateChanged(_isDataChannelOpen);
        }
    }

    // DataChannelObserver: forwards non-binary messages as strings; binary
    // messages are ignored.
    virtual void OnMessage(const webrtc::DataBuffer& buffer) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        if (!buffer.binary) {
            std::string messageText(buffer.data.data(), buffer.data.data() + buffer.data.size());
            RTC_LOG(LS_INFO) << "Incoming DataChannel message: " << messageText;

            _onMessageReceived(messageText);
        }
    }

    // Starts the SCTP transport the first time the underlying connection is
    // reported as connected. Later calls, and disconnects, do nothing.
    void updateIsConnected(bool isConnected) {
        assert(_threads->getNetworkThread()->IsCurrent());

        if (isConnected) {
            if (!_isSctpTransportStarted) {
                _isSctpTransportStarted = true;
                // NOTE(review): presumably (local port, remote port, max message size) --
                // confirm against SctpTransportInternal::Start.
                _sctpTransport->Start(5000, 5000, 262144);
            }
        }
    }

    // Slot for SignalReadyToSendData: tells the data channel the transport is ready.
    void sctpReadyToSendData() {
        assert(_threads->getNetworkThread()->IsCurrent());

        _dataChannel->OnTransportReady(true);
    }

    // Slot for SignalDataReceived: hands incoming SCTP data to the data channel.
    void sctpDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) {
        assert(_threads->getNetworkThread()->IsCurrent());

        _dataChannel->OnDataReceived(params, buffer);
    }

    // Provider interface: sends `payload` through the SCTP transport.
    // `result` is forwarded so the caller can see why a send failed (for
    // example, that the transport is temporarily blocked) instead of only
    // getting `false`.
    virtual bool SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        return _sctpTransport->SendData(params, payload, result);
    }

    // Provider interface: nothing to wire up, the single channel is connected
    // through the signal slots registered in the constructor.
    virtual bool ConnectDataChannel(webrtc::SctpDataChannel *data_channel) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        return true;
    }

    // Provider interface: no-op counterpart of ConnectDataChannel().
    virtual void DisconnectDataChannel(webrtc::SctpDataChannel* data_channel) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        return;
    }

    // Provider interface: opens stream `sid` on the SCTP transport.
    virtual void AddSctpDataStream(int sid) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        _sctpTransport->OpenStream(sid);
    }

    // Provider interface: resets stream `sid` on the SCTP transport. The reset
    // is dispatched through Invoke() on the network thread, which is also the
    // thread this method asserts it is called on.
    virtual void RemoveSctpDataStream(int sid) override {
        assert(_threads->getNetworkThread()->IsCurrent());

        _threads->getNetworkThread()->Invoke<void>(RTC_FROM_HERE, [this, sid]() {
            _sctpTransport->ResetStream(sid);
        });
    }

    // Provider interface: reports whether the SCTP transport can send right now.
    virtual bool ReadyToSendData() const override {
        assert(_threads->getNetworkThread()->IsCurrent());

        return _sctpTransport->ReadyToSendData();
    }

private:
    std::shared_ptr<Threads> _threads;
    std::function<void(bool)> _onStateChanged;
    std::function<void(std::string const &)> _onMessageReceived;

    std::unique_ptr<cricket::SctpTransportFactory> _sctpTransportFactory;
    std::unique_ptr<cricket::SctpTransportInternal> _sctpTransport;
    rtc::scoped_refptr<webrtc::SctpDataChannel> _dataChannel;

    bool _isSctpTransportStarted = false;
    bool _isDataChannelOpen = false;
};

// Returns the crypto options used for the DTLS transport: AES128-SHA1-80 and
// the GCM SRTP suites are both enabled.
webrtc::CryptoOptions NativeNetworkingImpl::getDefaulCryptoOptions() {
    auto options = webrtc::CryptoOptions();
    options.srtp.enable_aes128_sha1_80_crypto_cipher = true;
    options.srtp.enable_gcm_crypto_suites = true;
    return options;
}

// Takes its settings and callbacks from `configuration`, generates fresh local
// ICE credentials and an ECDSA certificate, creates the long-lived helpers
// (socket factory, network manager, resolver factory, DTLS-SRTP transport) and
// builds the first transport stack. Must run on the network thread.
NativeNetworkingImpl::NativeNetworkingImpl(Configuration &&configuration) :
_threads(std::move(configuration.threads)),
_isOutgoing(configuration.isOutgoing),
_enableStunMarking(configuration.enableStunMarking),
_enableTCP(configuration.enableTCP),
_enableP2P(configuration.enableP2P),
_rtcServers(configuration.rtcServers),
_stateUpdated(std::move(configuration.stateUpdated)),
_candidateGathered(std::move(configuration.candidateGathered)),
_transportMessageReceived(std::move(configuration.transportMessageReceived)),
_rtcpPacketReceived(std::move(configuration.rtcpPacketReceived)),
_dataChannelStateUpdated(configuration.dataChannelStateUpdated),
_dataChannelMessageReceived(configuration.dataChannelMessageReceived) {
    assert(_threads->getNetworkThread()->IsCurrent());

    _localIceParameters = PeerIceParameters(rtc::CreateRandomString(cricket::ICE_UFRAG_LENGTH), rtc::CreateRandomString(cricket::ICE_PWD_LENGTH));

    _localCertificate = rtc::RTCCertificateGenerator::GenerateCertificate(rtc::KeyParams(rtc::KT_ECDSA), absl::nullopt);

    _socketFactory.reset(new rtc::BasicPacketSocketFactory(_threads->getNetworkThread()));
    _networkManager = std::make_unique<rtc::BasicNetworkManager>();
    _asyncResolverFactory = std::make_unique<webrtc::BasicAsyncResolverFactory>();

    // The DTLS-SRTP transport is created once and re-pointed at each new DTLS
    // transport by resetDtlsSrtpTransport().
    _dtlsSrtpTransport = std::make_unique<webrtc::DtlsSrtpTransport>(true);
    _dtlsSrtpTransport->SetDtlsTransports(nullptr, nullptr);
    _dtlsSrtpTransport->SetActiveResetSrtpParams(false);
    _dtlsSrtpTransport->SignalReadyToSend.connect(this, &NativeNetworkingImpl::DtlsReadyToSend);
    _dtlsSrtpTransport->SignalRtpPacketReceived.connect(this, &NativeNetworkingImpl::RtpPacketReceived_n);
    _dtlsSrtpTransport->SignalRtcpPacketReceived.connect(this, &NativeNetworkingImpl::OnRtcpPacketReceived_n);

    resetDtlsSrtpTransport();
}

// Tears the stack down top to bottom. The data channel interface is released
// before the DTLS transport: its SCTP transport was created on top of that
// DTLS transport and its destructor still closes the data channel. This is the
// same order stop() uses.
NativeNetworkingImpl::~NativeNetworkingImpl() {
    assert(_threads->getNetworkThread()->IsCurrent());

    RTC_LOG(LS_INFO) << "NativeNetworkingImpl::~NativeNetworkingImpl()";

    _dtlsSrtpTransport.reset();
    _dataChannelInterface.reset();
    _dtlsTransport.reset();
    _transportChannel.reset();
    _asyncResolverFactory.reset();
    _portAllocator.reset();
    _networkManager.reset();
    _socketFactory.reset();
}

// (Re)creates the port allocator, ICE transport channel and DTLS transport
// from the current settings and local credentials, then attaches the DTLS
// transport to the existing DTLS-SRTP transport.
void NativeNetworkingImpl::resetDtlsSrtpTransport() {
    if (_enableStunMarking) {
        _turnCustomizer.reset(new TurnCustomizerImpl());
    }

    _portAllocator.reset(new cricket::BasicPortAllocator(_networkManager.get(), _socketFactory.get(), _turnCustomizer.get(), nullptr));

    uint32_t flags = _portAllocator->flags();

    flags |=
        //cricket::PORTALLOCATOR_ENABLE_SHARED_SOCKET |
        cricket::PORTALLOCATOR_ENABLE_IPV6 |
        cricket::PORTALLOCATOR_ENABLE_IPV6_ON_WIFI;

    if (!_enableTCP) {
        flags |= cricket::PORTALLOCATOR_DISABLE_TCP;
    }

    if (!_enableP2P) {
        // With P2P disabled, UDP and STUN ports are turned off and reflexive
        // candidates are removed from the candidate filter.
        flags |= cricket::PORTALLOCATOR_DISABLE_UDP;
        flags |= cricket::PORTALLOCATOR_DISABLE_STUN;
        uint32_t candidateFilter = _portAllocator->candidate_filter();
        candidateFilter &= ~(cricket::CF_REFLEXIVE);
        _portAllocator->SetCandidateFilter(candidateFilter);
    }

    _portAllocator->set_step_delay(cricket::kMinimumStepDelay);

    //TODO: figure out the proxy setup
    /*if (_proxy) {
        rtc::ProxyInfo proxyInfo;
        proxyInfo.type = rtc::ProxyType::PROXY_SOCKS5;
        proxyInfo.address = rtc::SocketAddress(_proxy->host, _proxy->port);
        proxyInfo.username = _proxy->login;
        proxyInfo.password = rtc::CryptString(TgCallsCryptStringImpl(_proxy->password));
        _portAllocator->set_proxy("t/1.0", proxyInfo);
    }*/

    _portAllocator->set_flags(flags);
    _portAllocator->Initialize();

    // Split the configured servers into STUN addresses and TURN relay configs.
    // TURN servers are always registered with PROTO_UDP.
    cricket::ServerAddresses stunServers;
    std::vector<cricket::RelayServerConfig> turnServers;

    for (auto &server : _rtcServers) {
        if (server.isTurn) {
            turnServers.push_back(cricket::RelayServerConfig(
                rtc::SocketAddress(server.host, server.port),
                server.login,
                server.password,
                cricket::PROTO_UDP
            ));
        } else {
            rtc::SocketAddress stunAddress = rtc::SocketAddress(server.host, server.port);
            stunServers.insert(stunAddress);
        }
    }

    _portAllocator->SetConfiguration(stunServers, turnServers, 2, webrtc::NO_PRUNE, _turnCustomizer.get());

    _transportChannel.reset(new cricket::P2PTransportChannel("transport", 0, _portAllocator.get(), _asyncResolverFactory.get(), nullptr));

    cricket::IceConfig iceConfig;
    iceConfig.continual_gathering_policy = cricket::GATHER_CONTINUALLY;
    iceConfig.prioritize_most_likely_candidate_pairs = true;
    iceConfig.regather_on_failed_networks_interval = 8000;
    _transportChannel->SetIceConfig(iceConfig);

    cricket::IceParameters localIceParameters(
        _localIceParameters.ufrag,
        _localIceParameters.pwd,
        false
    );

    _transportChannel->SetIceParameters(localIceParameters);
    // The outgoing side is the controlling ICE agent.
    _transportChannel->SetIceRole(_isOutgoing ? cricket::ICEROLE_CONTROLLING : cricket::ICEROLE_CONTROLLED);
    _transportChannel->SetRemoteIceMode(cricket::ICEMODE_FULL);

    _transportChannel->SignalCandidateGathered.connect(this, &NativeNetworkingImpl::candidateGathered);
    _transportChannel->SignalIceTransportStateChanged.connect(this, &NativeNetworkingImpl::transportStateChanged);
    _transportChannel->SignalReadPacket.connect(this, &NativeNetworkingImpl::transportPacketReceived);

    webrtc::CryptoOptions cryptoOptions = NativeNetworkingImpl::getDefaulCryptoOptions();
    _dtlsTransport.reset(new cricket::DtlsTransport(_transportChannel.get(), cryptoOptions, nullptr));

    _dtlsTransport->SignalWritableState.connect(
        this, &NativeNetworkingImpl::OnTransportWritableState_n);
    _dtlsTransport->SignalReceivingState.connect(
        this, &NativeNetworkingImpl::OnTransportReceivingState_n);

    _dtlsTransport->SetLocalCertificate(_localCertificate);

    _dtlsSrtpTransport->SetDtlsTransports(_dtlsTransport.get(), nullptr);
}

// Begins ICE candidate gathering and creates the SCTP data channel on top of
// the current DTLS transport. The data channel callbacks hold only a weak
// reference to this object and do nothing once it has been destroyed.
void NativeNetworkingImpl::start() {
    _transportChannel->MaybeStartGathering();

    const auto weak = std::weak_ptr<NativeNetworkingImpl>(shared_from_this());
    _dataChannelInterface.reset(new SctpDataChannelProviderInterfaceImpl(
        _dtlsTransport.get(),
        _isOutgoing,
        [weak, threads = _threads](bool state) {
            assert(threads->getNetworkThread()->IsCurrent());
            const auto strong = weak.lock();
            if (!strong) {
                return;
            }
            strong->_dataChannelStateUpdated(state);
        },
        [weak, threads = _threads](std::string const &message) {
            assert(threads->getNetworkThread()->IsCurrent());
            const auto strong = weak.lock();
            if (!strong) {
                return;
            }
            strong->_dataChannelMessageReceived(message);
        },
        _threads
    ));
}

// Disconnects from and destroys the current transport stack, then generates
// new local ICE credentials and a new certificate and rebuilds the stack.
// The data channel is not recreated here; start() does that.
void NativeNetworkingImpl::stop() {
    _transportChannel->SignalCandidateGathered.disconnect(this);
    _transportChannel->SignalIceTransportStateChanged.disconnect(this);
    _transportChannel->SignalReadPacket.disconnect(this);

    _dtlsTransport->SignalWritableState.disconnect(this);
    _dtlsTransport->SignalReceivingState.disconnect(this);

    _dtlsSrtpTransport->SetDtlsTransports(nullptr, nullptr);

    _dataChannelInterface.reset();
    _dtlsTransport.reset();
    _transportChannel.reset();
    _portAllocator.reset();

    _localIceParameters = PeerIceParameters(rtc::CreateRandomString(cricket::ICE_UFRAG_LENGTH), rtc::CreateRandomString(cricket::ICE_PWD_LENGTH));

    _localCertificate = rtc::RTCCertificateGenerator::GenerateCertificate(rtc::KeyParams(rtc::KT_ECDSA), absl::nullopt);

    resetDtlsSrtpTransport();
}

// Returns the current local ICE ufrag/pwd pair.
PeerIceParameters NativeNetworkingImpl::getLocalIceParameters() {
    return _localIceParameters;
}

// Returns the fingerprint of the local certificate, or nullptr when there is
// no certificate.
std::unique_ptr<rtc::SSLFingerprint> NativeNetworkingImpl::getLocalFingerprint() {
    auto certificate = _localCertificate;
    if (!certificate) {
        return nullptr;
    }
    return rtc::SSLFingerprint::CreateFromCertificate(*certificate);
}

// Applies the remote peer's ICE credentials, DTLS role and (optional)
// certificate fingerprint.
//
// sslSetup is the remote side's setup value: "active" makes this side the DTLS
// server, "passive" makes it the DTLS client, and anything else falls back to
// client for the outgoing side and server for the incoming side.
void NativeNetworkingImpl::setRemoteParams(PeerIceParameters const &remoteIceParameters, rtc::SSLFingerprint *fingerprint, std::string const &sslSetup) {
    _remoteIceParameters = remoteIceParameters;

    cricket::IceParameters parameters(
        remoteIceParameters.ufrag,
        remoteIceParameters.pwd,
        false
    );

    _transportChannel->SetRemoteIceParameters(parameters);

    if (sslSetup == "active") {
        _dtlsTransport->SetDtlsRole(rtc::SSLRole::SSL_SERVER);
    } else if (sslSetup == "passive") {
        _dtlsTransport->SetDtlsRole(rtc::SSLRole::SSL_CLIENT);
    } else {
        _dtlsTransport->SetDtlsRole(_isOutgoing ? rtc::SSLRole::SSL_CLIENT : rtc::SSLRole::SSL_SERVER);
    }

    if (fingerprint) {
        _dtlsTransport->SetRemoteFingerprint(fingerprint->algorithm, fingerprint->digest.data(), fingerprint->digest.size());
    }
}

// Feeds remote ICE candidates to the transport channel, in order.
void NativeNetworkingImpl::addCandidates(std::vector<cricket::Candidate> const &candidates) {
    for (const auto &candidate : candidates) {
        _transportChannel->AddRemoteCandidate(candidate);
    }
}

// Sends `message` over the data channel; silently does nothing when the data
// channel has not been created (before start() or after stop()).
void NativeNetworkingImpl::sendDataChannelMessage(std::string const &message) {
    if (_dataChannelInterface) {
        _dataChannelInterface->sendDataChannelMessage(message);
    }
}

// Returns the DTLS-SRTP transport as a non-owning RTP transport pointer.
webrtc::RtpTransport *NativeNetworkingImpl::getRtpTransport() {
    return _dtlsSrtpTransport.get();
}

// Schedules a check on the network thread 1000 ms from now. If no packet has
// arrived for more than 20000 ms (see transportPacketReceived()), a failed
// state is reported. The check then re-arms itself, so the failure keeps being
// reported on every tick while the condition holds. The task holds only a weak
// reference and stops once this object is gone.
void NativeNetworkingImpl::checkConnectionTimeout() {
    const auto weak = std::weak_ptr<NativeNetworkingImpl>(shared_from_this());
    _threads->getNetworkThread()->PostDelayedTask(RTC_FROM_HERE, [weak]() {
        auto strong = weak.lock();
        if (!strong) {
            return;
        }

        int64_t currentTimestamp = rtc::TimeMillis();
        const int64_t maxTimeout = 20000;

        if (strong->_lastNetworkActivityMs + maxTimeout < currentTimestamp) {
            NativeNetworkingImpl::State emitState;
            emitState.isReadyToSendData = false;
            emitState.isFailed = true;
            strong->_stateUpdated(emitState);
        }

        strong->checkConnectionTimeout();
    }, 1000);
}

// ICE slot: forwards each locally gathered candidate to the owner's callback.
void NativeNetworkingImpl::candidateGathered(cricket::IceTransportInternal *transport, const cricket::Candidate &candidate) {
    assert(_threads->getNetworkThread()->IsCurrent());

    _candidateGathered(candidate);
}

// Intentionally empty apart from the thread check.
void NativeNetworkingImpl::candidateGatheringState(cricket::IceTransportInternal *transport) {
    assert(_threads->getNetworkThread()->IsCurrent());
}

// DTLS slot: writable state changed, recompute the aggregate state.
void NativeNetworkingImpl::OnTransportWritableState_n(rtc::PacketTransportInternal *transport) {
    assert(_threads->getNetworkThread()->IsCurrent());

    UpdateAggregateStates_n();
}

// DTLS slot: receiving state changed, recompute the aggregate state.
void NativeNetworkingImpl::OnTransportReceivingState_n(rtc::PacketTransportInternal *transport) {
    assert(_threads->getNetworkThread()->IsCurrent());

    UpdateAggregateStates_n();
}

// DTLS-SRTP slot: recomputes the aggregate state immediately and, when the
// transport reports it is ready, once more from a task posted to the network
// thread.
void NativeNetworkingImpl::DtlsReadyToSend(bool isReadyToSend) {
    UpdateAggregateStates_n();

    if (isReadyToSend) {
        const auto weak = std::weak_ptr<NativeNetworkingImpl>(shared_from_this());
        _threads->getNetworkThread()->PostTask(RTC_FROM_HERE, [weak]() {
            const auto strong = weak.lock();
            if (!strong) {
                return;
            }
            strong->UpdateAggregateStates_n();
        });
    }
}

// ICE slot: transport state changed, recompute the aggregate state.
void NativeNetworkingImpl::transportStateChanged(cricket::IceTransportInternal *transport) {
    UpdateAggregateStates_n();
}

// Intentionally empty apart from the thread check.
void NativeNetworkingImpl::transportReadyToSend(cricket::IceTransportInternal *transport) {
    assert(_threads->getNetworkThread()->IsCurrent());
}

// ICE slot: any received packet counts as network activity for the timeout
// check. The packet itself is not inspected here.
void NativeNetworkingImpl::transportPacketReceived(rtc::PacketTransportInternal *transport, const char *bytes, size_t size, const int64_t &timestamp, int unused) {
    assert(_threads->getNetworkThread()->IsCurrent());

    _lastNetworkActivityMs = rtc::TimeMillis();
}

// DTLS-SRTP slot: forwards a received RTP packet to the owner's callback, if set.
void NativeNetworkingImpl::RtpPacketReceived_n(rtc::CopyOnWriteBuffer *packet, int64_t packet_time_us, bool isUnresolved) {
    if (_transportMessageReceived) {
        _transportMessageReceived(*packet, isUnresolved);
    }
}

// DTLS-SRTP slot: forwards a received RTCP packet to the owner's callback, if set.
void NativeNetworkingImpl::OnRtcpPacketReceived_n(rtc::CopyOnWriteBuffer *packet, int64_t packet_time_us) {
    if (_rtcpPacketReceived) {
        _rtcpPacketReceived(*packet, packet_time_us);
    }
}

// Derives the "connected" flag: the ICE transport must be connected or
// completed and the DTLS-SRTP transport must be writable. On a change, the
// owner and the data channel (if it exists) are notified.
void NativeNetworkingImpl::UpdateAggregateStates_n() {
    assert(_threads->getNetworkThread()->IsCurrent());

    auto state = _transportChannel->GetIceTransportState();
    bool isConnected = false;
    switch (state) {
        case webrtc::IceTransportState::kConnected:
        case webrtc::IceTransportState::kCompleted:
            isConnected = true;
            break;
        default:
            break;
    }

    if (!_dtlsSrtpTransport->IsWritable(false)) {
        isConnected = false;
    }

    if (_isConnected != isConnected) {
        _isConnected = isConnected;

        NativeNetworkingImpl::State emitState;
        emitState.isReadyToSendData = isConnected;
        _stateUpdated(emitState);

        if (_dataChannelInterface) {
            _dataChannelInterface->updateIsConnected(isConnected);
        }
    }
}

// Intentionally empty.
void NativeNetworkingImpl::sctpReadyToSendData() {
}

// Intentionally empty.
void NativeNetworkingImpl::sctpDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& buffer) {
}

} // namespace tgcalls