Simplify CThreadedReader to avoid race conditions

refs #731
This commit is contained in:
Roland Winklmeier
2016-08-11 23:18:51 +02:00
committed by Mathew Sutcliffe
parent 6b0412d68d
commit 4733c72553
9 changed files with 86 additions and 130 deletions

View File

@@ -34,7 +34,8 @@ namespace BlackCore
CContinuousWorker(owner, name),
m_updateTimer(new QTimer(this))
{
m_toggleConnection = connect(this->m_updateTimer, &QTimer::timeout, this, &CThreadedReader::ps_toggleInterval);
connect(m_updateTimer, &QTimer::timeout, this, &CThreadedReader::doWork);
m_updateTimer->setSingleShot(true);
}
CThreadedReader::~CThreadedReader()
@@ -67,12 +68,6 @@ namespace BlackCore
return delta <= timeLastMs;
}
void CThreadedReader::requestReload()
{
// default implementation, subclasses shall override as required
this->initialize();
}
bool CThreadedReader::isNetworkAvailable() const
{
static const bool nw = CNetworkUtils::hasConnectedInterface();
@@ -88,29 +83,15 @@ namespace BlackCore
}
}
void CThreadedReader::setInterval(int updatePeriodMs)
void CThreadedReader::startReader()
{
Q_ASSERT(this->m_updateTimer);
QTimer::singleShot(0, this, [this, updatePeriodMs]
{
QWriteLocker wl(&this->m_lock);
if (updatePeriodMs < 1)
{
this->m_updateTimer->stop();
}
else {
this->m_updateTimer->start(updatePeriodMs);
}
});
Q_ASSERT(m_initialTime > 0);
QTimer::singleShot(m_initialTime, this, [=] { this->doWork(); });
}
void CThreadedReader::restartTimer(bool onlyWhenActive)
void CThreadedReader::pauseReader()
{
const int intervalMs(this->interval());
if (!onlyWhenActive || this->isTimerActive())
{
this->setInterval(intervalMs);
}
QTimer::singleShot(0, m_updateTimer, &QTimer::stop);
}
bool CThreadedReader::didContentChange(const QString &content, int startPosition)
@@ -129,24 +110,6 @@ namespace BlackCore
return true;
}
void CThreadedReader::ps_toggleInterval()
{
disconnect(this->m_toggleConnection);
this->setPeriodicTime();
}
int CThreadedReader::interval() const
{
QReadLocker rl(&this->m_lock);
return this->m_updateTimer->interval();
}
bool CThreadedReader::isTimerActive() const
{
QReadLocker rl(&this->m_lock);
return this->m_updateTimer->isActive();
}
bool CThreadedReader::isMarkedAsFailed() const
{
return this->m_markedAsFailed;
@@ -157,33 +120,30 @@ namespace BlackCore
this->m_markedAsFailed = failed;
}
void CThreadedReader::setIntervalFromSettingsAndStart()
{
this->setInitialTime();
}
void CThreadedReader::threadAssertCheck() const
{
Q_ASSERT_X(QCoreApplication::instance()->thread() != QThread::currentThread(), Q_FUNC_INFO, "Needs to run in own thread");
Q_ASSERT_X(QObject::thread() == QThread::currentThread(), Q_FUNC_INFO, "Wrong object thread");
}
CReaderSettings CThreadedReader::getSettings() const
void CThreadedReader::setInitialAndPeriodicTime(int initialTime, int periodicTime)
{
return CReaderSettings::neverUpdateSettings();
m_initialTime = initialTime;
m_periodicTime = periodicTime;
if (m_updateTimer->isActive())
{
int oldPeriodicTime = m_updateTimer->interval();
int delta = m_periodicTime - oldPeriodicTime + m_updateTimer->remainingTime();
m_updateTimer->start(qMax(delta, 0));
}
}
void CThreadedReader::setInitialTime()
void CThreadedReader::doWork()
{
const CReaderSettings s = this->getSettings();
const int ms = s.getInitialTime().toMs();
this->setInterval(s.isNeverUpdate() ? -1 : ms);
if (isFinished()) { return; }
doWorkImpl();
Q_ASSERT(m_periodicTime > 0);
m_updateTimer->start(m_periodicTime);
}
void CThreadedReader::setPeriodicTime()
{
const CReaderSettings s = this->getSettings();
const int ms = s.getPeriodicTime().toMs();
this->setInterval(s.isNeverUpdate() ? -1 : ms);
}
} // namespace

