Skip to content

Commit

Permalink
Reconnect fix.
Browse files Browse the repository at this point in the history
- Adds forceDisconnectFromHost() method to QAmqpClient to immediately disconnect underlying socket;
- Fixes issue when one calls disconnectFromHost while reconnectTimer is alive;
- Properly disconnects underlying socket before another connectToHost call when socket is already connected.
  • Loading branch information
dkormalev authored and mbroadst committed Jan 12, 2017
1 parent 1db89fd commit 2f52bf5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
46 changes: 35 additions & 11 deletions src/qamqpclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ void QAmqpClientPrivate::init()
initSocket();
heartbeatTimer = new QTimer(q);
QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat()));
reconnectTimer = new QTimer(q);
reconnectTimer->setSingleShot(true);
QObject::connect(reconnectTimer, SIGNAL(timeout()), q, SLOT(_q_connect()));

authenticator = QSharedPointer<QAmqpAuthenticator>(
new QAmqpPlainAuthenticator(QString::fromLatin1(AMQP_LOGIN), QString::fromLatin1(AMQP_PSWD)));
Expand Down Expand Up @@ -136,9 +139,13 @@ void QAmqpClientPrivate::parseConnectionString(const QString &uri)

void QAmqpClientPrivate::_q_connect()
{
if (reconnectTimer)
reconnectTimer->stop();
if (socket->state() != QAbstractSocket::UnconnectedState) {
qAmqpDebug() << Q_FUNC_INFO << "socket already connected, disconnecting..";
_q_disconnect();
// We need to explicitly close connection here because either way it will not be closed until we receive closeOk
closeConnection();
}

qAmqpDebug() << "connecting to host: " << host << ", port: " << port;
Expand All @@ -150,6 +157,8 @@ void QAmqpClientPrivate::_q_connect()

void QAmqpClientPrivate::_q_disconnect()
{
if (reconnectTimer)
reconnectTimer->stop();
if (socket->state() == QAbstractSocket::UnconnectedState) {
qAmqpDebug() << Q_FUNC_INFO << "already disconnected";
return;
Expand All @@ -162,10 +171,10 @@ void QAmqpClientPrivate::_q_disconnect()
// private slots
void QAmqpClientPrivate::_q_socketConnected()
{
if (reconnectTimer)
reconnectTimer->stop();
if(reconnectFixedTimeout == false)
{
timeout = 0;
}
char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
socket->write(header, 8);
}
Expand All @@ -188,7 +197,6 @@ void QAmqpClientPrivate::_q_heartbeat()

void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error)
{
Q_Q(QAmqpClient);
if(reconnectFixedTimeout == false)
{
if (timeout <= 0) {
Expand Down Expand Up @@ -223,9 +231,9 @@ void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error)

errorString = socket->errorString();

if (autoReconnect) {
if (autoReconnect && reconnectTimer) {
qAmqpDebug() << "trying to reconnect after: " << timeout << "ms";
QTimer::singleShot(timeout, q, SLOT(_q_connect()));
reconnectTimer->start(timeout);
}
}

Expand Down Expand Up @@ -336,6 +344,18 @@ void QAmqpClientPrivate::sendFrame(const QAmqpFrame &frame)
stream << frame;
}

void QAmqpClientPrivate::closeConnection()
{
qAmqpDebug("AMQP: closing connection");

connected = false;
if (reconnectTimer)
reconnectTimer->stop();
if (heartbeatTimer)
heartbeatTimer->stop();
socket->disconnectFromHost();
}

bool QAmqpClientPrivate::_q_method(const QAmqpMethodFrame &frame)
{
Q_ASSERT(frame.methodClass() == QAmqpFrame::Connection);
Expand Down Expand Up @@ -453,11 +473,7 @@ void QAmqpClientPrivate::closeOk(const QAmqpMethodFrame &frame)
{
Q_UNUSED(frame)
qAmqpDebug("-> connection#closeOk()");

connected = false;
if (heartbeatTimer)
heartbeatTimer->stop();
socket->disconnectFromHost();
closeConnection();
}

void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame)
Expand All @@ -482,7 +498,7 @@ void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame)

// if it was a force disconnect, simulate receiving a closeOk
if (checkError == QAMQP::ConnectionForcedError) {
closeOk(QAmqpMethodFrame());
closeConnection();
if (autoReconnect) {
qAmqpDebug() << "trying to reconnect after: " << timeout << "ms";
QTimer::singleShot(timeout, q, SLOT(_q_connect()));
Expand All @@ -499,6 +515,7 @@ void QAmqpClientPrivate::close(const QAmqpMethodFrame &frame)
QAmqpMethodFrame closeOkFrame(QAmqpFrame::Connection, QAmqpClientPrivate::miCloseOk);
qAmqpDebug("<- connection#closeOk()");
sendFrame(closeOkFrame);
closeConnection();
}

void QAmqpClientPrivate::startOk()
Expand Down Expand Up @@ -921,4 +938,11 @@ void QAmqpClient::disconnectFromHost()
d->_q_disconnect();
}

void QAmqpClient::forceDisconnectFromHost()
{
Q_D(QAmqpClient);
d->_q_disconnect();
d->closeConnection();
}

#include "moc_qamqpclient.cpp"
1 change: 1 addition & 0 deletions src/qamqpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class QAMQP_EXPORT QAmqpClient : public QObject
void connectToHost(const QString &uri = QString());
void connectToHost(const QHostAddress &address, quint16 port = AMQP_PORT);
void disconnectFromHost();
void forceDisconnectFromHost();

Q_SIGNALS:
void connected();
Expand Down
3 changes: 3 additions & 0 deletions src/qamqpclient_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class QAMQP_EXPORT QAmqpClientPrivate : public QAmqpMethodFrameHandler
void parseConnectionString(const QString &uri);
void sendFrame(const QAmqpFrame &frame);

void closeConnection();

// private slots
void _q_socketConnected();
void _q_socketDisconnected();
Expand Down Expand Up @@ -95,6 +97,7 @@ class QAMQP_EXPORT QAmqpClientPrivate : public QAmqpMethodFrameHandler
bool closed;
bool connected;
QPointer<QTimer> heartbeatTimer;
QPointer<QTimer> reconnectTimer;
QAmqpTable customProperties;
qint16 channelMax;
qint16 heartbeatDelay;
Expand Down

0 comments on commit 2f52bf5

Please sign in to comment.