refs #380, use CWorker instead of QConcurrent for threaded reader (bookings/VATSIM file)

This commit is contained in:
Klaus Basan
2015-02-13 02:06:33 +01:00
parent 1da3f58fec
commit 5d491b9456
8 changed files with 189 additions and 157 deletions

View File

@@ -123,8 +123,8 @@ namespace BlackCore
void CContextNetwork::gracefulShutdown() void CContextNetwork::gracefulShutdown()
{ {
if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->quit(); } if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->requestStop(); this->m_vatsimBookingReader->quit(); }
if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->quit(); } if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->requestStop(); this->m_vatsimDataFileReader->quit(); }
if (this->isConnected()) { this->disconnectFromNetwork(); } if (this->isConnected()) { this->disconnectFromNetwork(); }
} }

View File

@@ -14,7 +14,6 @@
#include "vatsimbookingreader.h" #include "vatsimbookingreader.h"
#include <QtXml/QDomElement> #include <QtXml/QDomElement>
#include <QtConcurrent/QtConcurrent>
using namespace BlackMisc; using namespace BlackMisc;
using namespace BlackMisc::Aviation; using namespace BlackMisc::Aviation;
@@ -23,46 +22,46 @@ using namespace BlackMisc::Network;
namespace BlackCore namespace BlackCore
{ {
CVatsimBookingReader::CVatsimBookingReader(QObject *owner, const QString &url) : CVatsimBookingReader::CVatsimBookingReader(QObject *owner, const QString &url) :
CThreadedReader(owner), CThreadedReader(owner, "CVatsimBookingReader"),
m_serviceUrl(url), m_networkManager(nullptr) m_serviceUrl(url)
{ {
this->m_networkManager = new QNetworkAccessManager(this); this->m_networkManager = new QNetworkAccessManager(this);
this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimBookingReader::ps_loadFinished); this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimBookingReader::ps_parseBookings);
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::read); this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::ps_read);
} }
void CVatsimBookingReader::read() void CVatsimBookingReader::readInBackgroundThread()
{ {
if (QThread::currentThread() == QObject::thread())
{
ps_read();
}
else
{
bool s = QMetaObject::invokeMethod(this, "ps_read", Qt::BlockingQueuedConnection);
Q_ASSERT(s);
Q_UNUSED(s);
}
}
void CVatsimBookingReader::ps_read()
{
this->threadAssertCheck();
QUrl url(this->m_serviceUrl); QUrl url(this->m_serviceUrl);
if (url.isEmpty()) return; if (url.isEmpty()) return;
Q_ASSERT(this->m_networkManager); Q_ASSERT(this->m_networkManager);
QNetworkRequest request(url); QNetworkRequest request(url);
QNetworkReply *reply = this->m_networkManager->get(request); this->m_networkManager->get(request);
this->setPendingNetworkReply(reply);
} }
/* void CVatsimBookingReader::ps_parseBookings(QNetworkReply *nwReplyPtr)
* Bookings read from XML
*/
void CVatsimBookingReader::ps_loadFinished(QNetworkReply *nwReply)
{
this->setPendingNetworkReply(nullptr);
if (!this->isFinished())
{
QFuture<void> f = QtConcurrent::run(this, &CVatsimBookingReader::parseBookings, nwReply);
this->setPendingFuture(f);
}
}
/*
* Parse bookings
*/
void CVatsimBookingReader::parseBookings(QNetworkReply *nwReplyPtr)
{ {
// wrap pointer, make sure any exit cleans up reply // wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread // required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr); QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
this->threadAssertCheck();
// Worker thread, make sure to write no members here! // Worker thread, make sure to write no members here!
if (this->isFinished()) if (this->isFinished())
{ {
@@ -103,7 +102,7 @@ namespace BlackCore
if (this->isFinished()) if (this->isFinished())
{ {
CLogMessage(this).debug() << Q_FUNC_INFO; CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("terminated booking parsing process"); // for users CLogMessage(this).info("Terminated booking parsing process"); // for users
return; // stop, terminate straight away, ending thread return; // stop, terminate straight away, ending thread
} }
@@ -148,8 +147,8 @@ namespace BlackCore
} }
// time checks // time checks
QDateTime now = QDateTime::currentDateTimeUtc(); QDateTime now = QDateTime::currentDateTimeUtc();
if (now.msecsTo(bookedStation.getBookedUntilUtc()) < (1000 * 60 * 15)) continue; // until n mins in past if (now.msecsTo(bookedStation.getBookedUntilUtc()) < (1000 * 60 * 15)) { continue; } // until n mins in past
if (now.msecsTo(bookedStation.getBookedFromUtc()) > (1000 * 60 * 60 * 24)) continue; // to far in the future, n hours if (now.msecsTo(bookedStation.getBookedFromUtc()) > (1000 * 60 * 60 * 24)) { continue; } // to far in the future, n hours
bookedStation.setController(user); bookedStation.setController(user);
bookedStations.push_back(bookedStation); bookedStations.push_back(bookedStation);
} }

