refs #287 Thread safety, fix "memory access violation"

* some improved comments and information
* QMetaObject::invokeMethod in tool.cpp as thread safe invocation
* common base class for threaded readers
* removed event class, using QMetaObject::invoke instead for forcing calls in main event loop
* stop methods for readers, as used for graceful shutdown (preparing for thread safe destruction of objects)
* graceful shutdown for network context
* calls in tool now via inkoke for thread safety (only thread safe methods called directly)
This commit is contained in:
Klaus Basan
2014-07-01 19:11:25 +02:00
parent c03d45123d
commit d9a1c0cc8c
16 changed files with 522 additions and 229 deletions

View File

@@ -7,6 +7,9 @@
#include "blackmisc/project.h"
#include "blackmisc/indexvariantmap.h"
// KB_REMOVE with debug log message
#include <QThread>
namespace BlackCore
{
@@ -304,6 +307,9 @@ namespace BlackCore
void CAirspaceMonitor::receivedBookings(const CAtcStationList &bookedStations)
{
// KB_REMOVE qDebug
qDebug() << Q_FUNC_INFO << QThread::currentThreadId();
this->m_atcStationsBooked.clear();
foreach(CAtcStation bookedStation, bookedStations)
{

View File

@@ -1,7 +1,6 @@
#include "blackcore/context_application.h"
#include "blackcore/context_application_impl.h"
#include "blackcore/context_application_proxy.h"
#include "blackcore/context_application_event.h"
#include "blackmisc/statusmessage.h"
#include <QCoreApplication>
#include <QThread>
@@ -24,11 +23,29 @@ namespace BlackCore
case CRuntimeConfig::Remote:
return new BlackCore::CContextApplicationProxy(BlackCore::CDBusServer::ServiceName, conn, mode, parent);
default:
qFatal("Always initialize an application context");
qFatal("Always initialize an application context!");
return nullptr;
}
}
IContextApplication::RedirectionLevel IContextApplication::getOutputRedirectionLevel() const
{
QReadLocker(&this->m_lock);
return this->m_outputRedirectionLevel;
}
void IContextApplication::setOutputRedirectionLevel(IContextApplication::RedirectionLevel redirectionLevel)
{
QWriteLocker(&this->m_lock);
this->m_outputRedirectionLevel = redirectionLevel;
}
IContextApplication::RedirectionLevel IContextApplication::getStreamingForRedirectedOutputLevel() const
{
QReadLocker(&this->m_lock);
return this->m_redirectedOutputRedirectionLevel;
}
/*
* Constructor
*/
@@ -36,7 +53,9 @@ namespace BlackCore
CContext(mode, runtime), m_outputRedirectionLevel(IContextApplication::RedirectNone), m_redirectedOutputRedirectionLevel(IContextApplication::RedirectNone)
{
if (IContextApplication::s_contexts.isEmpty())
{
IContextApplication::s_oldHandler = qInstallMessageHandler(IContextApplication::messageHandlerDispatch);
}
IContextApplication::s_contexts.append(this);
}
@@ -45,24 +64,13 @@ namespace BlackCore
*/
void IContextApplication::setStreamingForRedirectedOutputLevel(RedirectionLevel redirectionLevel)
{
QWriteLocker(&this->m_lock);
disconnect(this, &IContextApplication::redirectedOutput, this, &IContextApplication::streamRedirectedOutput);
if (redirectionLevel != RedirectNone)
connect(this, &IContextApplication::redirectedOutput, this, &IContextApplication::streamRedirectedOutput);
this->m_redirectedOutputRedirectionLevel = redirectionLevel;
}
/*
* Process event in object's thread, used to emit signal from other thread
*/
bool IContextApplication::event(QEvent *event)
{
if (event->type() == CApplicationEvent::eventType())
{
CApplicationEvent *e = static_cast<CApplicationEvent *>(event);
emit this->redirectedOutput(e->m_message, this->getUniqueId());
return true;
connect(this, &IContextApplication::redirectedOutput, this, &IContextApplication::streamRedirectedOutput);
}
return CContext::event(event);
this->m_redirectedOutputRedirectionLevel = redirectionLevel;
}
/*
@@ -93,24 +101,25 @@ namespace BlackCore
void IContextApplication::messageHandler(QtMsgType type, const QMessageLogContext &messageContext, const QString &message)
{
Q_UNUSED(messageContext);
if (this->m_outputRedirectionLevel == RedirectNone) return;
RedirectionLevel outputRedirectionLevel = this->getOutputRedirectionLevel(); // local copy, thready safety
if (outputRedirectionLevel == RedirectNone) return;
CStatusMessage m(CStatusMessage::TypeStdoutRedirect, CStatusMessage::SeverityInfo, message);
switch (type)
{
case QtDebugMsg:
if (this->m_outputRedirectionLevel != RedirectAllOutput) return;
if (outputRedirectionLevel != RedirectAllOutput) return;
break;
case QtWarningMsg:
if (this->m_outputRedirectionLevel == RedirectAllOutput) return;
if (this->m_outputRedirectionLevel == RedirectError) return;
if (outputRedirectionLevel == RedirectAllOutput) return;
if (outputRedirectionLevel == RedirectError) return;
m.setSeverity(CStatusMessage::SeverityWarning);
break;
case QtCriticalMsg:
if (this->m_outputRedirectionLevel != RedirectError) return;
if (outputRedirectionLevel != RedirectError) return;
m.setSeverity(CStatusMessage::SeverityError);
break;
case QtFatalMsg:
if (this->m_outputRedirectionLevel != RedirectError) return;
if (m_outputRedirectionLevel != RedirectError) return;
m.setSeverity(CStatusMessage::SeverityError);
break;
default:
@@ -125,11 +134,9 @@ namespace BlackCore
}
else
{
// different threads, use event.
// in this event the same redirect as above will emitted, only that this is then
// done in the same thread as the parent object
CApplicationEvent *e = new CApplicationEvent(m, this->getUniqueId());
QCoreApplication::postEvent(this, e);
// Different threads, use invoke so that is called in "main / object's thread"
// Otherwise for DBus: QtDBus: cannot relay signals from parent BlackCore::CContextApplication(0x4b4358 "") unless they are emitted in the object's thread QThread(0x4740b0 ""). Current thread is QThread(0x4b5530 "Thread (pooled)")
QMetaObject::invokeMethod(this, "redirectedOutput", Q_ARG(BlackMisc::CStatusMessage, m), Q_ARG(qint64, this->getUniqueId()));
}
}
@@ -139,22 +146,22 @@ namespace BlackCore
void IContextApplication::streamRedirectedOutput(const CStatusMessage &message, qint64 contextId)
{
if (this->getUniqueId() == contextId) return; // avoid infinite output
if (this->m_redirectedOutputRedirectionLevel == RedirectNone) return;
RedirectionLevel redirectedOutputRedirectionLevel = this->getStreamingForRedirectedOutputLevel(); // local copy
if (message.isEmpty()) return;
switch (message.getSeverity())
{
case CStatusMessage::SeverityInfo:
if (this->m_redirectedOutputRedirectionLevel != RedirectAllOutput) return;
if (redirectedOutputRedirectionLevel != RedirectAllOutput) return;
qDebug() << message.getMessage();
break;
case CStatusMessage::SeverityWarning:
if (this->m_redirectedOutputRedirectionLevel == RedirectAllOutput) return;
if (this->m_redirectedOutputRedirectionLevel == RedirectError) return;
if (redirectedOutputRedirectionLevel == RedirectAllOutput) return;
if (redirectedOutputRedirectionLevel == RedirectError) return;
qWarning() << message.getMessage();
break;
case CStatusMessage::SeverityError:
if (this->m_redirectedOutputRedirectionLevel != RedirectError) return;
if (redirectedOutputRedirectionLevel != RedirectError) return;
qCritical() << message.getMessage();
break;
}

View File

@@ -9,6 +9,7 @@
#include "blackcore/context.h"
#include "blackmisc/statusmessagelist.h"
#include <QObject>
#include <QReadWriteLock>
#define BLACKCORE_CONTEXTAPPLICATION_INTERFACENAME "net.vatsim.PilotClient.BlackCore.ContextApplication"
#define BLACKCORE_CONTEXTAPPLICATION_OBJECTPATH "/Application"
@@ -16,7 +17,7 @@
namespace BlackCore
{
/*!
* \brief Application context interface
* Application context interface
*/
class IContextApplication : public CContext
{
@@ -76,20 +77,21 @@ namespace BlackCore
virtual ~IContextApplication() {}
//! Output redirection (redirect my output)
RedirectionLevel getOutputRedirectionLevel() const { return this->m_outputRedirectionLevel; }
//! \threadsafe
RedirectionLevel getOutputRedirectionLevel() const;
//! Output redirection (redirect my output)
void setOutputRedirectionLevel(RedirectionLevel redirectionLevel) { this->m_outputRedirectionLevel = redirectionLevel; }
//! \threadsafe
void setOutputRedirectionLevel(RedirectionLevel redirectionLevel);
//! Redirected output generated by others
RedirectionLevel getStreamingForRedirectedOutputLevel() const { return this->m_redirectedOutputRedirectionLevel; }
//! \threadsafe
RedirectionLevel getStreamingForRedirectedOutputLevel() const;
//! Redirected output generated by others
//! \threadsafe
void setStreamingForRedirectedOutputLevel(RedirectionLevel redirectionLevel) ;
//! Process event, cross thread messages
bool event(QEvent *event) override;
//! Reset output redirection
static void resetOutputRedirection();
@@ -144,13 +146,16 @@ namespace BlackCore
static QtMessageHandler s_oldHandler;
//! Message handler, handles one individual context
//! \threadsafe
void messageHandler(QtMsgType type, const QMessageLogContext &messageContext, const QString &messsage);
//! Handle output dispatch, handles all contexts
//! \remarks Can be called in thread, has to be thread safe
static void messageHandlerDispatch(QtMsgType type, const QMessageLogContext &messageContext, const QString &message);
RedirectionLevel m_outputRedirectionLevel; //!< enable / disable my output
RedirectionLevel m_redirectedOutputRedirectionLevel; //!< enable / disable others output
RedirectionLevel m_redirectedOutputRedirectionLevel; //!< enable / disable others' output
mutable QReadWriteLock m_lock; //!< thread safety
private slots:
//! Re-stream the redirected output

View File

@@ -1,36 +0,0 @@
#ifndef BLACKCORE_CONTEXT_APPLICATION_EVENT_H
#define BLACKCORE_CONTEXT_APPLICATION_EVENT_H
#include "blackcore/context_application.h"
#include <QEvent>
namespace BlackCore
{
/*!
* \brief Event to allow cross thread output redirection
*/
class CApplicationEvent : public QEvent
{
friend class IContextApplication;
public:
//! Constructor
CApplicationEvent(const BlackMisc::CStatusMessage &msg, qint64 contextId) :
QEvent(eventType()), m_message(msg), m_contextId(contextId) {}
//! Destructor
virtual ~CApplicationEvent() {}
//! Event type
static const QEvent::Type &eventType()
{
const static QEvent::Type t = static_cast<QEvent::Type>(QEvent::registerEventType());
return t;
}
private:
BlackMisc::CStatusMessage m_message;
qint64 m_contextId;
};
} // namespace
#endif // guard

View File

@@ -6,6 +6,8 @@
#ifndef BLACKCORE_CONTEXTNETWORK_H
#define BLACKCORE_CONTEXTNETWORK_H
//! \file
#include "blackcore/context.h"
#include "blackmisc/avallclasses.h"
#include "blackmisc/statusmessage.h"
@@ -22,7 +24,7 @@
namespace BlackCore
{
//! \brief Network context proxy
//! Network context proxy
class IContextNetwork : public CContext
{
Q_OBJECT

View File

@@ -84,6 +84,16 @@ namespace BlackCore
*/
CContextNetwork::~CContextNetwork()
{
this->gracefulShutdown();
}
/*
* Stop, going down
*/
void CContextNetwork::gracefulShutdown()
{
if (this->m_vatsimBookingReader) this->m_vatsimBookingReader->stop();
if (this->m_vatsimDataFileReader) this->m_vatsimDataFileReader->stop();
if (this->isConnected()) this->disconnectFromNetwork();
}

View File

@@ -6,6 +6,8 @@
#ifndef BLACKCORE_CONTEXTNETWORK_IMPL_H
#define BLACKCORE_CONTEXTNETWORK_IMPL_H
//! \file
#include "blackcore/context_network.h"
#include "blackcore/context_settings.h"
#include "blackcore/context_runtime.h"
@@ -118,6 +120,9 @@ namespace BlackCore
//! \copydoc IContextNetwork::requestAtisUpdates
virtual void requestAtisUpdates() override;
//! Gracefully shut down, e.g. for thread safety
void gracefulShutdown();
protected:
//! Constructor, with link to runtime
CContextNetwork(CRuntimeConfig::ContextMode, CRuntime *runtime);

View File

@@ -1,4 +1,3 @@
#include "blackcore/context_runtime.h"
#include "blackcore/context_all_impl.h"
#include "blackcore/context_all_proxies.h"
#include "blackcore/blackcorefreefunctions.h"
@@ -9,6 +8,7 @@
#include "blackmisc/statusmessagelist.h"
#include "blackmisc/avaircraft.h"
#include "blackmisc/blackmiscfreefunctions.h"
#include "blackcore/context_runtime.h"
#include <QDebug>
@@ -58,6 +58,7 @@ namespace BlackCore
*/
bool CRuntime::signalLogForApplication(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogApplication) return enabled;
if (!this->getIContextApplication())
{
@@ -88,6 +89,7 @@ namespace BlackCore
bool CRuntime::signalLogForAudio(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogAudio) return enabled;
if (!this->getIContextNetwork())
{
@@ -114,6 +116,7 @@ namespace BlackCore
*/
bool CRuntime::signalLogForNetwork(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogNetwork) return enabled;
if (!this->getIContextNetwork())
{
@@ -152,6 +155,7 @@ namespace BlackCore
*/
bool CRuntime::signalLogForOwnAircraft(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogOwnAircraft) return enabled;
if (!this->getIContextOwnAircraft())
{
@@ -188,6 +192,7 @@ namespace BlackCore
*/
bool CRuntime::signalLogForSettings(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogSettings) return enabled;
if (!this->getIContextSettings())
{
@@ -214,6 +219,7 @@ namespace BlackCore
*/
bool CRuntime::signalLogForSimulator(bool enabled)
{
QWriteLocker wl(&m_lock);
if (enabled == this->m_signalLogSimulator) return enabled;
if (!this->getIContextSimulator())
{
@@ -285,12 +291,12 @@ namespace BlackCore
switch (context)
{
default: return true;
case LogForApplication: return this->m_slotLogApplication;
case LogForAudio: return this->m_slotLogAudio;
case LogForNetwork: return this->m_slotLogNetwork;
case LogForOwnAircraft: return this->m_slotLogOwnAircraft;
case LogForSettings: return this->m_slotLogSettings;
case LogForSimulator: return this->m_slotLogSimulator;
case LogForApplication: return this->isSlotLogForApplicationEnabled();
case LogForAudio: return this->isSlotLogForAudioEnabled();
case LogForNetwork: return this->isSlotLogForNetworkEnabled();
case LogForOwnAircraft: return this->isSlotLogForOwnAircraftEnabled();
case LogForSettings: return this->isSlotLogForSettingsEnabled();
case LogForSimulator: return this->isSlotLogForSimulatorEnabled();
}
}
@@ -496,6 +502,10 @@ namespace BlackCore
{
disconnect(this->getIContextNetwork());
this->getIContextNetwork()->disconnectFromNetwork();
if (this->m_contextNetwork->usingLocalObjects())
{
this->getCContextNetwork()->gracefulShutdown(); // for threads
}
this->getIContextNetwork()->deleteLater();
this->m_contextNetwork = nullptr;
}
@@ -663,8 +673,6 @@ namespace BlackCore
return static_cast<CContextNetwork *>(this->m_contextNetwork);
}
CContextOwnAircraft *CRuntime::getCContextOwnAircraft()
{
Q_ASSERT_X(!this->m_contextOwnAircraft || this->m_contextOwnAircraft->usingLocalObjects(), "CCoreRuntime", "Cannot downcast to local object");

View File

@@ -6,6 +6,7 @@
#include <QDBusConnection>
#include <QObject>
#include <QMultiMap>
#include <QReadWriteLock>
using namespace BlackMisc;
@@ -58,66 +59,135 @@ namespace BlackCore
const QDBusConnection &getDBusConnection() const { return this->m_dbusConnection; }
//! Enable / disable all logging
//! \threadsafe
void signalLog(bool enabled);
//! Signal logging for application context
//! \threadsafe
bool signalLogForApplication(bool enabled);
//! Signal logging for audio context
//! \threadsafe
bool signalLogForAudio(bool enabled);
//! Signal logging for network context
//! \threadsafe
bool signalLogForNetwork(bool enabled);
//! Signal logging for own aircraft context
//! \threadsafe
bool signalLogForOwnAircraft(bool enabled);
//! Signal logging for settings context
//! \threadsafe
bool signalLogForSettings(bool enabled);
//! Signal logging for simulator context
//! \threadsafe
bool signalLogForSimulator(bool enabled);
//! Enable / disable all logging
//! \threadsafe
void slotLog(bool enabled);
//! Slot logging for application context
void slotLogForApplication(bool enabled) { this->m_slotLogApplication = enabled; }
//! \threadsafe
void slotLogForApplication(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogApplication = enabled;
}
//! Slot logging for audio context
void slotLogForAudio(bool enabled) { this->m_slotLogAudio = enabled; }
//! \threadsafe
void slotLogForAudio(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogAudio = enabled;
}
//! Slot logging for network context
void slotLogForNetwork(bool enabled) { this->m_slotLogNetwork = enabled; }
//! \threadsafe
void slotLogForNetwork(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogNetwork = enabled;
}
//! Slot logging for own aircraft context
void slotLogForOwnAircraft(bool enabled) { this->m_slotLogOwnAircraft = enabled; }
//! \threadsafe
void slotLogForOwnAircraft(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogOwnAircraft = enabled;
}
//! Slot logging for settings context
void slotLogForSettings(bool enabled) { this->m_slotLogSettings = enabled; }
//! \threadsafe
void slotLogForSettings(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogSettings = enabled;
}
//! Slot logging for simulator context
void slotLogForSimulator(bool enabled) { this->m_slotLogSimulator = enabled; }
//! \threadsafe
void slotLogForSimulator(bool enabled)
{
QWriteLocker wl(&m_lock);
this->m_slotLogSimulator = enabled;
}
//! Slot logging for application context
bool isSlotLogForApplicationEnabled() const { return this->m_slotLogApplication; }
//! \threadsafe
bool isSlotLogForApplicationEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogApplication;
}
//! Slot logging for audio context
bool isSlotLogForAudioEnabled() const { return this->m_slotLogAudio; }
//! \threadsafe
bool isSlotLogForAudioEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogAudio;
}
//! Slot logging for network context
bool isSlotLogForNetworkEnabled() const { return this->m_slotLogNetwork; }
//! \threadsafe
bool isSlotLogForNetworkEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogNetwork;
}
//! Slot log for own aircraft
bool isSlotLogForOwnAircraftEnabled() const { return this->m_slotLogOwnAircraft; }
//! \threadsafe
bool isSlotLogForOwnAircraftEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogOwnAircraft;
}
//! Slot logging for settings context
bool isSlotLogForSettingsEnabled() const { return this->m_slotLogSettings; }
//! \threadsafe
bool isSlotLogForSettingsEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogSettings;
}
//! Slot logging for simulator context
bool isSlotLogForSimulatorEnabled() const { return this->m_slotLogSimulator; }
//! \threadsafe
bool isSlotLogForSimulatorEnabled() const
{
QReadLocker rl(&m_lock);
return this->m_slotLogSimulator;
}
//! Slot logging for specified context
//! \threadsafe
bool isSlotLogEnabledFor(LogContext context) const;
//! Slot logging
@@ -266,6 +336,7 @@ namespace BlackCore
IContextSettings *m_contextSettings;
IContextSimulator *m_contextSimulator;
QMultiMap<QString, QMetaObject::Connection> m_logSignalConnections;
mutable QReadWriteLock m_lock;
//! initialization of DBus connection (where applicable)
void initDBusConnection(const QString &address);

View File

@@ -12,11 +12,11 @@ using namespace BlackMisc::Network;
namespace BlackCore
{
CVatsimBookingReader::CVatsimBookingReader(const QString &url, QObject *parent) : QObject(parent), m_serviceUrl(url), m_networkManager(nullptr), m_updateTimer(nullptr)
CVatsimBookingReader::CVatsimBookingReader(const QString &url, QObject *parent) :
QObject(parent), CThreadedReader(),
m_serviceUrl(url), m_networkManager(nullptr)
{
this->m_networkManager = new QNetworkAccessManager(this);
this->m_updateTimer = new QTimer(this);
this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimBookingReader::loadFinished);
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimBookingReader::read);
}
@@ -27,16 +27,8 @@ namespace BlackCore
if (url.isEmpty()) return;
Q_ASSERT(this->m_networkManager);
QNetworkRequest request(url);
this->m_networkManager->get(request);
}
void CVatsimBookingReader::setInterval(int updatePeriodMs)
{
Q_ASSERT(this->m_updateTimer);
if (updatePeriodMs < 1)
this->m_updateTimer->stop();
else
this->m_updateTimer->start(updatePeriodMs);
QNetworkReply *reply = this->m_networkManager->get(request);
this->setPendingNetworkReply(reply);
}
/*
@@ -44,7 +36,12 @@ namespace BlackCore
*/
void CVatsimBookingReader::loadFinished(QNetworkReply *nwReply)
{
QtConcurrent::run(this, &CVatsimBookingReader::parseBookings, nwReply);
this->setPendingNetworkReply(nullptr);
if (!this->isStopped())
{
QFuture<void> f = QtConcurrent::run(this, &CVatsimBookingReader::parseBookings, nwReply);
this->setPendingFuture(f);
}
}
/*
@@ -52,30 +49,33 @@ namespace BlackCore
*/
void CVatsimBookingReader::parseBookings(QNetworkReply *nwReply)
{
// Worker thread, make sure to write no members here!
if (this->isStopped())
{
qDebug() << "terminated" << Q_FUNC_INFO;
return; // stop, terminate straight away, ending thread
}
if (nwReply->error() == QNetworkReply::NoError)
{
static const QString timestampFormat("yyyy-MM-dd HH:mm:ss");
QString xmlData = nwReply->readAll();
QDomDocument doc;
QDateTime updateTimestamp = QDateTime::currentDateTimeUtc();
if (doc.setContent(xmlData))
{
QDomNode timestamp = doc.elementsByTagName("timestamp").at(0);
QString ts = timestamp.toElement().text().trimmed();
Q_ASSERT(!ts.isEmpty());
if (ts.isEmpty())
{
// fallback
m_updateTimestamp = QDateTime::currentDateTimeUtc();
}
else
if (!ts.isEmpty())
{
// normally the timestamp is always updated from backend
// if this changes in the future we're prepared
QDateTime fileTimestamp = QDateTime::fromString(ts, timestampFormat);
fileTimestamp.setTimeSpec(Qt::UTC);
if (this->m_updateTimestamp == fileTimestamp) return; // nothing to do
this->m_updateTimestamp = fileTimestamp;
updateTimestamp = QDateTime::fromString(ts, timestampFormat);
updateTimestamp.setTimeSpec(Qt::UTC);
if (this->getUpdateTimestamp() == updateTimestamp) return; // nothing to do
}
QDomNode atc = doc.elementsByTagName("atcs").at(0);
@@ -84,6 +84,13 @@ namespace BlackCore
CAtcStationList bookedStations;
for (int i = 0; i < size; i++)
{
if (this->isStopped())
{
qDebug() << "terminated" << Q_FUNC_INFO;
return; // stop, terminate straight away, ending thread
}
// pase nodes
QDomNode bookingNode = bookingNodes.at(i);
QDomNodeList bookingNodeValues = bookingNode.childNodes();
CAtcStation bookedStation;
@@ -129,8 +136,8 @@ namespace BlackCore
bookedStation.setController(user);
bookedStations.push_back(bookedStation);
}
this->setUpdateTimestamp(updateTimestamp); // thread safe update
emit this->dataRead(bookedStations);
} // node
} // content
@@ -138,4 +145,5 @@ namespace BlackCore
nwReply->deleteLater();
} // method
} // namespace

View File

@@ -6,50 +6,42 @@
#ifndef BLACKCORE_VATSIMBOOKINGREADER_H
#define BLACKCORE_VATSIMBOOKINGREADER_H
//! \file
#include "blackmisc/threadedreader.h"
#include "blackmisc/avatcstationlist.h"
#include <QObject>
#include <QTimer>
#include <QNetworkReply>
#include <QReadWriteLock>
namespace BlackCore
{
/*!
* \brief Read bookings from VATSIM
* Read bookings from VATSIM
*/
class CVatsimBookingReader : public QObject
class CVatsimBookingReader : public QObject, public BlackMisc::CThreadedReader<void>
{
Q_OBJECT
public:
//! \brief Constructor
//! Constructor
explicit CVatsimBookingReader(const QString &url, QObject *parent = nullptr);
//! \brief Update timestamp
QDateTime getUpdateTimestamp() const { return this->m_updateTimestamp; }
//! \brief Read / re-read bookings
//! Read / re-read bookings
void read();
/*!
* \brief Set the update time
* \param updatePeriodMs 0 stops the timer
*/
void setInterval(int updatePeriodMs);
//! \brief Get the timer interval (ms)
int interval() const { return this->m_updateTimer->interval();}
private slots:
//! \brief Bookings have been read
//! Bookings have been read
void loadFinished(QNetworkReply *nwReply);
private:
QString m_serviceUrl; /*!< URL of the service */
QNetworkAccessManager *m_networkManager;
QDateTime m_updateTimestamp;
QTimer *m_updateTimer;
//! Parse received bookings
//! \threadsafe
void parseBookings(QNetworkReply *nwReply);
signals:

View File

@@ -13,14 +13,13 @@ using namespace BlackMisc::Network;
using namespace BlackMisc::Geo;
using namespace BlackMisc::PhysicalQuantities;
namespace BlackCore
{
CVatsimDataFileReader::CVatsimDataFileReader(const QStringList &urls, QObject *parent) : QObject(parent), m_serviceUrls(urls), m_currentUrlIndex(0), m_networkManager(nullptr), m_updateTimer(nullptr)
CVatsimDataFileReader::CVatsimDataFileReader(const QStringList &urls, QObject *parent) :
QObject(parent), CThreadedReader(),
m_serviceUrls(urls), m_currentUrlIndex(0), m_networkManager(nullptr)
{
this->m_networkManager = new QNetworkAccessManager(this);
this->m_updateTimer = new QTimer(this);
this->connect(this->m_networkManager, &QNetworkAccessManager::finished, this, &CVatsimDataFileReader::loadFinished);
this->connect(this->m_updateTimer, &QTimer::timeout, this, &CVatsimDataFileReader::read);
}
@@ -39,16 +38,26 @@ namespace BlackCore
if (url.isEmpty()) return;
Q_ASSERT(this->m_networkManager);
QNetworkRequest request(url);
this->m_networkManager->get(request);
QNetworkReply *r = this->m_networkManager->get(request);
this->setPendingNetworkReply(r);
}
void CVatsimDataFileReader::setInterval(int updatePeriodMs)
const CAircraftList &CVatsimDataFileReader::getAircrafts()
{
Q_ASSERT(this->m_updateTimer);
if (updatePeriodMs < 1)
this->m_updateTimer->stop();
else
this->m_updateTimer->start(updatePeriodMs);
QReadLocker rl(&this->m_lock);
return this->m_aircrafts;
}
const CAtcStationList &CVatsimDataFileReader::getAtcStations()
{
QReadLocker rl(&this->m_lock);
return this->m_atcStations;
}
const CServerList &CVatsimDataFileReader::getVoiceServers()
{
QReadLocker rl(&this->m_lock);
return this->m_voiceServers;
}
CUserList CVatsimDataFileReader::getPilotsForCallsigns(const CCallsignList &callsigns)
@@ -57,7 +66,7 @@ namespace BlackCore
if (callsigns.isEmpty()) return users;
foreach(CCallsign callsign, callsigns)
{
users.push_back(this->m_aircrafts.findByCallsign(callsign).getPilots());
users.push_back(this->getAircrafts().findByCallsign(callsign).getPilots());
}
return users;
}
@@ -71,7 +80,7 @@ namespace BlackCore
CAircraftIcao CVatsimDataFileReader::getIcaoInfo(const CCallsign &callsign)
{
CAircraft aircraft = this->m_aircrafts.findFirstByCallsign(callsign);
CAircraft aircraft = this->getAircrafts().findFirstByCallsign(callsign);
return aircraft.getIcaoInfo();
}
@@ -95,7 +104,7 @@ namespace BlackCore
if (callsigns.isEmpty()) return users;
foreach(CCallsign callsign, callsigns)
{
users.push_back(this->m_atcStations.findByCallsign(callsign).getControllers());
users.push_back(this->getAtcStations().findByCallsign(callsign).getControllers());
}
return users;
}
@@ -117,7 +126,12 @@ namespace BlackCore
*/
void CVatsimDataFileReader::loadFinished(QNetworkReply *nwReply)
{
QtConcurrent::run(this, &CVatsimDataFileReader::parseVatsimFileInBackground, nwReply);
this->setPendingNetworkReply(nullptr);
if (!this->isStopped())
{
QFuture<void> f = QtConcurrent::run(this, &CVatsimDataFileReader::parseVatsimFileInBackground, nwReply);
this->setPendingFuture(f);
}
}
/*
@@ -126,8 +140,14 @@ namespace BlackCore
*/
void CVatsimDataFileReader::parseVatsimFileInBackground(QNetworkReply *nwReply)
{
QStringList illegalIcaoCodes;
// Worker thread, make sure to write only synced here!
if (this->isStopped())
{
qDebug() << "terminated" << Q_FUNC_INFO;
return; // stop, terminate straight away, ending thread
}
QStringList illegalIcaoCodes;
if (nwReply->error() == QNetworkReply::NoError)
{
const QString dataFileData = nwReply->readAll();
@@ -135,10 +155,23 @@ namespace BlackCore
QStringList lines = dataFileData.split(QRegExp("[\r\n]"), QString::SkipEmptyParts);
if (lines.isEmpty()) return;
// build on local vars for thread safety
CServerList voiceServers;
CAtcStationList atcStations;
CAircraftList aircrafts;
QDateTime updateTimestampFromFile;
QStringList clientSectionAttributes;
Section section = SectionNone;
foreach(QString currentLine, lines)
{
if (this->isStopped())
{
qDebug() << "terminated" << Q_FUNC_INFO;
return; // stop, terminate straight away, ending thread
}
// parse lines
currentLine = currentLine.trimmed();
if (currentLine.isEmpty()) continue;
if (currentLine.startsWith(";"))
@@ -162,13 +195,10 @@ namespace BlackCore
else if (currentLine.contains("VOICE SERVERS", Qt::CaseInsensitive))
{
section = SectionVoiceServer;
this->m_voiceServers.clear();
}
else if (currentLine.contains("CLIENTS", Qt::CaseInsensitive))
{
section = SectionClients;
this->m_aircrafts.clear();
this->m_atcStations.clear();
}
else
{
@@ -217,16 +247,16 @@ namespace BlackCore
}
}
this->m_aircrafts.push_back(aircraft);
aircrafts.push_back(aircraft);
}
else if (clientType.startsWith('a'))
{
// ATC section
CLength range;
position.setHeight(altitude); // the altitude is elevation for a station
position.setGeodeticHeight(altitude); // the altitude is elevation for a station
CAtcStation station(user.getCallsign().getStringAsSet(), user, frequency, position, range);
station.setOnline(true);
this->m_atcStations.push_back(station);
atcStations.push_back(station);
}
else
{
@@ -241,40 +271,45 @@ namespace BlackCore
QStringList updateParts = currentLine.replace(" ", "").split('=');
if (updateParts.length() < 2) continue;
QString dts = updateParts.at(1).trimmed();
QDateTime dt = QDateTime::fromString(dts, "yyyyMMddHHmmss");
dt.setOffsetFromUtc(0);
if (dt == this->m_updateTimestamp)
{
return; // still same data, terminate
}
this->m_updateTimestamp = dt;
updateTimestampFromFile = QDateTime::fromString(dts, "yyyyMMddHHmmss");
updateTimestampFromFile.setOffsetFromUtc(0);
bool alreadyRead = (updateTimestampFromFile == this->getUpdateTimestamp());
if (alreadyRead) { return; }// still same data, terminate
}
break;
case SectionVoiceServer:
{
QStringList voiceServerParts = currentLine.split(':');
if (voiceServerParts.size() < 3) continue;
BlackMisc::Network::CServer voiceServer(voiceServerParts.at(1), voiceServerParts.at(2), voiceServerParts.at(0), -1, CUser());
voiceServers.push_back(voiceServer);
}
break;
case SectionNone:
default:
break;
}
break;
case SectionVoiceServer:
{
QStringList voiceServerParts = currentLine.split(':');
if (voiceServerParts.size() < 3) continue;
BlackMisc::Network::CServer voiceServer(voiceServerParts.at(1), voiceServerParts.at(2), voiceServerParts.at(0), -1, CUser());
this->m_voiceServers.push_back(voiceServer);
}
break;
case SectionNone:
default:
break;
}
} // for each
}
} // for each
nwReply->close();
nwReply->deleteLater(); // we are responsible for reading this
emit this->dataRead();
// this part needs to be synchronized
this->m_lock.lockForWrite();
this->m_updateTimestamp = updateTimestampFromFile;
this->m_aircrafts = aircrafts;
this->m_atcStations = atcStations;
this->m_voiceServers = voiceServers;
this->m_lock.unlock();
} // read success
// warnings, if required
if (!illegalIcaoCodes.isEmpty())
{
const QString w = QString("Illegal ICAO code(s) in VATSIM data file: %1").arg(illegalIcaoCodes.join(", "));
qWarning(w.toLatin1());
nwReply->close();
nwReply->deleteLater(); // we are responsible for reading this
emit this->dataRead();
// warnings, if required
if (!illegalIcaoCodes.isEmpty())
{
const QString w = QString("Illegal ICAO code(s) in VATSIM data file: %1").arg(illegalIcaoCodes.join(", "));
qWarning(w.toLatin1());
}
}
}