View File

@@ -52,21 +52,9 @@ namespace BlackCore
//! \threadsafe
bool updatedWithinLastMs(qint64 timeLastMs);
//! Request new reading
//! \note override as required, default is to call initialize()
virtual void requestReload();
//! Network available
bool isNetworkAvailable() const;
//! Get the timer interval (ms)
//! \threadsafe
int interval() const;
//! Is timer running
//! \threadsafe
bool isTimerActive() const;
//! Is marked as read failed
//! \threadsafe
bool isMarkedAsFailed() const;
@@ -75,8 +63,13 @@ namespace BlackCore
//! \threadsafe
void setMarkedAsFailed(bool failed);
//! Set inverval from settings and start
void setIntervalFromSettingsAndStart();
//! Starts the reader
//! \threadsafe
void startReader();
//! Pauses the reader
//! \threadsafe
void pauseReader();
public slots:
//! Graceful shutdown
@@ -84,7 +77,6 @@ namespace BlackCore
void gracefulShutdown();
protected:
QTimer *m_updateTimer = nullptr; //!< update timer
mutable QReadWriteLock m_lock {QReadWriteLock::Recursive}; //!< lock which can be used from the derived classes
//! Constructor
@@ -96,37 +88,25 @@ namespace BlackCore
//! Make sure everthing runs correctly in own thread
void threadAssertCheck() const;
//! Get settings, default implementation returns BlackCore::Settings::CReaderSettings::neverUpdateSettings
virtual BlackCore::Vatsim::CReaderSettings getSettings() const;
//! Set initial time
void setInitialTime();
//! Set periodic time
void setPeriodicTime();
//! Set the update time
//! \param updatePeriodMs <=0 stops the timer
//! \threadsafe
void setInterval(int updatePeriodMs);
//! Restart timer
//! \threadsafe
void restartTimer(bool onlyWhenActive = false);
//! Stores new content hash and returns if content changed (based on hash value
//! \threadsafe
bool didContentChange(const QString &content, int startPosition = -1);
//! Set initial and periodic times
void setInitialAndPeriodicTime(int initialTime, int periodicTime);
//! This method does the actual work in the derived class
virtual void doWorkImpl() {}
private:
void doWork();
int m_initialTime = -1; //!< Initial start delay
int m_periodicTime = -1; //!< Periodic time after which the task is repeated
QDateTime m_updateTimestamp; //!< when file/resource was read
uint m_contentHash = 0; //!< has of the content given
std::atomic<bool> m_markedAsFailed { false }; //!< marker if reading failed
QMetaObject::Connection m_toggleConnection; //!< connection to switch interval from initial to periodic
private slots:
//! switch from initial to periodic
void ps_toggleInterval();
QTimer *m_updateTimer = nullptr;
};
} // namespace

View File

@@ -45,7 +45,7 @@ namespace BlackCore
CVatsimBookingReader::CVatsimBookingReader(QObject *owner) :
CThreadedReader(owner, "CVatsimBookingReader")
{
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::ps_read);
reloadSettings();
}
void CVatsimBookingReader::readInBackgroundThread()
@@ -60,22 +60,20 @@ namespace BlackCore
// void
}
CReaderSettings CVatsimBookingReader::getSettings() const
void CVatsimBookingReader::doWorkImpl()
{
return this->m_settings.get();
ps_read();
}
void CVatsimBookingReader::ps_read()
{
if (!this->isNetworkAvailable())
{
CLogMessage(this).warning("No network, cancel bookings reader");
this->m_updateTimer->stop();
CLogMessage(this).warning("No network, cannot read VATSIM bookings");
return;
}
this->threadAssertCheck();
this->restartTimer(true); // when timer active, restart so we cause no undesired reads
Q_ASSERT_X(sApp, Q_FUNC_INFO, "No application");
const QUrl url(sApp->getGlobalSetup().getVatsimBookingsUrl());
@@ -122,7 +120,6 @@ namespace BlackCore
if (this->getUpdateTimestamp() == updateTimestamp) return; // nothing to do
// save parsing and all follow up actions if nothing changed
this->restartTimer(); // do not consider time for reading
bool changed = this->didContentChange(xmlData, xmlData.indexOf("</timestamp>"));
if (!changed)
{
@@ -203,5 +200,12 @@ namespace BlackCore
emit this->dataRead(CEntityFlags::BookingEntity, CEntityFlags::ReadFailed, 0);
}
} // method
void CVatsimBookingReader::reloadSettings()
{
CReaderSettings s = m_settings.get();
setInitialAndPeriodicTime(s.getInitialTime().toMs(), s.getPeriodicTime().toMs());
}
} // ns
} // ns

