Fork Vasum on GitHub Official Vasum Wiki on Tizen.org
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
processor.hpp
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 * Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
17 */
18 
25 #ifndef CARGO_IPC_INTERNALS_PROCESSOR_HPP
26 #define CARGO_IPC_INTERNALS_PROCESSOR_HPP
27 
39 #include "cargo-ipc/exception.hpp"
41 #include "cargo-ipc/types.hpp"
42 #include "cargo-fd/cargo-fd.hpp"
43 #include "cargo/fields.hpp"
44 #include "logger/logger.hpp"
45 #include "logger/logger-scope.hpp"
46 
47 #include <ostream>
48 #include <condition_variable>
49 #include <mutex>
50 #include <chrono>
51 #include <vector>
52 #include <thread>
53 #include <string>
54 #include <list>
55 #include <functional>
56 #include <unordered_map>
57 #include <utility>
58 
59 namespace cargo {
60 namespace ipc {
61 
62 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
88 class Processor {
89 private:
90  enum class Event {
91  FINISH, // Shutdown request
92  METHOD, // New method call in the queue
93  SIGNAL, // New signal call in the queue
94  ADD_PEER, // New peer in the queue
95  REMOVE_PEER, // Remove peer
96  SEND_RESULT, // Send the result of a method's call
97  REMOVE_METHOD // Remove method handler
98  };
99 
100 public:
101  friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
102 
107 
112 
116  static const MethodID ERROR_METHOD_ID;
117 
128  Processor(epoll::EventPoll& eventPoll,
129  const std::string& logName = "",
130  const PeerCallback& newPeerCallback = nullptr,
131  const PeerCallback& removedPeerCallback = nullptr,
132  const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
133  ~Processor();
134 
135  Processor(const Processor&) = delete;
136  Processor(Processor&&) = delete;
137  Processor& operator=(const Processor&) = delete;
138 
139 
143  void start();
144 
148  bool isStarted();
149 
156  void stop(bool wait);
157 
163  void setNewPeerCallback(const PeerCallback& newPeerCallback);
164 
170  void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
171 
179  PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
180 
193  template<typename SentDataType, typename ReceivedDataType>
194  void setMethodHandler(const MethodID methodID,
196 
211  template<typename ReceivedDataType>
212  void setSignalHandler(const MethodID methodID,
213  const typename SignalHandler<ReceivedDataType>::type& process);
214 
224  void sendResult(const MethodID methodID,
225  const PeerID& peerID,
226  const MessageID& messageID,
227  const std::shared_ptr<void>& data);
228 
237  void sendError(const PeerID& peerID,
238  const MessageID& messageID,
239  const int errorCode,
240  const std::string& message);
241 
249  void sendVoid(const MethodID methodID,
250  const PeerID& peerID,
251  const MessageID& messageID);
252 
260  void removeMethod(const MethodID methodID);
261 
266  bool isHandled(const MethodID methodID);
267 
279  template<typename SentDataType, typename ReceivedDataType>
280  std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
281  const PeerID& peerID,
282  const std::shared_ptr<SentDataType>& data,
283  unsigned int timeoutMS = 5000);
284 
295  template<typename SentDataType, typename ReceivedDataType>
296  MessageID callAsync(const MethodID methodID,
297  const PeerID& peerID,
298  const std::shared_ptr<SentDataType>& data,
299  const typename ResultHandler<ReceivedDataType>::type& process);
300 
311  template<typename SentDataType, typename ReceivedDataType>
312  MessageID callAsyncNonBlock(const MethodID methodID,
313  const PeerID& peerID,
314  const std::shared_ptr<SentDataType>& data,
315  const typename ResultHandler<ReceivedDataType>::type& process);
325  template<typename SentDataType>
326  void signal(const MethodID methodID,
327  const std::shared_ptr<SentDataType>& data);
328 
336  bool handleLostConnection(const FileDescriptor fd);
337 
345  bool handleInput(const FileDescriptor fd);
346 
352  bool handleEvent();
353 
358 
359 private:
360  typedef std::unique_lock<std::mutex> Lock;
362 
363  struct EmptyData {
365  };
366 
367  struct MessageHeader {
370 
372  (
373  methodID,
374  messageID
375  )
376  };
377 
379  RegisterSignalsProtocolMessage() = default;
380  explicit RegisterSignalsProtocolMessage(const std::vector<MethodID>& ids)
381  : ids(ids) {}
382 
383  std::vector<MethodID> ids;
384 
386  (
387  ids
388  )
389  };
390 
392  ErrorProtocolMessage() = default;
393  ErrorProtocolMessage(const MessageID& messageID, const int code, const std::string& message)
394  : messageID(messageID), code(code), message(message) {}
395 
397  int code;
398  std::string message;
399 
401  (
402  messageID,
403  code,
404  message
405  )
406  };
407 
408  struct MethodHandlers {
409  MethodHandlers(const MethodHandlers& other) = delete;
410  MethodHandlers& operator=(const MethodHandlers&) = delete;
411  MethodHandlers() = default;
412  MethodHandlers(MethodHandlers&&) = default;
413  MethodHandlers& operator=(MethodHandlers &&) = default;
414 
418  };
419 
420  struct SignalHandlers {
421  SignalHandlers(const SignalHandlers& other) = delete;
422  SignalHandlers& operator=(const SignalHandlers&) = delete;
423  SignalHandlers() = default;
424  SignalHandlers(SignalHandlers&&) = default;
425  SignalHandlers& operator=(SignalHandlers &&) = default;
426 
429  };
430 
432  ReturnCallbacks(const ReturnCallbacks& other) = delete;
433  ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
434  ReturnCallbacks() = default;
435  ReturnCallbacks(ReturnCallbacks&&) = default;
437 
439  : peerID(peerID), parse(parse), process(process) {}
440 
444  };
445 
446  struct PeerInfo {
447  PeerInfo(const PeerInfo& other) = delete;
448  PeerInfo& operator=(const PeerInfo&) = delete;
449  PeerInfo() = delete;
450 
451  PeerInfo(PeerInfo&&) = default;
452  PeerInfo& operator=(PeerInfo &&) = default;
453 
454  PeerInfo(PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
455  : peerID(peerID), socketPtr(socketPtr) {}
456 
458  std::shared_ptr<Socket> socketPtr;
459  };
460 
462 
463  typedef std::vector<PeerInfo> Peers;
464 
465  std::string mLogPrefix;
466 
468 
470 
471  std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
472  std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
473  std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
474 
476 
477  std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
478 
479  // Mutex for modifying any internal data
480  std::mutex mStateMutex;
481 
484 
485  unsigned int mMaxNumberOfPeers;
486 
487  template<typename SentDataType, typename ReceivedDataType>
488  void setMethodHandlerInternal(const MethodID methodID,
490 
491  template<typename ReceivedDataType>
492  void setSignalHandlerInternal(const MethodID methodID,
493  const typename SignalHandler<ReceivedDataType>::type& handler);
494 
495  template<typename SentDataType>
496  void signalInternal(const MethodID methodID,
497  const PeerID& peerID,
498  const std::shared_ptr<SentDataType>& data);
499 
500  // Request handlers
501  bool onMethodRequest(MethodRequest& request);
502  bool onSignalRequest(SignalRequest& request);
503  bool onAddPeerRequest(AddPeerRequest& request);
504  bool onRemovePeerRequest(RemovePeerRequest& request);
505  bool onSendResultRequest(SendResultRequest& request);
507  bool onFinishRequest(FinishRequest& request);
508 
509  bool onReturnValue(Peers::iterator& peerIt,
510  const MessageID& messageID);
511  bool onRemoteMethod(Peers::iterator& peerIt,
512  const MethodID methodID,
513  const MessageID& messageID,
514  std::shared_ptr<MethodHandlers> methodCallbacks);
515  bool onRemoteSignal(Peers::iterator& peerIt,
516  const MethodID methodID,
517  const MessageID& messageID,
518  std::shared_ptr<SignalHandlers> signalCallbacks);
519 
520  void removePeerInternal(Peers::iterator peerIt,
521  const std::exception_ptr& exceptionPtr);
522  void removePeerSyncInternal(const PeerID& peerID, Lock& lock);
523 
524  bool onNewSignals(const PeerID& peerID,
525  std::shared_ptr<RegisterSignalsProtocolMessage>& data);
526 
527  bool onErrorSignal(const PeerID& peerID,
528  std::shared_ptr<ErrorProtocolMessage>& data);
529 
530  Peers::iterator getPeerInfoIterator(const FileDescriptor fd);
531  Peers::iterator getPeerInfoIterator(const PeerID& peerID);
532 
533 };
534 
535 template<typename SentDataType, typename ReceivedDataType>
538 {
539  MethodHandlers methodCall;
540 
541  methodCall.parse = [](const int fd)->std::shared_ptr<void> {
542  std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
543  cargo::loadFromFD<ReceivedDataType>(fd, *data);
544  return data;
545  };
546 
547  methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
548  cargo::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
549  };
550 
551  methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data, MethodResult::Pointer && methodResult) {
552  std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
553  return method(peerID, tmpData, std::forward<MethodResult::Pointer>(methodResult));
554  };
555 
556  mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
557 }
558 
559 template<typename SentDataType, typename ReceivedDataType>
562 {
563  if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
564  LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
565  throw IPCException("Forbidden methodID: " + std::to_string(methodID));
566  }
567 
568  {
569  Lock lock(mStateMutex);
570 
571  if (mSignalsCallbacks.count(methodID)) {
572  LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
573  throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
574  }
575 
576  setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
577  }
578 
579 }
580 
581 template<typename ReceivedDataType>
583  const typename SignalHandler<ReceivedDataType>::type& handler)
584 {
585  SignalHandlers signalCall;
586 
587  signalCall.parse = [](const int fd)->std::shared_ptr<void> {
588  std::shared_ptr<ReceivedDataType> dataToFill(new ReceivedDataType());
589  cargo::loadFromFD<ReceivedDataType>(fd, *dataToFill);
590  return dataToFill;
591  };
592 
593  signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& dataReceived) {
594  std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
595  return handler(peerID, tmpData);
596  };
597 
598  mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
599 }
600 
601 template<typename ReceivedDataType>
603  const typename SignalHandler<ReceivedDataType>::type& handler)
604 {
605  if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
606  LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
607  throw IPCException("Forbidden methodID: " + std::to_string(methodID));
608  }
609 
610  std::shared_ptr<RegisterSignalsProtocolMessage> data;
611 
612  {
613  Lock lock(mStateMutex);
614 
615  // Andd the signal handler:
616  if (mMethodsCallbacks.count(methodID)) {
617  LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
618  throw IPCException("MethodID used by a method: " + std::to_string(methodID));
619  }
620 
621  setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
622 
623  // Broadcast the new signal:
624  std::vector<MethodID> ids {methodID};
625  data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
626 
627  for (const PeerInfo& peerInfo : mPeerInfo) {
628  signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
629  peerInfo.peerID,
630  data);
631  }
632  }
633 }
634 
635 
636 template<typename SentDataType, typename ReceivedDataType>
638  const PeerID& peerID,
639  const std::shared_ptr<SentDataType>& data,
640  const typename ResultHandler<ReceivedDataType>::type& process)
641 {
642  Lock lock(mStateMutex);
643  return callAsyncNonBlock<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
644 }
645 
646 template<typename SentDataType, typename ReceivedDataType>
648  const PeerID& peerID,
649  const std::shared_ptr<SentDataType>& data,
650  const typename ResultHandler<ReceivedDataType>::type& process)
651 {
652  auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
654  return request->messageID;
655 }
656 
657 
658 template<typename SentDataType, typename ReceivedDataType>
659 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
660  const PeerID& peerID,
661  const std::shared_ptr<SentDataType>& data,
662  unsigned int timeoutMS)
663 {
665  std::condition_variable cv;
666 
667  auto process = [&result, &cv](const Result<ReceivedDataType> && r) {
668  // This is called under lock(mStateMutex)
669  result = std::move(r);
670  cv.notify_all();
671  };
672 
673  Lock lock(mStateMutex);
674  MessageID messageID = callAsyncNonBlock<SentDataType, ReceivedDataType>(methodID,
675  peerID,
676  data,
677  process);
678 
679  auto isResultInitialized = [&result]() {
680  return result.isValid();
681  };
682 
683  LOGT(mLogPrefix + "Waiting for the response...");
684  //In the case of too large sending time response can be received far after timeoutMS but
685  //before this thread wakes up and before predicate check (there will by no timeout exception)
686  if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
687  LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
688 
689  // Call isn't sent or call is sent but there is no reply
690  bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
691  return request.requestID == Event::METHOD &&
692  request.get<MethodRequest>()->messageID == messageID;
693  })
694  || 1 == mReturnCallbacks.erase(messageID);
695 
696  if (isTimeout) {
697  LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
698  removePeerSyncInternal(peerID, lock);
699  throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
700  } else {
701  LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
702  if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
703  LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
704  throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
705  }
706  }
707  }
708 
709  return result.get();
710 }
711 
712 template<typename SentDataType>
714  const PeerID& peerID,
715  const std::shared_ptr<SentDataType>& data)
716 {
717  auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
719 }
720 
721 template<typename SentDataType>
722 void Processor::signal(const MethodID methodID,
723  const std::shared_ptr<SentDataType>& data)
724 {
725  Lock lock(mStateMutex);
726  const auto it = mSignalsPeers.find(methodID);
727  if (it == mSignalsPeers.end()) {
728  LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
729  return;
730  }
731  for (const PeerID peerID : it->second) {
732  auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
733  mRequestQueue.pushBack(Event::SIGNAL, requestPtr);
734  }
735 }
736 
737 
738 
739 } // namespace ipc
740 } // namespace cargo
741 
742 #endif // CARGO_IPC_INTERNALS_PROCESSOR_HPP
int code
Definition: processor.hpp:397
Peers::iterator getPeerInfoIterator(const FileDescriptor fd)
Definition: processor.cpp:90
C++ epoll wrapper.
void pushFront(const RequestIdType requestID, const std::shared_ptr< void > &data=nullptr)
Push data to back of the queue.
Definition: request-queue.hpp:146
Processor & operator=(const Processor &)=delete
std::function< void(Result< Data > &&) > type
Definition: result.hpp:73
PeerCallback mRemovedPeerCallback
Definition: processor.hpp:483
bool onRemoteMethod(Peers::iterator &peerIt, const MethodID methodID, const MessageID &messageID, std::shared_ptr< MethodHandlers > methodCallbacks)
Definition: processor.cpp:443
std::function< void(ResultBuilder &)> ResultBuilderHandler
Definition: result-builder.hpp:66
std::unique_lock< std::mutex > Lock
Definition: processor.hpp:360
Definition: processor.hpp:367
bool onRemovePeerRequest(RemovePeerRequest &request)
Definition: processor.cpp:644
bool onRemoteSignal(Peers::iterator &peerIt, const MethodID methodID, const MessageID &messageID, std::shared_ptr< SignalHandlers > signalCallbacks)
Definition: processor.cpp:402
std::shared_ptr< MethodResult > Pointer
Definition: method-result.hpp:47
bool mIsRunning
Definition: processor.hpp:469
Processor's request to send the result of a method.
MethodHandlers & operator=(const MethodHandlers &)=delete
std::string PeerID
Definition: types.hpp:45
friend std::ostream & operator<<(std::ostream &os, const Processor::Event &event)
Definition: processor.cpp:759
Class for managing a queue of Requests carrying any data.
Definition: request-queue.hpp:44
bool onErrorSignal(const PeerID &peerID, std::shared_ptr< ErrorProtocolMessage > &data)
Definition: processor.cpp:352
Definition: finish-request.hpp:33
PeerCallback mNewPeerCallback
Definition: processor.hpp:482
#define CARGO_REGISTER_EMPTY
Register empty cargo class.
Definition: fields.hpp:42
Definition: method-request.hpp:38
bool handleEvent()
Handle one event from the internal event's queue.
Definition: processor.cpp:495
bool onSignalRequest(SignalRequest &request)
Definition: processor.cpp:575
void setSignalHandler(const MethodID methodID, const typename SignalHandler< ReceivedDataType >::type &process)
Saves the callbacks connected to the method id.
Definition: processor.hpp:602
std::vector< PeerInfo > Peers
Definition: processor.hpp:463
void sendError(const PeerID &peerID, const MessageID &messageID, const int errorCode, const std::string &message)
Send error result of the method.
Definition: processor.cpp:175
Definition: signal-request.hpp:35
PeerID peerID
Definition: processor.hpp:457
Definition: processor.hpp:363
bool handleInput(const FileDescriptor fd)
Handles input from one peer.
Definition: processor.cpp:290
std::shared_ptr< Data > get() const
Definition: result.hpp:55
Types definitions.
Definition: add-peer-request.hpp:34
#define LOGE(MESSAGE)
Logging errors.
Definition: logger.hpp:140
void setSignalHandlerInternal(const MethodID methodID, const typename SignalHandler< ReceivedDataType >::type &handler)
Definition: processor.hpp:582
void sendResult(const MethodID methodID, const PeerID &peerID, const MessageID &messageID, const std::shared_ptr< void > &data)
Send result of the method.
Definition: processor.cpp:166
~Processor()
Definition: processor.cpp:80
std::function< std::shared_ptr< void >cargo::ipc::FileDescriptor fd)> ParseCallback
Generic function type used as callback for reading and parsing data.
Definition: types.hpp:72
SerializeCallback serialize
Definition: processor.hpp:415
unsigned int mMaxNumberOfPeers
Definition: processor.hpp:485
void signalInternal(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data)
Definition: processor.hpp:713
Definition: processor.hpp:391
bool isHandled(const MethodID methodID)
Definition: processor.cpp:199
bool onRemoveMethodRequest(RemoveMethodRequest &request)
Definition: processor.cpp:703
PeerID peerID
Definition: processor.hpp:441
void pushBack(const RequestIdType requestID, const std::shared_ptr< void > &data=nullptr)
Push data to back of the queue.
Definition: request-queue.hpp:136
bool onFinishRequest(FinishRequest &request)
Definition: processor.cpp:709
Definition: remove-peer-request.hpp:36
void setNewPeerCallback(const PeerCallback &newPeerCallback)
Set the callback called for each new connection to a peer.
Definition: processor.cpp:148
MethodHandler< void, void >::type method
Definition: processor.hpp:417
Processor(epoll::EventPoll &eventPoll, const std::string &logName="", const PeerCallback &newPeerCallback=nullptr, const PeerCallback &removedPeerCallback=nullptr, const unsigned int maxNumberOfPeers=DEFAULT_MAX_NUMBER_OF_PEERS)
Constructs the Processor, but doesn't start it.
Definition: processor.cpp:59
std::function< void(const cargo::ipc::PeerID peerID, const cargo::ipc::FileDescriptor fd)> PeerCallback
Generic function type used as callback for peer events.
Definition: types.hpp:54
std::unordered_map< MethodID, std::list< PeerID > > mSignalsPeers
Definition: processor.hpp:473
MessageID callAsync(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, const typename ResultHandler< ReceivedDataType >::type &process)
Asynchronous method call.
Definition: processor.hpp:637
Definition: result.hpp:36
ParseCallback parse
Definition: processor.hpp:416
Processor's request to remove a peer.
void removeMethod(const MethodID methodID)
Removes the callback associated with specific method id.
Definition: processor.cpp:193
Definition: processor.hpp:408
void setMethodHandlerInternal(const MethodID methodID, const typename MethodHandler< SentDataType, ReceivedDataType >::type &process)
Definition: processor.hpp:536
std::function< bool(PeerID peerID, std::shared_ptr< ReceivedDataType > &data)> type
Definition: types.hpp:99
std::string message
Definition: processor.hpp:398
std::unordered_map< MethodID, std::shared_ptr< SignalHandlers > > mSignalsCallbacks
Definition: processor.hpp:472
char data[368]
Definition: initctl.cpp:41
void sendVoid(const MethodID methodID, const PeerID &peerID, const MessageID &messageID)
Indicate that the method handler finished.
Definition: processor.cpp:184
Peers mPeerInfo
Definition: processor.hpp:475
RequestQueue< Event > mRequestQueue
Definition: processor.hpp:467
SignalHandlers & operator=(const SignalHandlers &)=delete
MessageID messageID
Definition: processor.hpp:396
RequestQueue< Event >::Request Request
Definition: processor.hpp:361
Exception to indicate timeout event error.
Definition: exception.hpp:102
Event
Definition: processor.hpp:90
Definition: remove-method-request.hpp:33
#define LOGW(MESSAGE)
Logging warnings.
Definition: logger.hpp:143
ParseCallback parse
Definition: processor.hpp:442
PeerInfo & operator=(const PeerInfo &)=delete
ErrorProtocolMessage(const MessageID &messageID, const int code, const std::string &message)
Definition: processor.hpp:393
ReturnCallbacks(PeerID peerID, const ParseCallback &parse, const ResultBuilderHandler &process)
Definition: processor.hpp:438
Linux socket wrapper.
SignalHandler< void >::type signal
Definition: processor.hpp:428
static const MethodID ERROR_METHOD_ID
Error return message.
Definition: processor.hpp:116
void setMethodHandler(const MethodID methodID, const typename MethodHandler< SentDataType, ReceivedDataType >::type &process)
Saves the callbacks connected to the method id.
Definition: processor.hpp:560
bool isValid() const
Definition: result.hpp:61
#define CARGO_REGISTER(...)
Registers cargo fields within class.
Definition: fields.hpp:74
Class for sending the result of a method.
bool handleLostConnection(const FileDescriptor fd)
Removes one peer.
Definition: processor.cpp:281
void removePeerInternal(Peers::iterator peerIt, const std::exception_ptr &exceptionPtr)
Definition: processor.cpp:242
ResultBuilderHandler process
Definition: processor.hpp:443
std::mutex mStateMutex
Definition: processor.hpp:480
void stop(bool wait)
Stops the processing thread.
Definition: processor.cpp:123
Managing the queue with requests.
bool isStarted()
Definition: processor.cpp:104
void setRemovedPeerCallback(const PeerCallback &removedPeerCallback)
Set the callback called when connection to a peer is lost.
Definition: processor.cpp:154
Processor's request to call a method.
bool onMethodRequest(MethodRequest &request)
Definition: processor.cpp:524
Definition: processor.hpp:420
std::shared_ptr< Socket > socketPtr
Definition: processor.hpp:458
Definition: method-result.hpp:75
bool onReturnValue(Peers::iterator &peerIt, const MessageID &messageID)
Definition: processor.cpp:366
ReturnCallbacks & operator=(const ReturnCallbacks &)=delete
unsigned int MethodID
Definition: types.hpp:43
Definition: exception.hpp:39
ParseCallback parse
Definition: processor.hpp:427
void start()
Start processing.
Definition: processor.cpp:110
const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS
Definition: processor.hpp:62
Processor's request to send a signal.
MethodID methodID
Definition: processor.hpp:368
std::function< void(cargo::ipc::FileDescriptor fd, std::shared_ptr< void > &data)> SerializeCallback
Generic function type used as callback for serializing and saving serialized data to the descriptor...
Definition: types.hpp:64
Managing the queue of messages carrying any kind of data.
MessageID messageID
Definition: processor.hpp:369
Class for storing result of a method - data or exception.
This class waits on registered file descriptor for events.
Definition: event-poll.hpp:47
bool onNewSignals(const PeerID &peerID, std::shared_ptr< RegisterSignalsProtocolMessage > &data)
Definition: processor.cpp:341
bool removeIf(Predicate predicate)
Remove elements from the queue when the predicate returns true.
Definition: request-queue.hpp:172
std::function< bool(PeerID peerID, std::shared_ptr< ReceivedDataType > &data, MethodResult::Pointer methodResult) > type
Definition: method-result.hpp:78
Definition: send-result-request.hpp:34
std::shared_ptr< ReceivedDataType > callSync(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, unsigned int timeoutMS=5000)
Synchronous method call.
Definition: processor.hpp:659
static const MethodID RETURN_METHOD_ID
Used to indicate a message with the return value.
Definition: processor.hpp:106
#define LOGT(MESSAGE)
Logging tracing information.
Definition: logger.hpp:156
std::string mLogPrefix
Definition: processor.hpp:465
This class wraps communication via UX sockets.
Definition: processor.hpp:88
static const MethodID REGISTER_SIGNAL_METHOD_ID
Indicates an Processor's internal request/broadcast to register a Signal.
Definition: processor.hpp:111
int FileDescriptor
Definition: types.hpp:42
bool onSendResultRequest(SendResultRequest &request)
Definition: processor.cpp:656
Processor's request to remove the specified method handler.
Exceptions for the IPC.
std::unordered_map< MethodID, std::shared_ptr< MethodHandlers > > mMethodsCallbacks
Definition: processor.hpp:471
FileDescriptor getEventFD()
Definition: processor.cpp:160
epoll::EventPoll & mEventPoll
Definition: processor.hpp:461
PeerInfo(PeerID peerID, const std::shared_ptr< Socket > &socketPtr)
Definition: processor.hpp:454
Definition: processor.hpp:446
PeerID addPeer(const std::shared_ptr< Socket > &socketPtr)
From now on socket is owned by the Processor object.
Definition: processor.cpp:206
Generic type used as a callback function for handling signals.
Definition: types.hpp:97
std::string MessageID
Definition: types.hpp:44
void removePeerSyncInternal(const PeerID &peerID, Lock &lock)
Definition: processor.cpp:220
Scope logger class declaration.
Processor's request to add a peer.
MessageID callAsyncNonBlock(const MethodID methodID, const PeerID &peerID, const std::shared_ptr< SentDataType > &data, const typename ResultHandler< ReceivedDataType >::type &process)
The same as callAsync, but not blocking on the state mutex.
Definition: processor.hpp:647
Definition: processor.hpp:431
RegisterSignalsProtocolMessage(const std::vector< MethodID > &ids)
Definition: processor.hpp:380
std::vector< MethodID > ids
Definition: processor.hpp:383
std::unordered_map< MessageID, ReturnCallbacks > mReturnCallbacks
Definition: processor.hpp:477
void signal(const MethodID methodID, const std::shared_ptr< SentDataType > &data)
Send a signal to the peer.
Definition: processor.hpp:722
bool onAddPeerRequest(AddPeerRequest &request)
Definition: processor.cpp:606