View File

@@ -6,6 +6,9 @@
#ifndef BLACKCORE_VATSIMDATAFILEREADER_H
#define BLACKCORE_VATSIMDATAFILEREADER_H
//! \file
#include "blackmisc/threadedreader.h"
#include "blackmisc/avatcstationlist.h"
#include "blackmisc/avaircraftlist.h"
#include "blackmisc/nwserverlist.h"
@@ -15,13 +18,14 @@
#include <QObject>
#include <QTimer>
#include <QNetworkReply>
#include <QReadWriteLock>
namespace BlackCore
{
/*!
* Read bookings from VATSIM
*/
class CVatsimDataFileReader : public QObject
class CVatsimDataFileReader : public QObject, public BlackMisc::CThreadedReader<void>
{
Q_OBJECT
@@ -29,29 +33,17 @@ namespace BlackCore
//! Constructor
explicit CVatsimDataFileReader(const QStringList &urls, QObject *parent = nullptr);
//! Update timestamp
QDateTime getUpdateTimestamp() const { return this->m_updateTimestamp; }
//! Read / re-read bookings
void read();
/*!
* \brief Set the update time
* \param updatePeriodMs 0 stops the timer
*/
void setInterval(int updatePeriodMs);
//! Get the timer interval (ms)
int interval() const { return this->m_updateTimer->interval();}
//! Get aircrafts
const BlackMisc::Aviation::CAircraftList &getAircrafts();
//! Get aircrafts
const BlackMisc::Aviation::CAircraftList &getAircrafts() { return this->m_aircrafts; }
//! Get aircrafts
const BlackMisc::Aviation::CAtcStationList &getAtcStations() { return this->m_atcStations; }
const BlackMisc::Aviation::CAtcStationList &getAtcStations();
//! Get all voice servers
const BlackMisc::Network::CServerList &getVoiceServers() { return this->m_voiceServers; }
const BlackMisc::Network::CServerList &getVoiceServers();
//! Users for callsign(s)
BlackMisc::Network::CUserList getUsersForCallsigns(const BlackMisc::Aviation::CCallsignList &callsigns);
@@ -85,8 +77,6 @@ namespace BlackCore
QStringList m_serviceUrls; /*!< URL of the service */
int m_currentUrlIndex;
QNetworkAccessManager *m_networkManager;
QDateTime m_updateTimestamp;
QTimer *m_updateTimer;
BlackMisc::Network::CServerList m_voiceServers;
BlackMisc::Aviation::CAtcStationList m_atcStations;
BlackMisc::Aviation::CAircraftList m_aircrafts;
@@ -107,6 +97,7 @@ namespace BlackCore
signals:
//! Data have been read
void dataRead();
};
}