View File

@@ -25,7 +25,7 @@ namespace BlackCore
/*! /*!
* Read bookings from VATSIM * Read bookings from VATSIM
*/ */
class CVatsimBookingReader : public BlackMisc::CThreadedReader<void> class CVatsimBookingReader : public BlackMisc::CThreadedReader
{ {
Q_OBJECT Q_OBJECT
@@ -34,21 +34,21 @@ namespace BlackCore
explicit CVatsimBookingReader(QObject *owner, const QString &url); explicit CVatsimBookingReader(QObject *owner, const QString &url);
//! Read / re-read bookings //! Read / re-read bookings
void read(); void readInBackgroundThread();
private slots: private slots:
//! Bookings have been read //! Bookings have been read
void ps_loadFinished(QNetworkReply *nwReply); //! \threadsafe
void ps_parseBookings(QNetworkReply *nwReply);
//! Do reading
void ps_read();
private: private:
QString m_serviceUrl; /*!< URL of the service */ QString m_serviceUrl; /*!< URL of the service */
QNetworkAccessManager *m_networkManager; QNetworkAccessManager *m_networkManager = nullptr;
//! Parse received bookings signals:
//! \threadsafe
void parseBookings(QNetworkReply *nwReplyPtr);
signals:
//! Bookings have been read and converted to BlackMisc::Aviation::CAtcStationList //! Bookings have been read and converted to BlackMisc::Aviation::CAtcStationList
void dataRead(const BlackMisc::Aviation::CAtcStationList &bookedStations); void dataRead(const BlackMisc::Aviation::CAtcStationList &bookedStations);
}; };

View File

