Issue #15 Added CDataLinkDBus, an implementation for sharing state between processes via DBus

This commit is contained in:
Mat Sutcliffe
2019-02-26 01:51:50 +00:00
parent 7df7de7f07
commit 2ddc5d51f7
17 changed files with 1163 additions and 2 deletions

View File

@@ -32,6 +32,11 @@
* \brief Utilities for sharing state between multiple objects.
*/
/*!
* \namespace BlackMisc::SharedState::DBus
* \brief Implementation detail classes used by SharedState::CDataLinkDBus.
*/
/*!
* \namespace BlackMisc::Predicates
* \brief Functor classes for evaluating predicate calculus expressions.

View File

@@ -39,6 +39,7 @@ HEADERS += *.h \
$$files($$PWD/network/external/*.h) \
$$files($$PWD/pq/*.h) \
$$files($$PWD/sharedstate/*.h) \
$$files($$PWD/sharedstate/dbus/*.h) \
$$files($$PWD/simulation/*.h) \
$$files($$PWD/simulation/data/*.h) \
$$files($$PWD/simulation/settings/*.h) \
@@ -63,6 +64,7 @@ SOURCES += *.cpp \
$$files($$PWD/network/external/*.cpp) \
$$files($$PWD/pq/*.cpp) \
$$files($$PWD/sharedstate/*.cpp) \
$$files($$PWD/sharedstate/dbus/*.cpp) \
$$files($$PWD/simulation/*.cpp) \
$$files($$PWD/simulation/data/*.cpp) \
$$files($$PWD/simulation/settings/*.cpp) \

View File

@@ -47,10 +47,8 @@ namespace BlackMisc
//! For each signal in parent, attempt to connect to it an interface signal of the same name.
//! \see BLACK_NO_RELAY
//! \deprecated KB 2018-08 this seems to be not used anymore and might be removed
void relayParentSignals()
{
Q_ASSERT_X(false, Q_FUNC_INFO, "crosscheck if still used");
const QMetaObject *metaObject = this->parent()->metaObject();
const QMetaObject *superMetaObject = metaObject;
while (strcmp(superMetaObject->superClass()->className(), "QObject") != 0) { superMetaObject = superMetaObject->superClass(); }

View File

@@ -0,0 +1,223 @@
/* Copyright (C) 2017
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/datalinkdbus.h"
#include "blackmisc/sharedstate/dbus/hub.h"
#include "blackmisc/sharedstate/dbus/duplex.h"
#include "blackmisc/sharedstate/dbus/duplexproxy.h"
#include "blackmisc/sharedstate/activeobserver.h"
#include "blackmisc/sharedstate/activemutator.h"
#include "blackmisc/dbusserver.h"
#include <QDBusServiceWatcher>
#include <QDBusConnection>
using namespace BlackMisc::SharedState::DBus;
namespace BlackMisc
{
namespace SharedState
{
CDataLinkDBus::CDataLinkDBus(QObject *parent) : QObject(parent), m_watchTimer(this)
{
connect(&m_watchTimer, &QTimer::timeout, this, &CDataLinkDBus::checkConnection);
m_watchTimer.setInterval(1000);
}
CDataLinkDBus::~CDataLinkDBus()
{
setConnectionStatus(false);
}
void CDataLinkDBus::initializeLocal(CDBusServer *server)
{
Q_ASSERT_X(!m_hub, Q_FUNC_INFO, "Already initialized");
m_hub = IHub::create(false, server, QDBusConnection("unused"), {}, this);
m_watchTimer.start();
checkConnection();
}
void CDataLinkDBus::initializeRemote(const QDBusConnection &connection, const QString &service)
{
Q_ASSERT_X(!m_hub, Q_FUNC_INFO, "Already initialized");
m_hub = IHub::create(true, nullptr, connection, service, this);
m_watchTimer.start();
checkConnection();
}
void CDataLinkDBus::checkConnection()
{
if (m_hub->isConnected()) { onConnected(); }
else { onDisconnected(); }
}
void CDataLinkDBus::onConnected()
{
if (m_duplex) { return; }
QFuture<void> ready;
std::tie(m_duplex, ready) = m_hub->getDuplex();
connect(m_duplex.get(), &IDuplex::eventPosted, this, &CDataLinkDBus::handlePeerEvent);
connect(m_duplex.get(), &IDuplex::peerSubscriptionsReceived, this, &CDataLinkDBus::setPeerSubscriptions);
connect(m_duplex.get(), &IDuplex::requestReceived, this, &CDataLinkDBus::handlePeerRequest);
doAfter(ready, m_duplex.get(), [this]
{
m_duplex->requestPeerSubscriptions();
announceLocalSubscriptions();
setConnectionStatus(true);
});
}
void CDataLinkDBus::onDisconnected()
{
m_duplex.reset();
setConnectionStatus(false);
}
void CDataLinkDBus::handleLocalEvent(const QString &channel, const CVariant &param)
{
handlePeerEvent(channel, param);
if (!m_duplex) { return; }
for (const auto &filter : as_const(getChannel(channel).peerSubscriptions))
{
if (filter.matches(param))
{
m_duplex->postEvent(channel, param);
return;
}
}
}
void CDataLinkDBus::handlePeerEvent(const QString &channel, const CVariant &param)
{
for (const auto &observerWeak : as_const(getChannel(channel).passiveObservers))
{
auto observer = observerWeak.lock();
if (observer && observer->eventSubscription().matches(param))
{
observer->handleEvent(param);
}
}
}
void CDataLinkDBus::announceLocalSubscriptions()
{
for (const auto &channel : getChannelNames())
{
announceLocalSubscriptions(channel);
}
}
void CDataLinkDBus::announceLocalSubscriptions(const QString &channel)
{
CVariantList filters;
for (const auto &observerWeak : as_const(getChannel(channel).passiveObservers))
{
auto observer = observerWeak.lock();
if (observer) { filters.push_back(observer->eventSubscription()); }
}
m_duplex->setSubscription(channel, filters);
}
void CDataLinkDBus::setPeerSubscriptions(const QString &channel, const CVariantList &filters)
{
getChannel(channel).peerSubscriptions = filters;
}
QFuture<CVariant> CDataLinkDBus::handleLocalRequest(const QString &channel, const CVariant &param)
{
auto mutator = getChannel(channel).activeMutator.lock();
if (mutator) { return mutator->handleRequest(param); }
if (!m_duplex) { return {}; }
return m_duplex->submitRequest(channel, param);
}
void CDataLinkDBus::handlePeerRequest(const QString &channel, const CVariant &param, quint32 token)
{
auto mutator = getChannel(channel).activeMutator.lock();
if (mutator)
{
doAfter(mutator->handleRequest(param), this, [this, token](auto future)
{
if (m_duplex) { m_duplex->reply(future.result(), token); }
});
}
}
void CDataLinkDBus::publish(const CPassiveMutator *mutator)
{
connect(mutator, &CPassiveMutator::eventPosted, this, [this, channel = getChannelName(mutator)](const CVariant &param)
{
handleLocalEvent(channel, param);
});
}
void CDataLinkDBus::publish(const CActiveMutator *mutator)
{
publish(static_cast<const CPassiveMutator *>(mutator));
auto &channel = getChannel(mutator);
Q_ASSERT_X(! channel.activeMutator, Q_FUNC_INFO, "Tried to publish two active mutators on one channel");
channel.activeMutator = mutator->weakRef();
if (m_duplex)
{
m_duplex->advertise(getChannelName(mutator));
}
connect(mutator, &QObject::destroyed, this, [this, channel = getChannelName(mutator)]
{
if (m_duplex) { m_duplex->withdraw(channel); }
});
}
void CDataLinkDBus::subscribe(const CPassiveObserver *observer)
{
getChannel(observer).passiveObservers.push_back(observer->weakRef());
auto announce = [this, channel = getChannelName(observer)]
{
if (m_duplex) { announceLocalSubscriptions(channel); }
};
connect(observer, &CPassiveObserver::eventSubscriptionChanged, this, announce);
connect(observer, &QObject::destroyed, this, announce);
announce();
}
void CDataLinkDBus::subscribe(const CActiveObserver *observer)
{
subscribe(static_cast<const CPassiveObserver *>(observer));
connect(observer, &CActiveObserver::requestPosted, this, [this, channel = getChannelName(observer)](const CVariant &param, CPromise<CVariant> reply)
{
reply.chainResult(handleLocalRequest(channel, param));
});
}
QStringList CDataLinkDBus::getChannelNames() const
{
QMutexLocker lock(&m_channelsMutex);
return m_channels.keys();
}
CDataLinkDBus::Channel &CDataLinkDBus::getChannel(const QString &name)
{
QMutexLocker lock(&m_channelsMutex);
return m_channels[name];
}
CDataLinkDBus::Channel &CDataLinkDBus::getChannel(const QObject *object)
{
return getChannel(getChannelName(object));
}
}
}

View File

@@ -0,0 +1,92 @@
/* Copyright (C) 2017
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DATALINKDBUS_H
#define BLACKMISC_SHAREDSTATE_DATALINKDBUS_H
#include "blackmisc/sharedstate/datalink.h"
#include "blackmisc/blackmiscexport.h"
#include "blackmisc/variantlist.h"
#include <QSharedPointer>
class QDBusConnection;
namespace BlackMisc
{
class CDBusServer;
namespace SharedState
{
namespace DBus
{
class IHub;
class IDuplex;
}
/*!
* A transport mechanism using signals and slots distributed by DBus.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT CDataLinkDBus : public QObject, public IDataLink
{
Q_OBJECT
Q_INTERFACES(BlackMisc::SharedState::IDataLink)
public:
//! Constructor.
CDataLinkDBus(QObject *parent = nullptr);
//! Destructor.
virtual ~CDataLinkDBus() override;
//! Initialize on server side.
void initializeLocal(CDBusServer *server = nullptr);
//! Initialize on client side.
void initializeRemote(const QDBusConnection &connection, const QString &service);
virtual void publish(const CPassiveMutator *mutator) override;
virtual void publish(const CActiveMutator *mutator) override;
virtual void subscribe(const CPassiveObserver *observer) override;
virtual void subscribe(const CActiveObserver *observer) override;
private:
struct Channel
{
QWeakPointer<const CActiveMutator> activeMutator;
QVector<QWeakPointer<const CPassiveObserver>> passiveObservers;
CVariantList peerSubscriptions;
};
QStringList getChannelNames() const;
Channel &getChannel(const QString &name);
Channel &getChannel(const QObject *object);
void checkConnection();
void onConnected();
void onDisconnected();
void handleLocalEvent(const QString &channel, const CVariant &param);
void handlePeerEvent(const QString &channel, const CVariant &param);
void announceLocalSubscriptions();
void announceLocalSubscriptions(const QString &channel);
void setPeerSubscriptions(const QString &channel, const CVariantList &filters);
QFuture<CVariant> handleLocalRequest(const QString &channel, const CVariant &param);
void handlePeerRequest(const QString &channel, const CVariant &param, quint32 token);
QTimer m_watchTimer;
DBus::IHub *m_hub = nullptr;
QSharedPointer<DBus::IDuplex> m_duplex;
QMap<QString, Channel> m_channels;
mutable QMutex m_channelsMutex { QMutex::Recursive };
};
}
}
#endif

View File

@@ -0,0 +1,60 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/duplex.h"
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
IDuplex::IDuplex(QObject* parent) : QObject(parent)
{
connect(this, &IDuplex::replyReceived, this, [this](const QString &, const CVariant &param, quint32 token)
{
const auto it = m_submittedRequests.find(token);
if (it == m_submittedRequests.end()) { return; }
it->setResult(param);
m_submittedRequests.erase(it);
});
}
QFuture<CVariant> IDuplex::submitRequest(const QString &channel, const CVariant &param)
{
const auto token = getToken();
auto future = m_submittedRequests.insert(token, {})->future();
submitRequest(channel, param, token);
return future;
}
QFuture<CVariant> IDuplex::receiveRequest(const QString &channel, const BlackMisc::CVariant &param)
{
const auto token = getToken();
auto future = m_receivedRequests.insert(token, {})->future();
emit requestReceived(channel, param, token, {});
return future;
}
void IDuplex::reply(const BlackMisc::CVariant &param, quint32 token)
{
const auto it = m_receivedRequests.find(token);
if (it == m_receivedRequests.end()) { return; }
it->setResult(param);
m_receivedRequests.erase(it);
}
quint32 IDuplex::getToken()
{
return m_token++;
}
}
}
}

View File

@@ -0,0 +1,104 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_DUPLEX_H
#define BLACKMISC_SHAREDSTATE_DBUS_DUPLEX_H
#include "blackmisc/genericdbusinterface.h"
#include "blackmisc/blackmiscexport.h"
#include "blackmisc/promise.h"
#include <QObject>
#include <QFuture>
#include <QMap>
//! DBus interface for sharedstate hub.
#define BLACKMISC_DUPLEX_INTERFACE "org.swift_project.blackmisc.sharedstate.duplex"
//! DBus object path root for sharedstate hub.
#define BLACKMISC_DUPLEX_PATH_ROOT "/org/swift_project/duplex"
namespace BlackMisc
{
class CVariant;
class CVariantList;
namespace SharedState
{
namespace DBus
{
/*!
* Abstract interface for the spoke in a star topology. An implementation detail of CDataLinkDBus.
*
* Signals send messages from server to client; slots send messages from client to server.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT IDuplex : public QObject
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_DUPLEX_INTERFACE)
public:
//! Client submits a request to the server. Reply is returned via a future.
QFuture<CVariant> submitRequest(const QString &channel, const BlackMisc::CVariant &param);
//! Server submits a request to the client. Reply is returned via a future.
QFuture<CVariant> receiveRequest(const QString &channel, const BlackMisc::CVariant &param);
public slots:
//! Client posts an event to the server.
virtual void postEvent(const QString &channel, const BlackMisc::CVariant &param) = 0;
//! Client announces its subscription to an event channel.
virtual void setSubscription(const QString &channel, const BlackMisc::CVariantList &filters) = 0;
//! Client requests to be notified of all other clients' subscriptions.
virtual void requestPeerSubscriptions() = 0;
//! \private Client submits a request to the server.
virtual void submitRequest(const QString &channel, const BlackMisc::CVariant &param, quint32 token) = 0;
//! Client replies to a submitted request.
virtual void reply(const BlackMisc::CVariant &param, quint32 token);
//! Client advertises that it can handle requests for the given channel.
virtual void advertise(const QString &channel) = 0;
//! Client advertises that it can no longer handle requests for the given channel.
virtual void withdraw(const QString &channel) = 0;
signals:
//! Server has notified the client that an event has been posted.
void eventPosted(const QString &channel, const BlackMisc::CVariant &param);
//! Server has notified the client that other clients' event subscriptions have changed.
void peerSubscriptionsReceived(const QString &channel, const BlackMisc::CVariantList &filters);
//! Server has submitted a request to be handled by the client.
void requestReceived(const QString &channel, const BlackMisc::CVariant &param, quint32 token, QPrivateSignal);
//! \private Server has relayed a reply to the client's request.
void replyReceived(const QString &channel, const BlackMisc::CVariant &param, quint32 token);
protected:
//! Constructor.
IDuplex(QObject *parent = nullptr);
private:
quint32 getToken();
quint32 m_token = 0;
QMap<quint32, CPromise<CVariant>> m_submittedRequests;
QMap<quint32, CPromise<CVariant>> m_receivedRequests;
};
}
}
}
#endif

View File

@@ -0,0 +1,110 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/dupleximpl.h"
#include "blackmisc/sharedstate/dbus/hubimpl.h"
#include "blackmisc/variantlist.h"
#include "blackmisc/dbusserver.h"
#include "blackmisc/identifier.h"
#include "blackmisc/verify.h"
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
CDuplex::CDuplex(CHub *hub, const CIdentifier &client, CDBusServer *server, QObject *parent) : IDuplex(parent), m_hub(hub)
{
if (server) { server->addObject(client.toDBusObjectPath(BLACKMISC_DUPLEX_PATH_ROOT), this); }
}
CDuplex::~CDuplex()
{
while (!m_subscriptions.isEmpty())
{
const auto channel = m_subscriptions.firstKey(); // explicit copy because a reference would dangle
setSubscription(channel, {});
}
}
void CDuplex::postEvent(const QString &channel, const CVariant &param)
{
for (auto client : m_hub->clients())
{
if (client != this && client->m_subscriptions.value(channel).matches(param))
{
emit client->eventPosted(channel, param);
}
}
}
void CDuplex::setSubscription(const QString &channel, const CVariantList &filters)
{
if (filters.isEmpty()) { m_subscriptions.remove(channel); }
else { m_subscriptions.insert(channel, filters); }
for (auto client : m_hub->clients())
{
if (client != this) { client->requestPeerSubscriptions(channel); }
}
}
void CDuplex::requestPeerSubscriptions()
{
QSet<QString> channels;
for (auto client : m_hub->clients())
{
if (client != this) { channels.unite(client->m_subscriptions.keys().toSet()); }
}
for (const auto channel : channels)
{
requestPeerSubscriptions(channel);
}
}
void CDuplex::requestPeerSubscriptions(const QString &channel)
{
CVariantList filters;
for (auto peer : m_hub->clients())
{
if (peer != this) { filters.push_back(peer->m_subscriptions.value(channel)); }
}
emit peerSubscriptionsReceived(channel, filters);
}
void CDuplex::submitRequest(const QString &channel, const CVariant &param, quint32 token)
{
for (auto handler : m_hub->clients())
{
if (handler != this && handler->m_handlingChannels.contains(channel))
{
doAfter(handler->receiveRequest(channel, param), this, [this, channel, token](QFuture<CVariant> future)
{
emit this->replyReceived(channel, future.result(), token);
});
return;
}
}
}
void CDuplex::advertise(const QString &channel)
{
m_handlingChannels.insert(channel);
}
void CDuplex::withdraw(const QString &channel)
{
m_handlingChannels.remove(channel);
}
}
}
}

View File

@@ -0,0 +1,66 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_DUPLEXIMPL_H
#define BLACKMISC_SHAREDSTATE_DBUS_DUPLEXIMPL_H
#include "blackmisc/sharedstate/dbus/duplex.h"
#include <functional>
namespace BlackMisc
{
class CIdentifier;
class CDBusServer;
namespace SharedState
{
namespace DBus
{
class CHub;
/*!
* Server side implementation of IDuplex. Receives messages from clients and forwards them to other clients via the CHub.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT CDuplex : public IDuplex
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_DUPLEX_INTERFACE)
public:
//! Constructor.
CDuplex(CHub *hub, const CIdentifier &client, CDBusServer *server, QObject *parent = nullptr);
//! Destructor.
virtual ~CDuplex() override;
public slots:
//! \name Interface implementations
//! @{
virtual void postEvent(const QString &channel, const BlackMisc::CVariant &param) override;
virtual void setSubscription(const QString &channel, const BlackMisc::CVariantList &filters) override;
virtual void requestPeerSubscriptions() override;
virtual void submitRequest(const QString &channel, const BlackMisc::CVariant &param, quint32 token) override;
virtual void advertise(const QString &channel) override;
virtual void withdraw(const QString &channel) override;
//! @}
private:
void requestPeerSubscriptions(const QString &channel);
CHub *m_hub = nullptr;
QMap<QString, CVariantList> m_subscriptions;
QSet<QString> m_handlingChannels;
};
}
}
}
#endif

View File

@@ -0,0 +1,65 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/duplexproxy.h"
#include "blackmisc/genericdbusinterface.h"
#include "blackmisc/variantlist.h"
#include "blackmisc/identifier.h"
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
CDuplexProxy::CDuplexProxy(const QDBusConnection &connection, const QString &service, QObject *parent) : IDuplex(parent)
{
const QString path = CIdentifier::anonymous().toDBusObjectPath(BLACKMISC_DUPLEX_PATH_ROOT);
m_interface = new CGenericDBusInterface(service, path, BLACKMISC_DUPLEX_INTERFACE, connection, this);
m_interface->relayParentSignals();
}
void CDuplexProxy::postEvent(const QString &channel, const CVariant &param)
{
m_interface->callDBus(QLatin1String("postEvent"), channel, param);
}
void CDuplexProxy::setSubscription(const QString &channel, const CVariantList &filters)
{
m_interface->callDBus(QLatin1String("setSubscription"), channel, filters);
}
void CDuplexProxy::requestPeerSubscriptions()
{
m_interface->callDBus(QLatin1String("requestPeerSubscriptions"));
}
void CDuplexProxy::submitRequest(const QString &channel, const CVariant &param, quint32 token)
{
m_interface->callDBus(QLatin1String("submitRequest"), channel, param, token);
}
void CDuplexProxy::reply(const CVariant &param, quint32 token)
{
m_interface->callDBus(QLatin1String("reply"), param, token);
}
void CDuplexProxy::advertise(const QString &channel)
{
m_interface->callDBus(QLatin1String("advertise"), channel);
}
void CDuplexProxy::withdraw(const QString &channel)
{
m_interface->callDBus(QLatin1String("withdraw"), channel);
}
}
}
}

View File

@@ -0,0 +1,56 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_DUPLEXPROXY_H
#define BLACKMISC_SHAREDSTATE_DBUS_DUPLEXPROXY_H
#include "blackmisc/sharedstate/dbus/duplex.h"
namespace BlackMisc
{
class CGenericDBusInterface;
namespace SharedState
{
namespace DBus
{
/*!
* Client side implementation of IDuplex, through which the client communicates with the server.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT CDuplexProxy : public IDuplex
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_DUPLEX_INTERFACE)
public:
//! Constructor.
CDuplexProxy(const QDBusConnection &connection, const QString &service, QObject *parent = nullptr);
public slots:
//! \name Interface implementations
//! @{
virtual void postEvent(const QString &channel, const BlackMisc::CVariant &param) override;
virtual void setSubscription(const QString &channel, const BlackMisc::CVariantList &filters) override;
virtual void requestPeerSubscriptions() override;
virtual void submitRequest(const QString &channel, const BlackMisc::CVariant &param, quint32 token) override;
virtual void reply(const BlackMisc::CVariant &param, quint32 token) override;
virtual void advertise(const QString &channel) override;
virtual void withdraw(const QString &channel) override;
//! @}
private:
CGenericDBusInterface *m_interface = nullptr;
};
}
}
}
#endif

View File

@@ -0,0 +1,32 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/hub.h"
#include "blackmisc/sharedstate/dbus/hubimpl.h"
#include "blackmisc/sharedstate/dbus/hubproxy.h"
#include <QDBusConnection>
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
IHub::IHub(QObject* parent) : QObject(parent)
{}
IHub* IHub::create(bool proxy, CDBusServer *server, const QDBusConnection &connection, const QString &service, QObject* parent)
{
if (proxy) { return new CHubProxy(connection, service, parent); }
else { return new CHub(server, parent); }
}
}
}
}

View File

@@ -0,0 +1,76 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_HUB_H
#define BLACKMISC_SHAREDSTATE_DBUS_HUB_H
#include "blackcore/corefacadeconfig.h"
#include "blackmisc/blackmiscexport.h"
#include <QSharedPointer>
#include <QObject>
#include <QFuture>
//! DBus interface for sharedstate hub.
#define BLACKMISC_HUB_INTERFACE "org.swift_project.blackmisc.sharedstate.hub"
//! DBus object path root for sharedstate hub.
#define BLACKMISC_HUB_PATH "/org/swift_project/hub"
class QDBusConnection;
namespace BlackMisc
{
class CDBusServer;
class CIdentifier;
namespace SharedState
{
namespace DBus
{
class IDuplex;
/*!
* Abstract interface for the hub in a star topology. An implementation detail of CDataLinkDBus.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT IHub : public QObject
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_HUB_INTERFACE)
public:
//! Construct a new hub.
static IHub *create(bool proxy, CDBusServer *server, const QDBusConnection &connection, const QString &service, QObject *parent = nullptr);
//! Is connected?
virtual bool isConnected() const = 0;
//! Get a duplex object for the calling process.
virtual std::pair<QSharedPointer<IDuplex>, QFuture<void>> getDuplex() = 0;
public slots:
//! Create a duplex object for the identified process.
virtual bool openDuplex(const BlackMisc::CIdentifier &client) = 0;
//! Destroy the duplex object for the identified process.
virtual void closeDuplex(const BlackMisc::CIdentifier &client) = 0;
protected:
//! Create a duplex object and return status via future.
virtual QFuture<void> openDuplexAsync(const CIdentifier &client) = 0;
//! Constructor.
IHub(QObject *parent = nullptr);
};
}
}
}
#endif

View File

@@ -0,0 +1,67 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/hubimpl.h"
#include "blackmisc/sharedstate/dbus/dupleximpl.h"
#include "blackmisc/dbusserver.h"
#include "blackmisc/variantlist.h"
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
CHub::CHub(CDBusServer *server, QObject* parent) : IHub(parent), m_server(server)
{
if (server) { server->addObject(BLACKMISC_HUB_PATH, this); }
}
std::pair<QSharedPointer<IDuplex>, QFuture<void>> CHub::getDuplex()
{
auto future = openDuplexAsync(CIdentifier::anonymous());
return std::make_pair(m_clients.value(CIdentifier::anonymous()), future);
}
bool CHub::openDuplex(const BlackMisc::CIdentifier &client)
{
if (!m_clients.contains(client))
{
m_clients.insert(client, QSharedPointer<CDuplex>::create(this, client, m_server, this));
}
return true;
}
void CHub::closeDuplex(const BlackMisc::CIdentifier &client)
{
// Using take() instead of remove() because we need the
// destruction to happen after the removal, not before.
m_clients.take(client);
}
QFuture<void> CHub::openDuplexAsync(const CIdentifier &client)
{
openDuplex(client);
CPromise<void> promise;
promise.setResult();
return promise.future();
}
CHub::~CHub()
{
// Disconnect clients from the hub before destroying them,
// to avoid thrashing peer event subscription updates.
const auto clients = std::move(m_clients);
Q_UNUSED(clients)
}
}
}
}

View File

@@ -0,0 +1,75 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_HUBIMPL_H
#define BLACKMISC_SHAREDSTATE_DBUS_HUBIMPL_H
#include "blackmisc/sharedstate/dbus/hub.h"
#include "blackmisc/identifier.h"
#include <QSharedPointer>
#include <QMap>
namespace BlackMisc
{
class CDBusServer;
namespace SharedState
{
namespace DBus
{
class CDuplex;
/*!
* Server side implementation of IHub. Maintains a collection of CDuplex objects.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT CHub : public IHub
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_HUB_INTERFACE)
public:
//! Constructor.
CHub(CDBusServer *server, QObject *parent = nullptr);
//! Destructor.
virtual ~CHub() override;
//! Returns a range containing all duplex objects.
const auto &clients() const { return m_clients; }
//! \name Interface implementations
//! @{
virtual bool isConnected() const override { return true; }
virtual std::pair<QSharedPointer<IDuplex>, QFuture<void>> getDuplex() override;
//! @}
public slots:
//! \name Interface implementations
//! @{
virtual bool openDuplex(const BlackMisc::CIdentifier &client) override;
virtual void closeDuplex(const BlackMisc::CIdentifier &client) override;
//! @}
protected:
//! \name Interface implementations
//! @{
virtual QFuture<void> openDuplexAsync(const CIdentifier &client) override;
//! @}
private:
CDBusServer *m_server = nullptr;
QMap<CIdentifier, QSharedPointer<CDuplex>> m_clients;
};
}
}
}
#endif

View File

@@ -0,0 +1,62 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#include "blackmisc/sharedstate/dbus/hubproxy.h"
#include "blackmisc/sharedstate/dbus/duplexproxy.h"
#include "blackmisc/genericdbusinterface.h"
#include "blackmisc/variantlist.h"
#include "blackmisc/identifier.h"
namespace BlackMisc
{
namespace SharedState
{
namespace DBus
{
CHubProxy::CHubProxy(const QDBusConnection &connection, const QString &service, QObject* parent) : IHub(parent), m_service(service)
{
m_interface = new CGenericDBusInterface(service, BLACKMISC_HUB_PATH, BLACKMISC_HUB_INTERFACE, connection, this);
m_interface->relayParentSignals();
}
bool CHubProxy::isConnected() const
{
return m_interface->isValid();
}
std::pair<QSharedPointer<IDuplex>, QFuture<void>> CHubProxy::getDuplex()
{
auto duplex = QSharedPointer<CDuplexProxy>::create(m_interface->connection(), m_service, this);
connect(duplex.get(), &QObject::destroyed, this, [ = ] { closeDuplex(CIdentifier::anonymous()); });
return std::make_pair(duplex, openDuplexAsync(CIdentifier::anonymous()));
}
bool CHubProxy::openDuplex(const CIdentifier& client)
{
return m_interface->callDBusRet<bool>(QLatin1String("openDuplex"), client);
}
void CHubProxy::closeDuplex(const CIdentifier& client)
{
m_interface->callDBus(QLatin1String("closeDuplex"), client);
}
QFuture<void> CHubProxy::openDuplexAsync(const CIdentifier &client)
{
return m_interface->callDBusFuture<bool>(QLatin1String("openDuplex"), client);
}
CHubProxy::~CHubProxy()
{
closeDuplex(CIdentifier::anonymous());
}
}
}
}

View File

@@ -0,0 +1,68 @@
/* Copyright (C) 2020
* swift Project Community / Contributors
*
* This file is part of swift project. It is subject to the license terms in the LICENSE file found in the top-level
* directory of this distribution. No part of swift project, including this file, may be copied, modified, propagated,
* or distributed except according to the terms contained in the LICENSE file.
*/
//! \file
#ifndef BLACKMISC_SHAREDSTATE_DBUS_HUBPROXY_H
#define BLACKMISC_SHAREDSTATE_DBUS_HUBPROXY_H
#include <QDBusConnection>
#include "blackmisc/sharedstate/dbus/hub.h"
namespace BlackMisc
{
class CGenericDBusInterface;
namespace SharedState
{
namespace DBus
{
/*!
* Client side implementation of IHub.
* \ingroup SharedState
*/
class BLACKMISC_EXPORT CHubProxy : public IHub
{
Q_OBJECT
Q_CLASSINFO("D-Bus Interface", BLACKMISC_HUB_INTERFACE)
public:
//! Constructor.
CHubProxy(const QDBusConnection &connection, const QString &service, QObject *parent = nullptr);
//! Destructor.
virtual ~CHubProxy() override;
//! \name Interface implementations
//! @{
virtual bool isConnected() const override;
virtual std::pair<QSharedPointer<IDuplex>, QFuture<void>> getDuplex() override;
//! @}
public slots:
//! \name Interface implementations
//! @{
virtual bool openDuplex(const BlackMisc::CIdentifier &client) override;
virtual void closeDuplex(const BlackMisc::CIdentifier &client) override;
//! @}
protected:
//! \name Interface implementations
//! @{
virtual QFuture<void> openDuplexAsync(const CIdentifier &client) override;
//! @}
private:
CGenericDBusInterface *m_interface = nullptr;
QString m_service;
};
}
}
}
#endif