View File

@@ -0,0 +1,146 @@
#ifndef BLACKMISC_THREADED_READER_H
#define BLACKMISC_THREADED_READER_H
//! \file
#include <QReadWriteLock>
#include <QDateTime>
#include <QTimer>
#include <QNetworkReply>
#include <QFuture>
#include <QCoreApplication>
// Header only class, to avoid orward instantiation across subprojects
namespace BlackMisc
{
/*!
* Support for threaded based reading and parsing tasks such
* as data files via http, or file system and parsing (such as FSX models)
*/
template <class FutureRet = void> class CThreadedReader
{
public:
//! Destructor
virtual ~CThreadedReader()
{
delete m_updateTimer;
this->stop();
}
//! Thread safe, set update timestamp
//! \threadsafe
QDateTime getUpdateTimestamp() const
{
QReadLocker(&this->m_lock);
return this->m_updateTimestamp;
}
//! Thread safe, set update timestamp
//! \threadsafe
void setUpdateTimestamp(const QDateTime &updateTimestamp)
{
QWriteLocker(&this->m_lock);
this->m_updateTimestamp = updateTimestamp;
}
//! Thread safe, mark as stopped
virtual void stop()
{
if (this->isStopped()) return;
this->setStopFlag();
this->setInterval(0);
// shutdown pending
if (this->m_pendingFuture.isRunning())
{
// cancel does not work with all futures, especially not with QConcurrent::run
// the stop flag should the job
// but I will cancel anyway
this->m_pendingFuture.cancel();
}
if (this->m_pendingNetworkReply && this->m_pendingNetworkReply->isRunning())
{
this->m_pendingNetworkReply->abort();
QCoreApplication::processEvents(QEventLoop::AllEvents, 250); // allow the abort to be called
}
// cancel or stop flag above should terminate QFuture
this->m_pendingFuture.waitForFinished();
}
//! Thread safe, is in state stopped?
//! \threadsafe
bool isStopped() const
{
QReadLocker rl(&this->m_lock);
return this->m_stopped;
}
/*!
* Set the update time
* \param updatePeriodMs <=0 stops the timer
* \threadsafe
*/
void setInterval(int updatePeriodMs)
{
Q_ASSERT(this->m_updateTimer);
QWriteLocker(&this->m_lock);
if (updatePeriodMs < 1)
this->m_updateTimer->stop();
else
this->m_updateTimer->start(updatePeriodMs);
}
//! Get the timer interval (ms)
//! \threadsafe
int interval() const
{
QReadLocker rl(&this->m_lock);
return this->m_updateTimer->interval();
}
protected:
//! Constructor
CThreadedReader() :
m_updateTimer(nullptr), m_stopped(false), m_pendingNetworkReply(nullptr), m_lock(QReadWriteLock::Recursive)
{
this->m_updateTimer = new QTimer();
}
//! Has pending network replay
//! \threadsafe
void setPendingNetworkReply(QNetworkReply *reply)
{
QWriteLocker(&this->m_lock);
this->m_pendingNetworkReply = reply;
}
//! Has pending operation
//! \threadsafe
void setPendingFuture(QFuture<FutureRet> future)
{
QWriteLocker(&this->m_lock);
this->m_pendingFuture = future;
}
//! Thread safe, mark as to be stopped
//! \threadsafe
void setStopFlag()
{
QWriteLocker wl(&this->m_lock);
this->m_stopped = true;
}
QDateTime m_updateTimestamp;
QTimer *m_updateTimer;
bool m_stopped;
QFuture<FutureRet> m_pendingFuture; //!< optional future to be stopped
QNetworkReply *m_pendingNetworkReply; //!< optional future to be stopped
mutable QReadWriteLock m_lock;
};
} // namespace
#endif // guard