@@ -15,7 +15,6 @@
#include "vatsimdatafilereader.h" #include "vatsimdatafilereader.h"
#include <QRegularExpression> #include <QRegularExpression>
#include <QtConcurrent/QtConcurrent>
using namespace BlackMisc; using namespace BlackMisc;
using namespace BlackMisc::Aviation; using namespace BlackMisc::Aviation;
@@ -26,30 +25,12 @@ using namespace BlackMisc::PhysicalQuantities;
namespace BlackCore namespace BlackCore
{ {
CVatsimDataFileReader::CVatsimDataFileReader(QObject *owner, const QStringList &urls) : CVatsimDataFileReader::CVatsimDataFileReader(QObject *owner, const QStringList &urls) :
CThreadedReader(owner), CThreadedReader(owner, "CVatsimDataFileReader"),
m_serviceUrls(urls), m_currentUrlIndex(0), m_networkManager(nullptr) m_serviceUrls(urls), m_currentUrlIndex(0)
{ {
this->m_networkManager = new QNetworkAccessManager(this); this->m_networkManager = new QNetworkAccessManager(this);
this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimDataFileReader::ps_loadFinished); this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimDataFileReader::ps_parseVatsimFile);
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::read); this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::ps_read);
}
void CVatsimDataFileReader::read()
{
if (this->m_serviceUrls.isEmpty()) return;
// round robin for load distribution
this->m_currentUrlIndex++;
if (this->m_serviceUrls.size() >= this->m_currentUrlIndex) this->m_currentUrlIndex = 0;
// remark: Don't use QThread to run network operations in the background
// see http://qt-project.org/doc/qt-4.7/qnetworkaccessmanager.html
QUrl url(this->m_serviceUrls.at(this->m_currentUrlIndex));
if (url.isEmpty()) return;
Q_ASSERT(this->m_networkManager);
QNetworkRequest request(url);
QNetworkReply *r = this->m_networkManager->get(request);
this->setPendingNetworkReply(r);
} }
CAircraftList CVatsimDataFileReader::getAircraft() const CAircraftList CVatsimDataFileReader::getAircraft() const
@@ -143,34 +124,51 @@ namespace BlackCore
return users; return users;
} }
/* void CVatsimDataFileReader::readInBackgroundThread()
* Data file read from XML
*/
void CVatsimDataFileReader::ps_loadFinished(QNetworkReply *nwReply)
{ {
this->setPendingNetworkReply(nullptr); if (QThread::currentThread() == QObject::thread())
if (!this->isFinished())
{ {
QFuture<void> f = QtConcurrent::run(this, &CVatsimDataFileReader::parseVatsimFileInBackground, nwReply); ps_read();
this->setPendingFuture(f); }
else
{
bool s = QMetaObject::invokeMethod(this, "ps_read", Qt::BlockingQueuedConnection);
Q_ASSERT(s);
Q_UNUSED(s);
} }
} }
/* void CVatsimDataFileReader::ps_read()
* Data file read from XML {
* Example: http://info.vroute.net/vatsim-data.txt this->threadAssertCheck();
*/ if (this->m_serviceUrls.isEmpty()) { return; }
void CVatsimDataFileReader::parseVatsimFileInBackground(QNetworkReply *nwReplyPtr)
// round robin for load distribution
this->m_currentUrlIndex++;
if (this->m_serviceUrls.size() >= this->m_currentUrlIndex) this->m_currentUrlIndex = 0;
// remark: Don't use QThread to run network operations in the background
// see http://qt-project.org/doc/qt-4.7/qnetworkaccessmanager.html
QUrl url(this->m_serviceUrls.at(this->m_currentUrlIndex));
if (url.isEmpty()) { return; }
Q_ASSERT(this->m_networkManager);
QNetworkRequest request(url);
this->m_networkManager->get(request);
}
void CVatsimDataFileReader::ps_parseVatsimFile(QNetworkReply *nwReplyPtr)
{ {
// wrap pointer, make sure any exit cleans up reply // wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread // required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr); QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
this->threadAssertCheck();
// Worker thread, make sure to write only synced here! // Worker thread, make sure to write only synced here!
if (this->isFinished()) if (this->isFinished())
{ {
CLogMessage(this).debug() << Q_FUNC_INFO; CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("terminated VATSIM file parsing process"); // for users CLogMessage(this).info("Terminated VATSIM file parsing process"); // for users
return; // stop, terminate straight away, ending thread return; // stop, terminate straight away, ending thread
} }

View File

@@ -30,7 +30,7 @@ namespace BlackCore
/*! /*!
* Read bookings from VATSIM * Read bookings from VATSIM
*/ */
class CVatsimDataFileReader : public BlackMisc::CThreadedReader<void> class CVatsimDataFileReader : public BlackMisc::CThreadedReader
{ {
Q_OBJECT Q_OBJECT
@@ -38,9 +38,6 @@ namespace BlackCore
//! Constructor //! Constructor
explicit CVatsimDataFileReader(QObject *owner, const QStringList &urls); explicit CVatsimDataFileReader(QObject *owner, const QStringList &urls);
//! Read / re-read data file
void read();
//! Get aircrafts //! Get aircrafts
//! \threadsafe //! \threadsafe
BlackMisc::Aviation::CAircraftList getAircraft() const; BlackMisc::Aviation::CAircraftList getAircraft() const;
@@ -93,14 +90,20 @@ namespace BlackCore
//! \threadsafe //! \threadsafe
void updateWithVatsimDataFileData(BlackMisc::Aviation::CAircraft &aircraftToBeUdpated) const; void updateWithVatsimDataFileData(BlackMisc::Aviation::CAircraft &aircraftToBeUdpated) const;
//! Start reading in own thread
void readInBackgroundThread();
private slots: private slots:
//! Data have been read //! Data have been read, parse VATSIM file
void ps_loadFinished(QNetworkReply *nwReply); void ps_parseVatsimFile(QNetworkReply *nwReply);
//! Read / re-read data file
void ps_read();
private: private:
QNetworkAccessManager *m_networkManager = nullptr;
QStringList m_serviceUrls; /*!< URL of the service */ QStringList m_serviceUrls; /*!< URL of the service */
int m_currentUrlIndex; int m_currentUrlIndex;
QNetworkAccessManager *m_networkManager;
BlackMisc::Network::CServerList m_voiceServers; BlackMisc::Network::CServerList m_voiceServers;
BlackMisc::Network::CServerList m_fsdServers; BlackMisc::Network::CServerList m_fsdServers;
BlackMisc::Aviation::CAtcStationList m_atcStations; BlackMisc::Aviation::CAtcStationList m_atcStations;
@@ -123,9 +126,6 @@ namespace BlackCore
//! Get current section //! Get current section
static Section currentLineToSection(const QString &currentLine); static Section currentLineToSection(const QString &currentLine);
//! Parse the VATSIM data file in backgroun
void parseVatsimFileInBackground(QNetworkReply *nwReplyPtr);
signals: signals:
//! Data have been read //! Data have been read
void dataRead(); void dataRead();

View File

@@ -0,0 +1,83 @@
/* Copyright (C) 2013
* swift project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution and at http://www.swift-project.org/license.html. No part of swift project,
* including this file, may be copied, modified, propagated, or distributed except according to the terms
* contained in the LICENSE file.
*/
#include "threadedreader.h"
namespace BlackMisc
{
CThreadedReader::CThreadedReader(QObject *owner, const QString &name) :
CContinuousWorker(owner, name),
m_updateTimer(new QTimer(this))
{ }
QDateTime CThreadedReader::getUpdateTimestamp() const
{
QReadLocker(&this->m_lock);
return this->m_updateTimestamp;
}
void CThreadedReader::setUpdateTimestamp(const QDateTime &updateTimestamp)
{
QWriteLocker(&this->m_lock);
this->m_updateTimestamp = updateTimestamp;
}
void CThreadedReader::requestStop()
{
QWriteLocker(&this->m_lock);
this->m_stopped = true;
this->m_updateTimer->stop();
}
CThreadedReader::~CThreadedReader()
{
cleanup();
}
void CThreadedReader::cleanup()
{
// cleanup code would go here
}
bool CThreadedReader::isFinished() const
{
if (CContinuousWorker::isFinished()) { return true; }
QReadLocker(&this->m_lock);
return this->m_stopped;
}
void CThreadedReader::setInterval(int updatePeriodMs)
{
Q_ASSERT(this->m_updateTimer);
QWriteLocker(&this->m_lock);
if (updatePeriodMs < 1)
{
this->m_updateTimer->stop();
}
else
{
this->m_updateTimer->start(updatePeriodMs);
}
}
int CThreadedReader::interval() const
{
QReadLocker rl(&this->m_lock);
return this->m_updateTimer->interval();
}
void CThreadedReader::threadAssertCheck() const
{
Q_ASSERT_X(QCoreApplication::instance()->thread() != QThread::currentThread(), "CThreadedReader::threadAssertCheck", "Needs to run in own thread");
Q_ASSERT_X(QObject::thread() == QThread::currentThread(), "CThreadedReader::threadAssertCheck", "Needs to run in own thread");
}
} // namespace

View File

@@ -17,7 +17,6 @@
#include <QDateTime> #include <QDateTime>
#include <QTimer> #include <QTimer>
#include <QNetworkReply> #include <QNetworkReply>
#include <QFuture>
#include <QCoreApplication> #include <QCoreApplication>
namespace BlackMisc namespace BlackMisc
@@ -25,102 +24,55 @@ namespace BlackMisc
/*! /*!
* Support for threaded based reading and parsing tasks such * Support for threaded based reading and parsing tasks such
* as data files via http, or file system and parsing (such as FSX models) * as data files via http, or file system and parsing (such as FSX models)
*
* \remarks Header only class to avoid forward instantiations across subprojects
*/ */
template <class FutureRet = void> class CThreadedReader : public CContinuousWorker class CThreadedReader : public CContinuousWorker
{ {
public: public:
//! Thread safe, set update timestamp //! Thread safe, set update timestamp
//! \threadsafe //! \threadsafe
QDateTime getUpdateTimestamp() const QDateTime getUpdateTimestamp() const;
{
QReadLocker(&this->m_lock);
return this->m_updateTimestamp;
}
//! Thread safe, set update timestamp //! Thread safe, set update timestamp
//! \threadsafe //! \threadsafe
void setUpdateTimestamp(const QDateTime &updateTimestamp) void setUpdateTimestamp(const QDateTime &updateTimestamp);
{
QWriteLocker(&this->m_lock); //! Request to stop
this->m_updateTimestamp = updateTimestamp; //! \threadsafe
} void requestStop();
//! Destructor
virtual ~CThreadedReader();
//! \copydoc CContinuousWorker::cleanup //! \copydoc CContinuousWorker::cleanup
virtual void cleanup() override virtual void cleanup() override;
{
// shutdown pending
QWriteLocker(&this->m_lock);
if (this->m_pendingFuture.isRunning())
{
// cancel does not work with all futures, especially not with QConcurrent::run
// the stop flag should the job
// but I will cancel anyway
this->m_pendingFuture.cancel();
}
if (this->m_pendingNetworkReply && this->m_pendingNetworkReply->isRunning())
{
this->m_pendingNetworkReply->abort();
}
// cancel or stop flag above should terminate QFuture //! Thread ended of stop requested
this->m_pendingFuture.waitForFinished(); virtual bool isFinished() const override;
}
/*! /*!
* Set the update time * Set the update time
* \param updatePeriodMs <=0 stops the timer * \param updatePeriodMs <=0 stops the timer
* \threadsafe * \threadsafe
*/ */
void setInterval(int updatePeriodMs) void setInterval(int updatePeriodMs);
{
Q_ASSERT(this->m_updateTimer);
QWriteLocker(&this->m_lock);
if (updatePeriodMs < 1)
this->m_updateTimer->stop();
else
this->m_updateTimer->start(updatePeriodMs);
}
//! Get the timer interval (ms) //! Get the timer interval (ms)
//! \threadsafe //! \threadsafe
int interval() const int interval() const;
{
QReadLocker rl(&this->m_lock);
return this->m_updateTimer->interval();
}
protected: protected:
//! Constructor //! Constructor
CThreadedReader(QObject *owner) : CContinuousWorker(owner), CThreadedReader(QObject *owner, const QString &name);
m_updateTimer(new QTimer(this)), m_lock(QReadWriteLock::Recursive)
{
}
//! Has pending network replay QTimer *m_updateTimer = nullptr; //!< update timer
//! \threadsafe mutable QReadWriteLock m_lock {QReadWriteLock::Recursive}; //!< lock
void setPendingNetworkReply(QNetworkReply *reply)
{
QWriteLocker(&this->m_lock);
this->m_pendingNetworkReply = reply;
}
//! Has pending operation //! Make sure everthing runs correctly in own thread
//! \threadsafe void threadAssertCheck() const;
void setPendingFuture(QFuture<FutureRet> future)
{
QWriteLocker(&this->m_lock);
this->m_pendingFuture = future;
}
QTimer *m_updateTimer = nullptr; //!< update timer
mutable QReadWriteLock m_lock; //!< lock
private: private:
QDateTime m_updateTimestamp; //!< when was file / resource read QDateTime m_updateTimestamp; //!< when was file / resource read
QFuture<FutureRet> m_pendingFuture; //!< optional future to be stopped bool m_stopped = false; //!< optional network reply to be stopped
QNetworkReply *m_pendingNetworkReply = nullptr; //!< optional network reply to be stopped
}; };
} // namespace } // namespace

View File

@@ -130,7 +130,7 @@ namespace BlackMisc
//! Returns true if the task has finished. //! Returns true if the task has finished.
//! \threadsafe But don't rely on this condition remaining true for any length of time. //! \threadsafe But don't rely on this condition remaining true for any length of time.
bool isFinished() const virtual bool isFinished() const
{ {
QMutexLocker lock(&m_finishedMutex); QMutexLocker lock(&m_finishedMutex);
return m_finished; return m_finished;