Ref T105, use new style with threaded readers

* use doWorkCheck
* relaxed doWorkCheck in unit tests
* removed isShuttingDown, gracefulShutdown
* set timer object name (in case something is wrong, we might see the name in the log)
This commit is contained in:
Klaus Basan
2017-07-10 18:13:44 +02:00
committed by Mathew Sutcliffe
parent 2fbd2c6382
commit b6b1b96ec4
14 changed files with 86 additions and 84 deletions

View File

@@ -149,7 +149,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
const CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReplyPtr);
if (res.hasErrorMessage())
@@ -196,7 +196,7 @@ namespace BlackCore
void CAirportDataReader::ps_read(CEntityFlags::Entity entity, CDbFlags::DataRetrievalModeFlag mode, const QDateTime &newerThan)
{
this->threadAssertCheck();
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
entity &= CEntityFlags::AirportEntity;
if (!this->isNetworkAccessible())
{

View File

@@ -49,7 +49,7 @@ namespace BlackCore
void CDatabaseReader::readInBackgroundThread(CEntityFlags::Entity entities, const QDateTime &newerThan)
{
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
// we accept cached data
Q_ASSERT_X(!entities.testFlag(CEntityFlags::DbInfoObjectEntity), Q_FUNC_INFO, "Read info objects directly");
@@ -226,7 +226,7 @@ namespace BlackCore
{
Q_ASSERT_X(nwReply, Q_FUNC_INFO, "Missing reply");
this->threadAssertCheck();
if (this->isAbandoned())
if (!this->doWorkCheck())
{
nwReply->abort();
headerResponse.setMessage(CStatusMessage(this, CStatusMessage::SeverityError, "Terminated data parsing process"));
@@ -469,7 +469,7 @@ namespace BlackCore
CUrl CDatabaseReader::getBaseUrl(CDbFlags::DataRetrievalModeFlag mode) const
{
if (this->isShuttingDown()) { return CUrl(); }
if (!this->doWorkCheck()) { return CUrl(); }
Q_ASSERT_X(sApp, Q_FUNC_INFO, "Missing app object, URLs cannot be obtained");
switch (mode)
{
@@ -500,7 +500,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
this->receivedSharedFileHeaderNonClosing(nwReplyPtr);
nwReply->close();
}

View File

@@ -133,7 +133,7 @@ namespace BlackCore
void CIcaoDataReader::ps_read(BlackMisc::Network::CEntityFlags::Entity entities, BlackMisc::Db::CDbFlags::DataRetrievalModeFlag mode, const QDateTime &newerThan)
{
this->threadAssertCheck(); // runs in background thread
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
entities &= CEntityFlags::AllIcaoAndCountries;
if (!this->isNetworkAccessible())
{
@@ -229,7 +229,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())
@@ -278,7 +278,7 @@ namespace BlackCore
void CIcaoDataReader::ps_parseAirlineIcaoData(QNetworkReply *nwReplyPtr)
{
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())

View File

@@ -112,7 +112,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
const CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())

View File

@@ -152,7 +152,7 @@ namespace BlackCore
void CModelDataReader::ps_read(CEntityFlags::Entity entities, CDbFlags::DataRetrievalModeFlag mode, const QDateTime &newerThan)
{
this->threadAssertCheck();
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
entities &= CEntityFlags::DistributorLiveryModel;
if (!this->isNetworkAccessible())
{
@@ -248,7 +248,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())
{
@@ -290,7 +290,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())
{
@@ -332,7 +332,7 @@ namespace BlackCore
// wrap pointer, make sure any exit cleans up reply
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
if (this->isShuttingDown()) { return; }
if (!this->doWorkCheck()) { return; }
CDatabaseReader::JsonDatastoreResponse res = this->setStatusAndTransformReplyIntoDatastoreResponse(nwReply.data());
if (res.hasErrorMessage())
{

View File

@@ -35,13 +35,12 @@ namespace BlackCore
CContinuousWorker(owner, name)
{
connect(&m_updateTimer, &QTimer::timeout, this, &CThreadedReader::doWork);
m_updateTimer.setObjectName(getName());
m_updateTimer.setSingleShot(true);
}
CThreadedReader::~CThreadedReader()
{
gracefulShutdown();
}
{ }
qint64 CThreadedReader::lastModifiedMsSinceEpoch(QNetworkReply *nwReply) const
{
@@ -73,17 +72,6 @@ namespace BlackCore
return sApp->isNetworkAccessible();
}
void CThreadedReader::gracefulShutdown()
{
// if not in main thread stop, otherwise it makes no sense to abandon
if (this->m_shutdown) { return; }
this->m_shutdown = true;
if (!CThreadUtils::isCurrentThreadObjectThread(this))
{
this->abandonAndWait();
}
}
void CThreadedReader::startReader()
{
Q_ASSERT(m_initialTime > 0);
@@ -95,14 +83,6 @@ namespace BlackCore
QTimer::singleShot(0, &m_updateTimer, &QTimer::stop);
}
bool CThreadedReader::isShuttingDown() const
{
if (!sApp) { return true; } // sApp object is gone, whole system shutdown
if (this->m_shutdown) { return true; } // marked as shutdown
if (this->isAbandoned()) { return true; } // worker abandoned
return false;
}
bool CThreadedReader::didContentChange(const QString &content, int startPosition)
{
uint oldHash = 0;
@@ -119,6 +99,11 @@ namespace BlackCore
return true;
}
void CThreadedReader::cleanup()
{
m_updateTimer.stop();
}
bool CThreadedReader::isMarkedAsFailed() const
{
return this->m_markedAsFailed;
@@ -149,10 +134,16 @@ namespace BlackCore
void CThreadedReader::doWork()
{
if (isFinished()) { return; }
if (!doWorkCheck()) { return; }
doWorkImpl();
Q_ASSERT(m_periodicTime > 0);
m_updateTimer.start(m_periodicTime);
m_updateTimer.start(m_periodicTime); // restart
}
bool CThreadedReader::doWorkCheck() const
{
if (!m_unitTest && (!sApp || !sApp->hasWebDataServices())) { return false; }
if (!isEnabled()) { return false; }
return true;
}
} // namespace

View File

@@ -52,7 +52,7 @@ namespace BlackCore
//! \threadsafe
bool updatedWithinLastMs(qint64 timeLastMs);
//! Network accessable?
//! Network accessible?
bool isNetworkAccessible() const;
//! Is marked as read failed
@@ -71,13 +71,8 @@ namespace BlackCore
//! \threadsafe
void pauseReader();
//! Is shutting down?
//! \threadsafe
bool isShuttingDown() const;
//! Graceful shutdown
//! \threadsafe
void gracefulShutdown();
//! Used in unit test
void markAsUsedInUnitTest() { m_unitTest = true; }
protected:
mutable QReadWriteLock m_lock {QReadWriteLock::Recursive}; //!< lock which can be used from the derived classes
@@ -95,22 +90,29 @@ namespace BlackCore
//! \threadsafe
bool didContentChange(const QString &content, int startPosition = -1);
//! \copydoc BlackMisc::CContinuousWorker::cleanup
virtual void cleanup() override;
//! Set initial and periodic times
void setInitialAndPeriodicTime(int initialTime, int periodicTime);
//! This method does the actual work in the derived class
virtual void doWorkImpl() {}
//! Still enabled etc.
bool doWorkCheck() const;
private:
//! Trigger doWorkImpl
void doWork();
int m_initialTime = -1; //!< Initial start delay
int m_periodicTime = -1; //!< Periodic time after which the task is repeated
QDateTime m_updateTimestamp; //!< when file/resource was read
uint m_contentHash = 0; //!< has of the content given
std::atomic<bool> m_markedAsFailed { false }; //!< marker if reading failed
std::atomic<bool> m_shutdown { false }; //!< marker it is shutting down
QTimer m_updateTimer { this };
int m_initialTime = -1; //!< Initial start delay
int m_periodicTime = -1; //!< Periodic time after which the task is repeated
QDateTime m_updateTimestamp; //!< when file/resource was read
uint m_contentHash = 0; //!< has of the content given
std::atomic<bool> m_markedAsFailed { false }; //!< marker if reading failed
QTimer m_updateTimer { this };
bool m_unitTest { false }; //!< mark as unit test
};
} // namespace

View File

@@ -67,14 +67,14 @@ namespace BlackCore
void CVatsimBookingReader::ps_read()
{
this->threadAssertCheck();
if (!this->doWorkCheck()) { return; }
if (!this->isNetworkAccessible())
{
CLogMessage(this).warning("No network, cannot read VATSIM bookings");
return;
}
this->threadAssertCheck();
Q_ASSERT_X(sApp, Q_FUNC_INFO, "No application");
const QUrl url(sApp->getGlobalSetup().getVatsimBookingsUrl());
if (url.isEmpty()) { return; }
@@ -90,7 +90,7 @@ namespace BlackCore
this->threadAssertCheck();
// Worker thread, make sure to write no members here od do it threadsafe
if (this->isShuttingDown())
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated booking parsing process");
@@ -135,7 +135,7 @@ namespace BlackCore
CAtcStationList bookedStations;
for (int i = 0; i < size; i++)
{
if (this->isAbandoned())
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated booking parsing process"); // for users

View File

@@ -186,12 +186,13 @@ namespace BlackCore
void CVatsimDataFileReader::ps_read()
{
this->threadAssertCheck();
if (!this->doWorkCheck()) { return; }
if (!this->isNetworkAccessible())
{
CLogMessage(this).warning("No network, cannot read VATSIM data file");
return;
}
this->threadAssertCheck();
// round robin for load balancing
// remark: Don't use QThread to run network operations in the background
@@ -209,10 +210,9 @@ namespace BlackCore
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
this->threadAssertCheck();
// Worker thread, make sure to write only synced here!
if (this->isAbandoned())
this->threadAssertCheck();
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated VATSIM file parsing process");
@@ -249,7 +249,7 @@ namespace BlackCore
QString currentLine; // declared outside of the for loop, to amortize the cost of allocation
for (const QStringRef &clRef : lines)
{
if (this->isAbandoned())
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated VATSIM file parsing process"); // for users

View File

@@ -79,13 +79,13 @@ namespace BlackCore
void CVatsimMetarReader::readMetars()
{
if (this->isAbandoned()) { return; }
this->threadAssertCheck();
if (!this->doWorkCheck()) { return; }
if (!this->isNetworkAccessible())
{
CLogMessage(this).warning("No network, cannot read METARs");
return;
}
this->threadAssertCheck();
CFailoverUrlList urls(sApp->getVatsimMetarUrls());
const CUrl url(urls.obtainNextWorkingUrl(true));
@@ -100,10 +100,9 @@ namespace BlackCore
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
this->threadAssertCheck();
// Worker thread, make sure to write thread safe!
if (this->isAbandoned())
this->threadAssertCheck();
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("terminated METAR decoding process"); // for users
@@ -126,7 +125,7 @@ namespace BlackCore
QTextStream lineReader(&metarData);
while (!lineReader.atEnd())
{
if (this->isAbandoned()) { return; }
if (!this->doWorkCheck()) { return; }
QString line = lineReader.readLine();
CMetar metar = m_metarDecoder.decode(line);
if (metar != CMetar()) { metars.push_back(metar); }
@@ -157,6 +156,5 @@ namespace BlackCore
CReaderSettings s = m_settings.get();
setInitialAndPeriodicTime(s.getInitialTime().toMs(), s.getPeriodicTime().toMs());
}
} // ns
} // ns

View File

@@ -74,6 +74,7 @@ namespace BlackCore
//! Do reading
void readMetars();
//! Reload settings
void reloadSettings();
private:

View File

@@ -47,7 +47,7 @@ namespace BlackCore
void CVatsimStatusFileReader::readInBackgroundThread()
{
bool s = QMetaObject::invokeMethod(this, "ps_read");
const bool s = QMetaObject::invokeMethod(this, "ps_read");
Q_ASSERT_X(s, Q_FUNC_INFO, "Invoke failed");
Q_UNUSED(s);
}
@@ -70,7 +70,12 @@ namespace BlackCore
void CVatsimStatusFileReader::ps_read()
{
this->threadAssertCheck();
if (!this->isNetworkAccessible()) { return; }
if (!this->doWorkCheck()) { return; }
if (!this->isNetworkAccessible())
{
CLogMessage(this).warning("No network, cannot read VATSIM status file");
return;
}
Q_ASSERT_X(sApp, Q_FUNC_INFO, "Missing application");
CFailoverUrlList urls(sApp->getGlobalSetup().getVatsimStatusFileUrls());
@@ -85,10 +90,9 @@ namespace BlackCore
// required to use delete later as object is created in a different thread
QScopedPointer<QNetworkReply, QScopedPointerDeleteLater> nwReply(nwReplyPtr);
this->threadAssertCheck();
// Worker thread, make sure to write only synced here!
if (this->isAbandoned())
this->threadAssertCheck();
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated VATSIM status file parsing process"); // for users
@@ -112,7 +116,7 @@ namespace BlackCore
QString currentLine; // declared outside of the for loop, to amortize the cost of allocation
for (const QStringRef &clRef : lines)
{
if (this->isAbandoned())
if (!this->doWorkCheck())
{
CLogMessage(this).debug() << Q_FUNC_INFO;
CLogMessage(this).info("Terminated status parsing process"); // for users

View File

@@ -715,14 +715,16 @@ namespace BlackCore
void CWebDataServices::gracefulShutdown()
{
this->disconnect(); // all signals
if (this->m_vatsimMetarReader) { this->m_vatsimMetarReader->gracefulShutdown(); }
if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->gracefulShutdown(); }
if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->gracefulShutdown(); }
if (this->m_vatsimStatusReader) { this->m_vatsimStatusReader->gracefulShutdown(); }
if (this->m_modelDataReader) { this->m_modelDataReader->gracefulShutdown(); }
if (this->m_airportDataReader) { this->m_airportDataReader->gracefulShutdown(); }
if (this->m_icaoDataReader) { this->m_icaoDataReader->gracefulShutdown(); }
if (this->m_dbInfoDataReader) { this->m_dbInfoDataReader->gracefulShutdown(); }
if (this->m_vatsimMetarReader) { this->m_vatsimMetarReader->setEnabled(false); }
if (this->m_vatsimBookingReader) { this->m_vatsimBookingReader->setEnabled(false); }
if (this->m_vatsimDataFileReader) { this->m_vatsimDataFileReader->setEnabled(false); }
if (this->m_vatsimStatusReader) { this->m_vatsimStatusReader->setEnabled(false); }
if (this->m_modelDataReader) { this->m_modelDataReader->setEnabled(false); }
if (this->m_airportDataReader) { this->m_airportDataReader->setEnabled(false); }
if (this->m_icaoDataReader) { this->m_icaoDataReader->setEnabled(false); }
if (this->m_dbInfoDataReader) { this->m_dbInfoDataReader->setEnabled(false); }
// DB writer is no threaded reader, it has a special role
if (this->m_databaseWriter) { this->m_databaseWriter->gracefulShutdown(); }
}

View File

@@ -48,7 +48,11 @@ namespace BlackCoreTest
m_airportReader(new CAirportDataReader(this, CDatabaseReaderConfigList::allDirectDbAccess())),
m_icaoReader(new CIcaoDataReader(this, CDatabaseReaderConfigList::allDirectDbAccess())),
m_modelReader(new CModelDataReader(this, CDatabaseReaderConfigList::allDirectDbAccess()))
{ }
{
m_airportReader->markAsUsedInUnitTest();
m_icaoReader->markAsUsedInUnitTest();
m_modelReader->markAsUsedInUnitTest();
}
CTestReaders::~CTestReaders()
{