View File

@@ -48,7 +48,7 @@ namespace BlackCore
//! \name BlackCore::CThreadedReader overrides
//! @{
virtual void cleanup() override;
virtual BlackCore::Vatsim::CReaderSettings getSettings() const override;
virtual void doWorkImpl() override;
//! @}
private slots:
@@ -60,7 +60,9 @@ namespace BlackCore
void ps_read();
private:
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimBookings> m_settings { this };
void reloadSettings();
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimBookings> m_settings { this, &CVatsimBookingReader::reloadSettings };
};
} // ns
} // ns

View File

@@ -60,7 +60,7 @@ namespace BlackCore
CVatsimDataFileReader::CVatsimDataFileReader(QObject *owner) :
CThreadedReader(owner, "CVatsimDataFileReader")
{
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::ps_read);
reloadSettings();
}
CSimulatedAircraftList CVatsimDataFileReader::getAircraft() const
@@ -178,22 +178,19 @@ namespace BlackCore
// void
}
CReaderSettings CVatsimDataFileReader::getSettings() const
void CVatsimDataFileReader::doWorkImpl()
{
return this->m_settings.get();
ps_read();
}
void CVatsimDataFileReader::ps_read()
{
if (!this->isNetworkAvailable())
{
CLogMessage(this).warning("No network, cancel data file reader");
this->m_updateTimer->stop();
CLogMessage(this).warning("No network, cannot read VATSIM data file");
return;
}
this->threadAssertCheck();
this->restartTimer(true); // when timer active, restart so we cause no undesired reads
// round robin for load balancing
// remark: Don't use QThread to run network operations in the background
@@ -228,7 +225,6 @@ namespace BlackCore
nwReply->close(); // close asap
if (dataFileData.isEmpty()) { return; }
this->restartTimer(); // do not consider time for reading
if (!this->didContentChange(dataFileData)) // Quick check by hash
{
CLogMessage(this).info("VATSIM file has same content, skipped");
@@ -440,6 +436,12 @@ namespace BlackCore
}
}
void CVatsimDataFileReader::reloadSettings()
{
CReaderSettings s = m_settings.get();
setInitialAndPeriodicTime(s.getInitialTime().toMs(), s.getPeriodicTime().toMs());
}
const QMap<QString, QString> CVatsimDataFileReader::clientPartsToMap(const QString &currentLine, const QStringList &clientSectionAttributes)
{
QMap<QString, QString> parts;

View File

@@ -128,7 +128,7 @@ namespace BlackCore
//! \name BlackCore::CThreadedReader overrides
//! @{
virtual void cleanup() override;
virtual BlackCore::Vatsim::CReaderSettings getSettings() const override;
virtual void doWorkImpl() override;
//! @}
private slots:
@@ -139,10 +139,12 @@ namespace BlackCore
void ps_read();
private:
void reloadSettings();
BlackMisc::Aviation::CAtcStationList m_atcStations;
BlackMisc::Simulation::CSimulatedAircraftList m_aircraft;
BlackMisc::CData<BlackCore::Data::TVatsimSetup> m_lastGoodSetup { this };
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimDataFile> m_settings { this };
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimDataFile> m_settings { this, &CVatsimDataFileReader::reloadSettings };
QMap<BlackMisc::Aviation::CCallsign, BlackMisc::Network::CVoiceCapabilities> m_voiceCapabilities;
//! Split line and assign values to their corresponding attribute names

View File

@@ -41,7 +41,7 @@ namespace BlackCore
CVatsimMetarReader::CVatsimMetarReader(QObject *owner) :
CThreadedReader(owner, "CVatsimMetarReader")
{
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimMetarReader::readMetars);
reloadSettings();
}
void CVatsimMetarReader::readInBackgroundThread()
@@ -72,9 +72,9 @@ namespace BlackCore
// void
}
CReaderSettings CVatsimMetarReader::getSettings() const
void CVatsimMetarReader::doWorkImpl()
{
return m_settings.get();
readMetars();
}
void CVatsimMetarReader::readMetars()
@@ -82,12 +82,10 @@ namespace BlackCore
if (this->isAbandoned()) { return; }
if (!this->isNetworkAvailable())
{
CLogMessage(this).warning("No network, cancel METAR reader");
this->m_updateTimer->stop();
CLogMessage(this).warning("No network, cannot read METARs");
return;
}
this->threadAssertCheck();
this->restartTimer(true); // when timer active, restart so we cause no undesired reads
CFailoverUrlList urls(sApp->getVatsimMetarUrls());
const CUrl url(urls.obtainNextWorkingUrl(true));
@@ -117,7 +115,6 @@ namespace BlackCore
QString metarData = nwReply->readAll();
nwReply->close(); // close asap
this->restartTimer(); // do not consider time for reading
if (!this->didContentChange(metarData)) // Quick check by hash
{
CLogMessage(this).info("METAR file has same content, skipped");
@@ -154,5 +151,12 @@ namespace BlackCore
emit dataRead(CEntityFlags::MetarEntity, CEntityFlags::ReadFailed, 0);
}
} // method
void CVatsimMetarReader::reloadSettings()
{
CReaderSettings s = m_settings.get();
setInitialAndPeriodicTime(s.getInitialTime().toMs(), s.getPeriodicTime().toMs());
}
} // ns
} // ns

