25 #ifndef CARGO_IPC_INTERNALS_PROCESSOR_HPP
26 #define CARGO_IPC_INTERNALS_PROCESSOR_HPP
48 #include <condition_variable>
56 #include <unordered_map>
129 const std::string& logName =
"",
156 void stop(
bool wait);
193 template<
typename SentDataType,
typename ReceivedDataType>
211 template<
typename ReceivedDataType>
227 const std::shared_ptr<void>&
data);
240 const std::string& message);
279 template<
typename SentDataType,
typename ReceivedDataType>
282 const std::shared_ptr<SentDataType>&
data,
283 unsigned int timeoutMS = 5000);
295 template<
typename SentDataType,
typename ReceivedDataType>
298 const std::shared_ptr<SentDataType>&
data,
311 template<
typename SentDataType,
typename ReceivedDataType>
314 const std::shared_ptr<SentDataType>&
data,
325 template<
typename SentDataType>
327 const std::shared_ptr<SentDataType>&
data);
360 typedef std::unique_lock<std::mutex>
Lock;
383 std::vector<MethodID>
ids;
394 : messageID(messageID), code(code), message(message) {}
439 : peerID(peerID), parse(parse), process(process) {}
455 : peerID(peerID), socketPtr(socketPtr) {}
463 typedef std::vector<PeerInfo>
Peers;
487 template<
typename SentDataType,
typename ReceivedDataType>
491 template<
typename ReceivedDataType>
495 template<
typename SentDataType>
498 const std::shared_ptr<SentDataType>&
data);
514 std::shared_ptr<MethodHandlers> methodCallbacks);
518 std::shared_ptr<SignalHandlers> signalCallbacks);
521 const std::exception_ptr& exceptionPtr);
525 std::shared_ptr<RegisterSignalsProtocolMessage>&
data);
528 std::shared_ptr<ErrorProtocolMessage>&
data);
535 template<
typename SentDataType,
typename ReceivedDataType>
541 methodCall.
parse = [](
const int fd)->std::shared_ptr<void> {
542 std::shared_ptr<ReceivedDataType>
data(
new ReceivedDataType());
543 cargo::loadFromFD<ReceivedDataType>(fd, *
data);
547 methodCall.
serialize = [](
const int fd, std::shared_ptr<void>&
data)->
void {
548 cargo::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(
data));
552 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(
data);
553 return method(peerID, tmpData, std::forward<MethodResult::Pointer>(methodResult));
556 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
559 template<
typename SentDataType,
typename ReceivedDataType>
565 throw IPCException(
"Forbidden methodID: " + std::to_string(methodID));
573 throw IPCException(
"MethodID used by a signal: " + std::to_string(methodID));
576 setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
581 template<
typename ReceivedDataType>
587 signalCall.
parse = [](
const int fd)->std::shared_ptr<void> {
588 std::shared_ptr<ReceivedDataType> dataToFill(
new ReceivedDataType());
589 cargo::loadFromFD<ReceivedDataType>(fd, *dataToFill);
593 signalCall.
signal = [handler](
const PeerID peerID, std::shared_ptr<void>& dataReceived) {
594 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
595 return handler(peerID, tmpData);
598 mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
601 template<
typename ReceivedDataType>
607 throw IPCException(
"Forbidden methodID: " + std::to_string(methodID));
610 std::shared_ptr<RegisterSignalsProtocolMessage>
data;
618 throw IPCException(
"MethodID used by a method: " + std::to_string(methodID));
621 setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
624 std::vector<MethodID> ids {methodID};
625 data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
636 template<
typename SentDataType,
typename ReceivedDataType>
639 const std::shared_ptr<SentDataType>&
data,
643 return callAsyncNonBlock<SentDataType, ReceivedDataType>(methodID, peerID,
data, process);
646 template<
typename SentDataType,
typename ReceivedDataType>
649 const std::shared_ptr<SentDataType>&
data,
652 auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID,
data, process);
654 return request->messageID;
658 template<
typename SentDataType,
typename ReceivedDataType>
661 const std::shared_ptr<SentDataType>&
data,
662 unsigned int timeoutMS)
665 std::condition_variable cv;
669 result = std::move(r);
674 MessageID messageID = callAsyncNonBlock<SentDataType, ReceivedDataType>(methodID,
679 auto isResultInitialized = [&result]() {
686 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
697 LOGE(
mLogPrefix +
"Function call timeout; methodID: " << methodID);
701 LOGW(
mLogPrefix +
"Timeout started during the return value processing, so wait for it to finish");
702 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
703 LOGE(
mLogPrefix +
"Function call timeout; methodID: " << methodID);
712 template<
typename SentDataType>
715 const std::shared_ptr<SentDataType>&
data)
717 auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID,
data);
721 template<
typename SentDataType>
723 const std::shared_ptr<SentDataType>&
data)
728 LOGW(
mLogPrefix +
"No peer is handling signal with methodID: " << methodID);
731 for (
const PeerID peerID : it->second) {
732 auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID,
data);
742 #endif // CARGO_IPC_INTERNALS_PROCESSOR_HPP
int code
Definition: processor.hpp:397
Peers::iterator getPeerInfoIterator(const FileDescriptor fd)
Definition: processor.cpp:90
void pushFront(const RequestIdType requestID, const std::shared_ptr< void > &data=nullptr)
Push data to back of the queue.
Definition: request-queue.hpp:146
Processor & operator=(const Processor &)=delete
std::function< void(Result< Data > &&) > type
Definition: result.hpp:73
PeerCallback mRemovedPeerCallback
Definition: processor.hpp:483
bool onRemoteMethod(Peers::iterator &peerIt, const MethodID methodID, const MessageID &messageID, std::shared_ptr< MethodHandlers > methodCallbacks)
Definition: processor.cpp:443
std::function< void(ResultBuilder &)> ResultBuilderHandler
Definition: result-builder.hpp:66
std::unique_lock< std::mutex > Lock
Definition: processor.hpp:360
bool onRemovePeerRequest(RemovePeerRequest &request)
Definition: processor.cpp:644
bool onRemoteSignal(Peers::iterator &peerIt, const MethodID methodID, const MessageID &messageID, std::shared_ptr< SignalHandlers > signalCallbacks)
Definition: processor.cpp:402
std::shared_ptr< MethodResult > Pointer
Definition: method-result.hpp:47
bool mIsRunning
Definition: processor.hpp:469
Processor's request to send the result of a method.
MethodHandlers & operator=(const MethodHandlers &)=delete
std::string PeerID
Definition: types.hpp:45
friend std::ostream & operator<<(std::ostream &os, const Processor::Event &event)
Definition: processor.cpp:759
Class for managing a queue of Requests carrying any data.
Definition: request-queue.hpp:44
bool onErrorSignal(const PeerID &peerID, std::shared_ptr< ErrorProtocolMessage > &data)
Definition: processor.cpp:352
Definition: finish-request.hpp:33
PeerCallback mNewPeerCallback
Definition: processor.hpp:482
#define CARGO_REGISTER_EMPTY
Register empty cargo class.
Definition: fields.hpp:42
Definition: method-request.hpp:38
bool handleEvent()
Handle one event from the internal event's queue.
Definition: processor.cpp:495
bool onSignalRequest(SignalRequest &request)
Definition: processor.cpp:575
void setSignalHandler(const MethodID methodID, const typename SignalHandler< ReceivedDataType >::type &process)
Saves the callbacks connected to the method id.
Definition: processor.hpp:602
std::vector< PeerInfo > Peers
Definition: processor.hpp:463
void sendError(const PeerID &peerID, const MessageID &messageID, const int errorCode, const std::string &message)
Send error result of the method.
Definition: processor.cpp:175
Definition: signal-request.hpp:35
PeerID peerID
Definition: processor.hpp:457
Definition: processor.hpp:363
bool handleInput(const FileDescriptor fd)
Handles input from one peer.
Definition: processor.cpp:290
std::shared_ptr< Data > get() const
Definition: result.hpp:55
Definition: processor.hpp:378
Definition: add-peer-request.hpp:34
#define LOGE(MESSAGE)
Logging errors.
Definition: logger.hpp:140
void setSignalHandlerInternal(const MethodID methodID, const typename SignalHandler< ReceivedDataType >::type &handler)
Definition: processor.hpp:582
void sendResult(const MethodID methodID, const PeerID &peerID, const MessageID &messageID, const std::shared_ptr< void > &data)
Send result of the method.
Definition: processor.cpp:166
~Processor()
Definition: processor.cpp:80
std::function< std::shared_ptr< void >cargo::ipc::FileDescriptor fd)> ParseCallback
Generic function type used as callback for reading and parsing data.
Definition: types.hpp:72
SerializeCallback serialize
Definition: processor.hpp:415
unsigned int mMaxNumberOfPeers
Definition: processor.hpp:485
void signalInternal(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data)
Definition: processor.hpp:713
Definition: processor.hpp:391
bool isHandled(const MethodID methodID)
Definition: processor.cpp:199
bool onRemoveMethodRequest(RemoveMethodRequest &request)
Definition: processor.cpp:703
PeerID peerID
Definition: processor.hpp:441
void pushBack(const RequestIdType requestID, const std::shared_ptr< void > &data=nullptr)
Push data to back of the queue.
Definition: request-queue.hpp:136
bool onFinishRequest(FinishRequest &request)
Definition: processor.cpp:709
Definition: remove-peer-request.hpp:36
void setNewPeerCallback(const PeerCallback &newPeerCallback)
Set the callback called for each new connection to a peer.
Definition: processor.cpp:148
MethodHandler< void, void >::type method
Definition: processor.hpp:417
Processor(epoll::EventPoll &eventPoll, const std::string &logName="", const PeerCallback &newPeerCallback=nullptr, const PeerCallback &removedPeerCallback=nullptr, const unsigned int maxNumberOfPeers=DEFAULT_MAX_NUMBER_OF_PEERS)
Constructs the Processor, but doesn't start it.
Definition: processor.cpp:59
std::function< void(const cargo::ipc::PeerID peerID, const cargo::ipc::FileDescriptor fd)> PeerCallback
Generic function type used as callback for peer events.
Definition: types.hpp:54
std::unordered_map< MethodID, std::list< PeerID > > mSignalsPeers
Definition: processor.hpp:473
MessageID callAsync(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, const typename ResultHandler< ReceivedDataType >::type &process)
Asynchronous method call.
Definition: processor.hpp:637
Definition: result.hpp:36
ParseCallback parse
Definition: processor.hpp:416
Processor's request to remove a peer.
ErrorProtocolMessage()=default
void removeMethod(const MethodID methodID)
Removes the callback associated with specific method id.
Definition: processor.cpp:193
Definition: processor.hpp:408
void setMethodHandlerInternal(const MethodID methodID, const typename MethodHandler< SentDataType, ReceivedDataType >::type &process)
Definition: processor.hpp:536
std::function< bool(PeerID peerID, std::shared_ptr< ReceivedDataType > &data)> type
Definition: types.hpp:99
std::string message
Definition: processor.hpp:398
std::unordered_map< MethodID, std::shared_ptr< SignalHandlers > > mSignalsCallbacks
Definition: processor.hpp:472
char data[368]
Definition: initctl.cpp:41
void sendVoid(const MethodID methodID, const PeerID &peerID, const MessageID &messageID)
Indicate that the method handler finished.
Definition: processor.cpp:184
Peers mPeerInfo
Definition: processor.hpp:475
RegisterSignalsProtocolMessage()=default
RequestQueue< Event > mRequestQueue
Definition: processor.hpp:467
SignalHandlers & operator=(const SignalHandlers &)=delete
MessageID messageID
Definition: processor.hpp:396
RequestQueue< Event >::Request Request
Definition: processor.hpp:361
Exception to indicate timeout event error.
Definition: exception.hpp:102
Event
Definition: processor.hpp:90
Definition: remove-method-request.hpp:33
#define LOGW(MESSAGE)
Logging warnings.
Definition: logger.hpp:143
ParseCallback parse
Definition: processor.hpp:442
PeerInfo & operator=(const PeerInfo &)=delete
ErrorProtocolMessage(const MessageID &messageID, const int code, const std::string &message)
Definition: processor.hpp:393
ReturnCallbacks(PeerID peerID, const ParseCallback &parse, const ResultBuilderHandler &process)
Definition: processor.hpp:438
SignalHandler< void >::type signal
Definition: processor.hpp:428
static const MethodID ERROR_METHOD_ID
Error return message.
Definition: processor.hpp:116
void setMethodHandler(const MethodID methodID, const typename MethodHandler< SentDataType, ReceivedDataType >::type &process)
Saves the callbacks connected to the method id.
Definition: processor.hpp:560
bool isValid() const
Definition: result.hpp:61
#define CARGO_REGISTER(...)
Registers cargo fields within class.
Definition: fields.hpp:74
Class for sending the result of a method.
bool handleLostConnection(const FileDescriptor fd)
Removes one peer.
Definition: processor.cpp:281
void removePeerInternal(Peers::iterator peerIt, const std::exception_ptr &exceptionPtr)
Definition: processor.cpp:242
ResultBuilderHandler process
Definition: processor.hpp:443
std::mutex mStateMutex
Definition: processor.hpp:480
void stop(bool wait)
Stops the processing thread.
Definition: processor.cpp:123
Managing the queue with requests.
bool isStarted()
Definition: processor.cpp:104
void setRemovedPeerCallback(const PeerCallback &removedPeerCallback)
Set the callback called when connection to a peer is lost.
Definition: processor.cpp:154
Processor's request to call a method.
bool onMethodRequest(MethodRequest &request)
Definition: processor.cpp:524
Definition: processor.hpp:420
std::shared_ptr< Socket > socketPtr
Definition: processor.hpp:458
Definition: method-result.hpp:75
bool onReturnValue(Peers::iterator &peerIt, const MessageID &messageID)
Definition: processor.cpp:366
ReturnCallbacks & operator=(const ReturnCallbacks &)=delete
unsigned int MethodID
Definition: types.hpp:43
Definition: exception.hpp:39
ParseCallback parse
Definition: processor.hpp:427
void start()
Start processing.
Definition: processor.cpp:110
const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS
Definition: processor.hpp:62
Processor's request to send a signal.
std::function< void(cargo::ipc::FileDescriptor fd, std::shared_ptr< void > &data)> SerializeCallback
Generic function type used as callback for serializing and saving serialized data to the descriptor...
Definition: types.hpp:64
Managing the queue of messages carrying any kind of data.
Class for storing result of a method - data or exception.
This class waits on registered file descriptor for events.
Definition: event-poll.hpp:47
bool onNewSignals(const PeerID &peerID, std::shared_ptr< RegisterSignalsProtocolMessage > &data)
Definition: processor.cpp:341
bool removeIf(Predicate predicate)
Remove elements from the queue when the predicate returns true.
Definition: request-queue.hpp:172
std::function< bool(PeerID peerID, std::shared_ptr< ReceivedDataType > &data, MethodResult::Pointer methodResult) > type
Definition: method-result.hpp:78
Definition: send-result-request.hpp:34
std::shared_ptr< ReceivedDataType > callSync(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, unsigned int timeoutMS=5000)
Synchronous method call.
Definition: processor.hpp:659
static const MethodID RETURN_METHOD_ID
Used to indicate a message with the return value.
Definition: processor.hpp:106
#define LOGT(MESSAGE)
Logging tracing information.
Definition: logger.hpp:156
std::string mLogPrefix
Definition: processor.hpp:465
This class wraps communication via UX sockets.
Definition: processor.hpp:88
static const MethodID REGISTER_SIGNAL_METHOD_ID
Indicates an Processor's internal request/broadcast to register a Signal.
Definition: processor.hpp:111
int FileDescriptor
Definition: types.hpp:42
bool onSendResultRequest(SendResultRequest &request)
Definition: processor.cpp:656
Processor's request to remove the specified method handler.
std::unordered_map< MethodID, std::shared_ptr< MethodHandlers > > mMethodsCallbacks
Definition: processor.hpp:471
FileDescriptor getEventFD()
Definition: processor.cpp:160
ReturnCallbacks()=default
epoll::EventPoll & mEventPoll
Definition: processor.hpp:461
PeerInfo(PeerID peerID, const std::shared_ptr< Socket > &socketPtr)
Definition: processor.hpp:454
Definition: processor.hpp:446
PeerID addPeer(const std::shared_ptr< Socket > &socketPtr)
From now on socket is owned by the Processor object.
Definition: processor.cpp:206
Generic type used as a callback function for handling signals.
Definition: types.hpp:97
std::string MessageID
Definition: types.hpp:44
void removePeerSyncInternal(const PeerID &peerID, Lock &lock)
Definition: processor.cpp:220
Scope logger class declaration.
Processor's request to add a peer.
MessageID callAsyncNonBlock(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, const typename ResultHandler< ReceivedDataType >::type &process)
The same as callAsync, but not blocking on the state mutex.
Definition: processor.hpp:647
Definition: processor.hpp:431
RegisterSignalsProtocolMessage(const std::vector< MethodID > &ids)
Definition: processor.hpp:380
std::vector< MethodID > ids
Definition: processor.hpp:383
std::unordered_map< MessageID, ReturnCallbacks > mReturnCallbacks
Definition: processor.hpp:477
void signal(const MethodID methodID, const std::shared_ptr< SentDataType > &data)
Send a signal to the peer.
Definition: processor.hpp:722
bool onAddPeerRequest(AddPeerRequest &request)
Definition: processor.cpp:606