From 5d491b9456ec4908fca3c8dd9e110cccfed3347c Mon Sep 17 00:00:00 2001 From: Klaus Basan Date: Fri, 13 Feb 2015 02:06:33 +0100 Subject: [PATCH] refs #380, use CWorker instead of QConcurrent for threaded reader (bookings/VATSIM file) --- src/blackcore/context_network_impl.cpp | 4 +- src/blackcore/vatsimbookingreader.cpp | 55 ++++++++------- src/blackcore/vatsimbookingreader.h | 18 ++--- src/blackcore/vatsimdatafilereader.cpp | 72 ++++++++++---------- src/blackcore/vatsimdatafilereader.h | 20 +++--- src/blackmisc/threadedreader.cpp | 83 +++++++++++++++++++++++ src/blackmisc/threadedreader.h | 92 ++++++-------------------- src/blackmisc/worker.h | 2 +- 8 files changed, 189 insertions(+), 157 deletions(-) create mode 100644 src/blackmisc/threadedreader.cpp diff --git a/src/blackcore/context_network_impl.cpp b/src/blackcore/context_network_impl.cpp index e85211a21..22d232196 100644 --- a/src/blackcore/context_network_impl.cpp +++ b/src/blackcore/context_network_impl.cpp @@ -123,8 +123,8 @@ namespace BlackCore void CContextNetwork::gracefulShutdown() { - if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->quit(); } - if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->quit(); } + if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->requestStop(); this->m_vatsimBookingReader->quit(); } + if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->requestStop(); this->m_vatsimDataFileReader->quit(); } if (this->isConnected()) { this->disconnectFromNetwork(); } } diff --git a/src/blackcore/vatsimbookingreader.cpp b/src/blackcore/vatsimbookingreader.cpp index 5f7412854..1292dbe00 100644 --- a/src/blackcore/vatsimbookingreader.cpp +++ b/src/blackcore/vatsimbookingreader.cpp @@ -14,7 +14,6 @@ #include "vatsimbookingreader.h" #include -#include using namespace BlackMisc; using namespace BlackMisc::Aviation; @@ -23,46 +22,46 @@ using namespace BlackMisc::Network; namespace BlackCore { CVatsimBookingReader::CVatsimBookingReader(QObject *owner, const QString &url) : - CThreadedReader(owner), - m_serviceUrl(url), m_networkManager(nullptr) + CThreadedReader(owner, "CVatsimBookingReader"), + m_serviceUrl(url) { this->m_networkManager = new QNetworkAccessManager(this); - this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimBookingReader::ps_loadFinished); - this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::read); + this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimBookingReader::ps_parseBookings); + this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::ps_read); } - void CVatsimBookingReader::read() + void CVatsimBookingReader::readInBackgroundThread() { + if (QThread::currentThread() == QObject::thread()) + { + ps_read(); + } + else + { + bool s = QMetaObject::invokeMethod(this, "ps_read", Qt::BlockingQueuedConnection); + Q_ASSERT(s); + Q_UNUSED(s); + } + } + + void CVatsimBookingReader::ps_read() + { + this->threadAssertCheck(); QUrl url(this->m_serviceUrl); if (url.isEmpty()) return; Q_ASSERT(this->m_networkManager); QNetworkRequest request(url); - QNetworkReply *reply = this->m_networkManager->get(request); - this->setPendingNetworkReply(reply); + this->m_networkManager->get(request); } - /* - * Bookings read from XML - */ - void CVatsimBookingReader::ps_loadFinished(QNetworkReply *nwReply) - { - this->setPendingNetworkReply(nullptr); - if (!this->isFinished()) - { - QFuture f = QtConcurrent::run(this, &CVatsimBookingReader::parseBookings, nwReply); - this->setPendingFuture(f); - } - } - - /* - * Parse bookings - */ - void CVatsimBookingReader::parseBookings(QNetworkReply *nwReplyPtr) + void CVatsimBookingReader::ps_parseBookings(QNetworkReply *nwReplyPtr) { // wrap pointer, make sure any exit cleans up reply // required to use delete later as object is created in a different thread QScopedPointer nwReply(nwReplyPtr); + this->threadAssertCheck(); + // Worker thread, make sure to write no members here! if (this->isFinished()) { @@ -103,7 +102,7 @@ namespace BlackCore if (this->isFinished()) { CLogMessage(this).debug() << Q_FUNC_INFO; - CLogMessage(this).info("terminated booking parsing process"); // for users + CLogMessage(this).info("Terminated booking parsing process"); // for users return; // stop, terminate straight away, ending thread } @@ -148,8 +147,8 @@ namespace BlackCore } // time checks QDateTime now = QDateTime::currentDateTimeUtc(); - if (now.msecsTo(bookedStation.getBookedUntilUtc()) < (1000 * 60 * 15)) continue; // until n mins in past - if (now.msecsTo(bookedStation.getBookedFromUtc()) > (1000 * 60 * 60 * 24)) continue; // to far in the future, n hours + if (now.msecsTo(bookedStation.getBookedUntilUtc()) < (1000 * 60 * 15)) { continue; } // until n mins in past + if (now.msecsTo(bookedStation.getBookedFromUtc()) > (1000 * 60 * 60 * 24)) { continue; } // to far in the future, n hours bookedStation.setController(user); bookedStations.push_back(bookedStation); } diff --git a/src/blackcore/vatsimbookingreader.h b/src/blackcore/vatsimbookingreader.h index ab2dc7a46..163e31f54 100644 --- a/src/blackcore/vatsimbookingreader.h +++ b/src/blackcore/vatsimbookingreader.h @@ -25,7 +25,7 @@ namespace BlackCore /*! * Read bookings from VATSIM */ - class CVatsimBookingReader : public BlackMisc::CThreadedReader + class CVatsimBookingReader : public BlackMisc::CThreadedReader { Q_OBJECT @@ -34,21 +34,21 @@ namespace BlackCore explicit CVatsimBookingReader(QObject *owner, const QString &url); //! Read / re-read bookings - void read(); + void readInBackgroundThread(); private slots: //! Bookings have been read - void ps_loadFinished(QNetworkReply *nwReply); + //! \threadsafe + void ps_parseBookings(QNetworkReply *nwReply); + + //! Do reading + void ps_read(); private: QString m_serviceUrl; /*!< URL of the service */ - QNetworkAccessManager *m_networkManager; + QNetworkAccessManager *m_networkManager = nullptr; - //! Parse received bookings - //! \threadsafe - void parseBookings(QNetworkReply *nwReplyPtr); - - signals: + signals: //! Bookings have been read and converted to BlackMisc::Aviation::CAtcStationList void dataRead(const BlackMisc::Aviation::CAtcStationList &bookedStations); }; diff --git a/src/blackcore/vatsimdatafilereader.cpp b/src/blackcore/vatsimdatafilereader.cpp index 9fc983795..88c1598c2 100644 --- a/src/blackcore/vatsimdatafilereader.cpp +++ b/src/blackcore/vatsimdatafilereader.cpp @@ -15,7 +15,6 @@ #include "vatsimdatafilereader.h" #include -#include using namespace BlackMisc; using namespace BlackMisc::Aviation; @@ -26,30 +25,12 @@ using namespace BlackMisc::PhysicalQuantities; namespace BlackCore { CVatsimDataFileReader::CVatsimDataFileReader(QObject *owner, const QStringList &urls) : - CThreadedReader(owner), - m_serviceUrls(urls), m_currentUrlIndex(0), m_networkManager(nullptr) + CThreadedReader(owner, "CVatsimDataFileReader"), + m_serviceUrls(urls), m_currentUrlIndex(0) { this->m_networkManager = new QNetworkAccessManager(this); - this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimDataFileReader::ps_loadFinished); - this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::read); - } - - void CVatsimDataFileReader::read() - { - if (this->m_serviceUrls.isEmpty()) return; - - // round robin for load distribution - this->m_currentUrlIndex++; - if (this->m_serviceUrls.size() >= this->m_currentUrlIndex) this->m_currentUrlIndex = 0; - - // remark: Don't use QThread to run network operations in the background - // see http://qt-project.org/doc/qt-4.7/qnetworkaccessmanager.html - QUrl url(this->m_serviceUrls.at(this->m_currentUrlIndex)); - if (url.isEmpty()) return; - Q_ASSERT(this->m_networkManager); - QNetworkRequest request(url); - QNetworkReply *r = this->m_networkManager->get(request); - this->setPendingNetworkReply(r); + this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimDataFileReader::ps_parseVatsimFile); + this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::ps_read); } CAircraftList CVatsimDataFileReader::getAircraft() const @@ -143,34 +124,51 @@ namespace BlackCore return users; } - /* - * Data file read from XML - */ - void CVatsimDataFileReader::ps_loadFinished(QNetworkReply *nwReply) + void CVatsimDataFileReader::readInBackgroundThread() { - this->setPendingNetworkReply(nullptr); - if (!this->isFinished()) + if (QThread::currentThread() == QObject::thread()) { - QFuture f = QtConcurrent::run(this, &CVatsimDataFileReader::parseVatsimFileInBackground, nwReply); - this->setPendingFuture(f); + ps_read(); + } + else + { + bool s = QMetaObject::invokeMethod(this, "ps_read", Qt::BlockingQueuedConnection); + Q_ASSERT(s); + Q_UNUSED(s); } } - /* - * Data file read from XML - * Example: http://info.vroute.net/vatsim-data.txt - */ - void CVatsimDataFileReader::parseVatsimFileInBackground(QNetworkReply *nwReplyPtr) + void CVatsimDataFileReader::ps_read() + { + this->threadAssertCheck(); + if (this->m_serviceUrls.isEmpty()) { return; } + + // round robin for load distribution + this->m_currentUrlIndex++; + if (this->m_serviceUrls.size() >= this->m_currentUrlIndex) this->m_currentUrlIndex = 0; + + // remark: Don't use QThread to run network operations in the background + // see http://qt-project.org/doc/qt-4.7/qnetworkaccessmanager.html + QUrl url(this->m_serviceUrls.at(this->m_currentUrlIndex)); + if (url.isEmpty()) { return; } + Q_ASSERT(this->m_networkManager); + QNetworkRequest request(url); + this->m_networkManager->get(request); + } + + void CVatsimDataFileReader::ps_parseVatsimFile(QNetworkReply *nwReplyPtr) { // wrap pointer, make sure any exit cleans up reply // required to use delete later as object is created in a different thread QScopedPointer nwReply(nwReplyPtr); + this->threadAssertCheck(); + // Worker thread, make sure to write only synced here! if (this->isFinished()) { CLogMessage(this).debug() << Q_FUNC_INFO; - CLogMessage(this).info("terminated VATSIM file parsing process"); // for users + CLogMessage(this).info("Terminated VATSIM file parsing process"); // for users return; // stop, terminate straight away, ending thread } diff --git a/src/blackcore/vatsimdatafilereader.h b/src/blackcore/vatsimdatafilereader.h index 369147746..6e440e845 100644 --- a/src/blackcore/vatsimdatafilereader.h +++ b/src/blackcore/vatsimdatafilereader.h @@ -30,7 +30,7 @@ namespace BlackCore /*! * Read bookings from VATSIM */ - class CVatsimDataFileReader : public BlackMisc::CThreadedReader + class CVatsimDataFileReader : public BlackMisc::CThreadedReader { Q_OBJECT @@ -38,9 +38,6 @@ namespace BlackCore //! Constructor explicit CVatsimDataFileReader(QObject *owner, const QStringList &urls); - //! Read / re-read data file - void read(); - //! Get aircrafts //! \threadsafe BlackMisc::Aviation::CAircraftList getAircraft() const; @@ -93,14 +90,20 @@ namespace BlackCore //! \threadsafe void updateWithVatsimDataFileData(BlackMisc::Aviation::CAircraft &aircraftToBeUdpated) const; + //! Start reading in own thread + void readInBackgroundThread(); + private slots: - //! Data have been read - void ps_loadFinished(QNetworkReply *nwReply); + //! Data have been read, parse VATSIM file + void ps_parseVatsimFile(QNetworkReply *nwReply); + + //! Read / re-read data file + void ps_read(); private: + QNetworkAccessManager *m_networkManager = nullptr; QStringList m_serviceUrls; /*!< URL of the service */ int m_currentUrlIndex; - QNetworkAccessManager *m_networkManager; BlackMisc::Network::CServerList m_voiceServers; BlackMisc::Network::CServerList m_fsdServers; BlackMisc::Aviation::CAtcStationList m_atcStations; @@ -123,9 +126,6 @@ namespace BlackCore //! Get current section static Section currentLineToSection(const QString ¤tLine); - //! Parse the VATSIM data file in backgroun - void parseVatsimFileInBackground(QNetworkReply *nwReplyPtr); - signals: //! Data have been read void dataRead(); diff --git a/src/blackmisc/threadedreader.cpp b/src/blackmisc/threadedreader.cpp new file mode 100644 index 000000000..ed4692b8d --- /dev/null +++ b/src/blackmisc/threadedreader.cpp @@ -0,0 +1,83 @@ +/* Copyright (C) 2013 + * swift project Community / Contributors + * + * This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level + * directory of this distribution and at http://www.swift-project.org/license.html. No part of swift project, + * including this file, may be copied, modified, propagated, or distributed except according to the terms + * contained in the LICENSE file. + */ + +#include "threadedreader.h" + +namespace BlackMisc +{ + + CThreadedReader::CThreadedReader(QObject *owner, const QString &name) : + CContinuousWorker(owner, name), + m_updateTimer(new QTimer(this)) + { } + + + QDateTime CThreadedReader::getUpdateTimestamp() const + { + QReadLocker(&this->m_lock); + return this->m_updateTimestamp; + } + + void CThreadedReader::setUpdateTimestamp(const QDateTime &updateTimestamp) + { + QWriteLocker(&this->m_lock); + this->m_updateTimestamp = updateTimestamp; + } + + void CThreadedReader::requestStop() + { + QWriteLocker(&this->m_lock); + this->m_stopped = true; + this->m_updateTimer->stop(); + } + + CThreadedReader::~CThreadedReader() + { + cleanup(); + } + + void CThreadedReader::cleanup() + { + // cleanup code would go here + } + + bool CThreadedReader::isFinished() const + { + if (CContinuousWorker::isFinished()) { return true; } + QReadLocker(&this->m_lock); + return this->m_stopped; + } + + void CThreadedReader::setInterval(int updatePeriodMs) + { + Q_ASSERT(this->m_updateTimer); + QWriteLocker(&this->m_lock); + if (updatePeriodMs < 1) + { + this->m_updateTimer->stop(); + } + else + { + this->m_updateTimer->start(updatePeriodMs); + } + } + + int CThreadedReader::interval() const + { + QReadLocker rl(&this->m_lock); + return this->m_updateTimer->interval(); + } + + void CThreadedReader::threadAssertCheck() const + { + Q_ASSERT_X(QCoreApplication::instance()->thread() != QThread::currentThread(), "CThreadedReader::threadAssertCheck", "Needs to run in own thread"); + Q_ASSERT_X(QObject::thread() == QThread::currentThread(), "CThreadedReader::threadAssertCheck", "Needs to run in own thread"); + } + +} // namespace diff --git a/src/blackmisc/threadedreader.h b/src/blackmisc/threadedreader.h index d83501569..8f805d025 100644 --- a/src/blackmisc/threadedreader.h +++ b/src/blackmisc/threadedreader.h @@ -17,7 +17,6 @@ #include #include #include -#include #include namespace BlackMisc @@ -25,102 +24,55 @@ namespace BlackMisc /*! * Support for threaded based reading and parsing tasks such * as data files via http, or file system and parsing (such as FSX models) - * - * \remarks Header only class to avoid forward instantiations across subprojects */ - template class CThreadedReader : public CContinuousWorker + class CThreadedReader : public CContinuousWorker { public: //! Thread safe, set update timestamp //! \threadsafe - QDateTime getUpdateTimestamp() const - { - QReadLocker(&this->m_lock); - return this->m_updateTimestamp; - } + QDateTime getUpdateTimestamp() const; //! Thread safe, set update timestamp //! \threadsafe - void setUpdateTimestamp(const QDateTime &updateTimestamp) - { - QWriteLocker(&this->m_lock); - this->m_updateTimestamp = updateTimestamp; - } + void setUpdateTimestamp(const QDateTime &updateTimestamp); + + //! Request to stop + //! \threadsafe + void requestStop(); + + //! Destructor + virtual ~CThreadedReader(); //! \copydoc CContinuousWorker::cleanup - virtual void cleanup() override - { - // shutdown pending - QWriteLocker(&this->m_lock); - if (this->m_pendingFuture.isRunning()) - { - // cancel does not work with all futures, especially not with QConcurrent::run - // the stop flag should the job - // but I will cancel anyway - this->m_pendingFuture.cancel(); - } - if (this->m_pendingNetworkReply && this->m_pendingNetworkReply->isRunning()) - { - this->m_pendingNetworkReply->abort(); - } + virtual void cleanup() override; - // cancel or stop flag above should terminate QFuture - this->m_pendingFuture.waitForFinished(); - } + //! Thread ended of stop requested + virtual bool isFinished() const override; /*! * Set the update time * \param updatePeriodMs <=0 stops the timer * \threadsafe */ - void setInterval(int updatePeriodMs) - { - Q_ASSERT(this->m_updateTimer); - QWriteLocker(&this->m_lock); - if (updatePeriodMs < 1) - this->m_updateTimer->stop(); - else - this->m_updateTimer->start(updatePeriodMs); - } + void setInterval(int updatePeriodMs); //! Get the timer interval (ms) //! \threadsafe - int interval() const - { - QReadLocker rl(&this->m_lock); - return this->m_updateTimer->interval(); - } + int interval() const; protected: //! Constructor - CThreadedReader(QObject *owner) : CContinuousWorker(owner), - m_updateTimer(new QTimer(this)), m_lock(QReadWriteLock::Recursive) - { - } + CThreadedReader(QObject *owner, const QString &name); - //! Has pending network replay - //! \threadsafe - void setPendingNetworkReply(QNetworkReply *reply) - { - QWriteLocker(&this->m_lock); - this->m_pendingNetworkReply = reply; - } + QTimer *m_updateTimer = nullptr; //!< update timer + mutable QReadWriteLock m_lock {QReadWriteLock::Recursive}; //!< lock - //! Has pending operation - //! \threadsafe - void setPendingFuture(QFuture future) - { - QWriteLocker(&this->m_lock); - this->m_pendingFuture = future; - } - - QTimer *m_updateTimer = nullptr; //!< update timer - mutable QReadWriteLock m_lock; //!< lock + //! Make sure everthing runs correctly in own thread + void threadAssertCheck() const; private: - QDateTime m_updateTimestamp; //!< when was file / resource read - QFuture m_pendingFuture; //!< optional future to be stopped - QNetworkReply *m_pendingNetworkReply = nullptr; //!< optional network reply to be stopped + QDateTime m_updateTimestamp; //!< when was file / resource read + bool m_stopped = false; //!< optional network reply to be stopped }; } // namespace diff --git a/src/blackmisc/worker.h b/src/blackmisc/worker.h index a4255fb45..93311ab5c 100644 --- a/src/blackmisc/worker.h +++ b/src/blackmisc/worker.h @@ -130,7 +130,7 @@ namespace BlackMisc //! Returns true if the task has finished. //! \threadsafe But don't rely on this condition remaining true for any length of time. - bool isFinished() const + virtual bool isFinished() const { QMutexLocker lock(&m_finishedMutex); return m_finished;