View File

@@ -63,7 +63,7 @@ namespace BlackCore
//! \name BlackCore::CThreadedReader overrides
//! @{
virtual void cleanup() override;
virtual BlackCore::Vatsim::CReaderSettings getSettings() const override;
virtual void doWorkImpl() override;
//! @}
private:
@@ -74,10 +74,12 @@ namespace BlackCore
//! Do reading
void readMetars();
void reloadSettings();
private:
BlackMisc::Weather::CMetarDecoder m_metarDecoder;
BlackMisc::Weather::CMetarList m_metars;
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimMetars> m_settings { this };
BlackMisc::CSettingReadOnly<BlackCore::Vatsim::TVatsimMetars> m_settings { this, &CVatsimMetarReader::reloadSettings };
};
} // ns
} // ns

View File

@@ -605,7 +605,7 @@ namespace BlackCore
Q_ASSERT_X(c, Q_FUNC_INFO, "connect failed bookings");
this->m_entitiesPeriodicallyRead |= CEntityFlags::BookingEntity;
this->m_vatsimBookingReader->start(QThread::LowPriority);
this->m_vatsimBookingReader->setIntervalFromSettingsAndStart();
this->m_vatsimBookingReader->startReader();
}
// 4. VATSIM data file
@@ -618,7 +618,7 @@ namespace BlackCore
Q_ASSERT_X(c, Q_FUNC_INFO, "connect failed VATSIM data file");
this->m_entitiesPeriodicallyRead |= CEntityFlags::VatsimDataFile;
this->m_vatsimDataFileReader->start(QThread::LowPriority);
this->m_vatsimDataFileReader->setIntervalFromSettingsAndStart();
this->m_vatsimDataFileReader->startReader();
}
// 5. VATSIM metar data
@@ -631,7 +631,7 @@ namespace BlackCore
Q_ASSERT_X(c, Q_FUNC_INFO, "connect failed VATSIM METAR");
this->m_entitiesPeriodicallyRead |= CEntityFlags::MetarEntity;
this->m_vatsimMetarReader->start(QThread::LowPriority);
this->m_vatsimMetarReader->setIntervalFromSettingsAndStart();
this->m_vatsimMetarReader->startReader();
}
// 6. ICAO data reader