Nagram/TMessagesProj/jni/tgnet/Connection.cpp

391 lines
14 KiB
C++
Raw Normal View History

2015-09-24 20:52:02 +00:00
/*
* This is the source code of tgnet library v. 1.0
* It is licensed under GNU GPL v. 2 or later.
* You should have received a copy of the license in this archive (see LICENSE).
*
* Copyright Nikolai Kudashov, 2015.
*/
#include "Connection.h"
#include "ConnectionsManager.h"
#include "BuffersStorage.h"
#include "FileLog.h"
#include "Timer.h"
#include "Datacenter.h"
#include "NativeByteBuffer.h"
static uint32_t lastConnectionToken = 1;
Connection::Connection(Datacenter *datacenter, ConnectionType type) {
currentDatacenter = datacenter;
connectionType = type;
genereateNewSessionId();
connectionState = TcpConnectionStageIdle;
reconnectTimer = new Timer([&] {
reconnectTimer->stop();
connect();
});
}
Connection::~Connection() {
if (reconnectTimer != nullptr) {
reconnectTimer->stop();
delete reconnectTimer;
reconnectTimer = nullptr;
}
}
void Connection::suspendConnection() {
reconnectTimer->stop();
if (connectionState == TcpConnectionStageIdle || connectionState == TcpConnectionStageSuspended) {
return;
}
DEBUG_D("connection(%p, dc%u, type %d) suspend", this, currentDatacenter->getDatacenterId(), connectionType);
connectionState = TcpConnectionStageSuspended;
dropConnection();
ConnectionsManager::getInstance().onConnectionClosed(this);
firstPacketSent = false;
if (restOfTheData != nullptr) {
restOfTheData->reuse();
restOfTheData = nullptr;
}
lastPacketLength = 0;
connectionToken = 0;
wasConnected = false;
}
void Connection::onReceivedData(NativeByteBuffer *buffer) {
failedConnectionCount = 0;
NativeByteBuffer *parseLaterBuffer = nullptr;
if (restOfTheData != nullptr) {
if (lastPacketLength == 0) {
if (restOfTheData->capacity() - restOfTheData->position() >= buffer->limit()) {
restOfTheData->limit(restOfTheData->position() + buffer->limit());
restOfTheData->writeBytes(buffer);
buffer = restOfTheData;
} else {
NativeByteBuffer *newBuffer = BuffersStorage::getInstance().getFreeBuffer(restOfTheData->limit() + buffer->limit());
restOfTheData->rewind();
newBuffer->writeBytes(restOfTheData);
newBuffer->writeBytes(buffer);
buffer = newBuffer;
restOfTheData->reuse();
restOfTheData = newBuffer;
}
} else {
uint32_t len;
if (lastPacketLength - restOfTheData->position() <= buffer->limit()) {
len = lastPacketLength - restOfTheData->position();
} else {
len = buffer->limit();
}
uint32_t oldLimit = buffer->limit();
buffer->limit(len);
restOfTheData->writeBytes(buffer);
buffer->limit(oldLimit);
if (restOfTheData->position() == lastPacketLength) {
parseLaterBuffer = buffer->hasRemaining() ? buffer : nullptr;
buffer = restOfTheData;
} else {
return;
}
}
}
buffer->rewind();
while (buffer->hasRemaining()) {
if (!hasSomeDataSinceLastConnect) {
currentDatacenter->storeCurrentAddressAndPortNum();
isTryingNextPort = false;
if (connectionType == ConnectionTypePush) {
setTimeout(60 * 15);
} else {
setTimeout(25);
}
}
hasSomeDataSinceLastConnect = true;
uint32_t currentPacketLength = 0;
uint32_t mark = buffer->position();
uint8_t fByte = buffer->readByte(nullptr);
if ((fByte & (1 << 7)) != 0) {
buffer->position(mark);
if (buffer->remaining() < 4) {
NativeByteBuffer *reuseLater = restOfTheData;
restOfTheData = BuffersStorage::getInstance().getFreeBuffer(16384);
restOfTheData->writeBytes(buffer);
restOfTheData->limit(restOfTheData->position());
lastPacketLength = 0;
if (reuseLater != nullptr) {
reuseLater->reuse();
}
break;
}
int32_t ackId = buffer->readBigInt32(nullptr) & (~(1 << 31));
ConnectionsManager::getInstance().onConnectionQuickAckReceived(this, ackId);
continue;
}
if (fByte != 0x7f) {
currentPacketLength = ((uint32_t) fByte) * 4;
} else {
buffer->position(mark);
if (buffer->remaining() < 4) {
if (restOfTheData == nullptr || (restOfTheData != nullptr && restOfTheData->position() != 0)) {
NativeByteBuffer *reuseLater = restOfTheData;
restOfTheData = BuffersStorage::getInstance().getFreeBuffer(16384);
restOfTheData->writeBytes(buffer);
restOfTheData->limit(restOfTheData->position());
lastPacketLength = 0;
if (reuseLater != nullptr) {
reuseLater->reuse();
}
} else {
restOfTheData->position(restOfTheData->limit());
}
break;
}
currentPacketLength = ((uint32_t) buffer->readInt32(nullptr) >> 8) * 4;
}
if (currentPacketLength % 4 != 0 || currentPacketLength > 2 * 1024 * 1024) {
DEBUG_E("connection(%p, dc%u, type %d) received invalid packet length", this, currentDatacenter->getDatacenterId(), connectionType);
reconnect();
return;
}
if (currentPacketLength < buffer->remaining()) {
DEBUG_E("connection(%p, dc%u, type %d) received message len %u but packet larger %u", this, currentDatacenter->getDatacenterId(), connectionType, currentPacketLength, buffer->remaining());
} else if (currentPacketLength == buffer->remaining()) {
DEBUG_E("connection(%p, dc%u, type %d) received message len %u equal to packet size", this, currentDatacenter->getDatacenterId(), connectionType, currentPacketLength);
} else {
DEBUG_E("connection(%p, dc%u, type %d) received packet size less(%u) then message size(%u)", this, currentDatacenter->getDatacenterId(), connectionType, buffer->remaining(), currentPacketLength);
NativeByteBuffer *reuseLater = nullptr;
uint32_t len = currentPacketLength + (fByte != 0x7f ? 1 : 4);
if (restOfTheData != nullptr && restOfTheData->capacity() < len) {
reuseLater = restOfTheData;
restOfTheData = nullptr;
}
if (restOfTheData == nullptr) {
buffer->position(mark);
restOfTheData = BuffersStorage::getInstance().getFreeBuffer(len);
restOfTheData->writeBytes(buffer);
} else {
restOfTheData->position(restOfTheData->limit());
restOfTheData->limit(len);
}
lastPacketLength = len;
if (reuseLater != nullptr) {
reuseLater->reuse();
}
return;
}
uint32_t old = buffer->limit();
buffer->limit(buffer->position() + currentPacketLength);
ConnectionsManager::getInstance().onConnectionDataReceived(this, buffer, currentPacketLength);
buffer->position(buffer->limit());
buffer->limit(old);
if (restOfTheData != nullptr) {
if ((lastPacketLength != 0 && restOfTheData->position() == lastPacketLength) || (lastPacketLength == 0 && !restOfTheData->hasRemaining())) {
restOfTheData->reuse();
restOfTheData = nullptr;
} else {
DEBUG_E("compact occured");
restOfTheData->compact();
restOfTheData->limit(restOfTheData->position());
restOfTheData->position(0);
}
}
if (parseLaterBuffer != nullptr) {
buffer = parseLaterBuffer;
parseLaterBuffer = nullptr;
}
}
}
void Connection::connect() {
if (!ConnectionsManager::getInstance().isNetworkAvailable()) {
ConnectionsManager::getInstance().onConnectionClosed(this);
return;
}
if ((connectionState == TcpConnectionStageConnected || connectionState == TcpConnectionStageConnecting)) {
return;
}
connectionState = TcpConnectionStageConnecting;
bool ipv6 = ConnectionsManager::getInstance().isIpv6Enabled();
if (connectionType == ConnectionTypeDownload) {
currentAddressFlags = 2;
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags | (ipv6 ? 1 : 0));
if (hostAddress.empty()) {
currentAddressFlags = 0;
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags | (ipv6 ? 1 : 0));
}
if (hostAddress.empty() && ipv6) {
currentAddressFlags = 2;
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags);
if (hostAddress.empty()) {
currentAddressFlags = 0;
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags);
}
}
} else {
currentAddressFlags = 0;
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags | (ipv6 ? 1 : 0));
if (ipv6 && hostAddress.empty()) {
hostAddress = currentDatacenter->getCurrentAddress(currentAddressFlags);
}
}
hostPort = (uint16_t) currentDatacenter->getCurrentPort(currentAddressFlags);
reconnectTimer->stop();
DEBUG_D("connection(%p, dc%u, type %d) connecting (%s:%hu)", this, currentDatacenter->getDatacenterId(), connectionType, hostAddress.c_str(), hostPort);
firstPacketSent = false;
if (restOfTheData != nullptr) {
restOfTheData->reuse();
restOfTheData = nullptr;
}
lastPacketLength = 0;
wasConnected = false;
hasSomeDataSinceLastConnect = false;
openConnection(hostAddress, hostPort, ipv6);
if (connectionType == ConnectionTypePush) {
if (isTryingNextPort) {
setTimeout(20);
} else {
setTimeout(30);
}
} else {
if (isTryingNextPort) {
setTimeout(8);
} else {
setTimeout(15);
}
}
}
void Connection::reconnect() {
suspendConnection();
connectionState = TcpConnectionStageReconnecting;
connect();
}
void Connection::sendData(NativeByteBuffer *buff, bool reportAck) {
if (buff == nullptr) {
return;
}
buff->rewind();
if (connectionState == TcpConnectionStageIdle || connectionState == TcpConnectionStageReconnecting || connectionState == TcpConnectionStageSuspended) {
connect();
}
if (isDisconnected()) {
buff->reuse();
DEBUG_D("connection(%p, dc%u, type %d) disconnected, don't send data", this, currentDatacenter->getDatacenterId(), connectionType);
return;
}
uint32_t bufferLen = 0;
uint32_t packetLength = buff->limit() / 4;
if (packetLength < 0x7f) {
bufferLen++;
} else {
bufferLen += 4;
}
if (!firstPacketSent) {
bufferLen++;
}
NativeByteBuffer *buffer = BuffersStorage::getInstance().getFreeBuffer(bufferLen);
if (!firstPacketSent) {
buffer->writeByte(0xef);
firstPacketSent = true;
}
if (packetLength < 0x7f) {
if (reportAck) {
packetLength |= (1 << 7);
}
buffer->writeByte((uint8_t) packetLength);
} else {
packetLength = (packetLength << 8) + 0x7f;
if (reportAck) {
packetLength |= (1 << 7);
}
buffer->writeInt32(packetLength);
}
buffer->rewind();
writeBuffer(buffer);
buff->rewind();
writeBuffer(buff);
}
void Connection::onDisconnected(int reason) {
reconnectTimer->stop();
DEBUG_D("connection(%p, dc%u, type %d) disconnected with reason %d", this, currentDatacenter->getDatacenterId(), connectionType, reason);
bool switchToNextPort = wasConnected && !hasSomeDataSinceLastConnect && reason == 2;
firstPacketSent = false;
if (restOfTheData != nullptr) {
restOfTheData->reuse();
restOfTheData = nullptr;
}
connectionToken = 0;
lastPacketLength = 0;
wasConnected = false;
if (connectionState != TcpConnectionStageSuspended && connectionState != TcpConnectionStageIdle) {
connectionState = TcpConnectionStageIdle;
}
ConnectionsManager::getInstance().onConnectionClosed(this);
uint32_t datacenterId = currentDatacenter->getDatacenterId();
if (connectionState == TcpConnectionStageIdle && connectionType == ConnectionTypeGeneric && (datacenterId == ConnectionsManager::getInstance().currentDatacenterId || datacenterId == ConnectionsManager::getInstance().movingToDatacenterId)) {
connectionState = TcpConnectionStageReconnecting;
failedConnectionCount++;
if (failedConnectionCount == 1) {
if (hasSomeDataSinceLastConnect) {
willRetryConnectCount = 5;
} else {
willRetryConnectCount = 1;
}
}
if (ConnectionsManager::getInstance().isNetworkAvailable()) {
isTryingNextPort = true;
if (failedConnectionCount > willRetryConnectCount || switchToNextPort) {
currentDatacenter->nextAddressOrPort(currentAddressFlags);
failedConnectionCount = 0;
}
}
DEBUG_D("connection(%p, dc%u, type %d) reconnect %s:%hu", this, currentDatacenter->getDatacenterId(), connectionType, hostAddress.c_str(), hostPort);
reconnectTimer->setTimeout(1000, false);
reconnectTimer->start();
}
}
void Connection::onConnected() {
connectionState = TcpConnectionStageConnected;
connectionToken = lastConnectionToken++;
wasConnected = true;
DEBUG_D("connection(%p, dc%u, type %d) connected to %s:%hu", this, currentDatacenter->getDatacenterId(), connectionType, hostAddress.c_str(), hostPort);
ConnectionsManager::getInstance().onConnectionConnected(this);
}
Datacenter *Connection::getDatacenter() {
return currentDatacenter;
}
ConnectionType Connection::getConnectionType() {
return connectionType;
}
uint32_t Connection::getConnectionToken() {
return connectionToken;
}