ONE - On-device Neural Engine
onert::exec::ParallelExecutor Class Reference

Class to execute Graph in parallel.

#include <ParallelExecutor.h>


Public Member Functions

 ParallelExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a ParallelExecutor object.
 
void executeImpl (const ExecutionObservee &subject) override
 
- Public Member Functions inherited from onert::exec::DataflowExecutor
 DataflowExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a DataflowExecutor object.
 
- Public Member Functions inherited from onert::exec::ExecutorBase
 ExecutorBase (std::unique_ptr< compiler::LoweredGraph > &&lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, const util::TracingCtx *tracing_ctx)
 Construct a new ExecutorBase object.
 
virtual ~ExecutorBase ()=default
 
const ir::Graph & graph () const final
 Returns graph object.
 
void execute (const std::vector< backend::IPortableTensor * > &inputs, const std::vector< backend::IPortableTensor * > &outputs, const ExecutionOptions &options) override
 Execute with given input/output tensors.
 
uint32_t inputSize () const override
 Get input size.
 
uint32_t outputSize () const override
 Get output size.
 
const ir::OperandInfo & inputInfo (uint32_t index) const override
 Get input info at index.
 
const ir::OperandInfo & outputInfo (uint32_t index) const override
 Get output info at index.
 
ir::Layout inputLayout (uint32_t index) const override
 Get input layout at index.
 
ir::Layout outputLayout (uint32_t index) const override
 Get output layout at index.
 
void setIndexedRanks (std::shared_ptr< ir::OperationIndexMap< int64_t > > ranks) final
 Set an ordering on operations.
 
void addObserver (std::unique_ptr< IExecutionObserver > ref)
 
backend::BackendContexts & getBackendContexts ()
 
const ExecutionOptions & currentOptions () const override
 Return current execution configuration.
 
- Public Member Functions inherited from onert::exec::IExecutor
 IExecutor ()=default
 Construct a new IExecutor object.
 
virtual ~IExecutor ()=default
 Destroy the IExecutor object.
 

Protected Member Functions

void notify (uint32_t finished_job_id) override
 
- Protected Member Functions inherited from onert::exec::DataflowExecutor
bool noWaitingJobs ()
 
int64_t calculateRank (const std::vector< ir::OperationIndex > &operations)
 
void emplaceToReadyJobs (const uint32_t &id)
 
- Protected Member Functions inherited from onert::exec::ExecutorBase
bool hasDynamicInput ()
 Returns true if any input tensor is dynamic; false if all are static tensors.
 

Additional Inherited Members

- Protected Attributes inherited from onert::exec::DataflowExecutor
compiler::CodeMap _code_map
 
std::vector< std::unique_ptr< Job > > _finished_jobs
 A vector of finished jobs for the current execution. After a run it holds all the jobs of this execution, ready for the next run.
 
std::vector< std::unique_ptr< Job > > _waiting_jobs
 A vector of waiting jobs for the current execution. All jobs are moved here from _finished_jobs when a run starts.
 
std::vector< std::list< uint32_t > > _output_info
 Jobs' output info. Used for notifying dependent jobs after a job finishes.
 
std::vector< uint32_t > _initial_input_info
 
std::vector< uint32_t > _input_info
 
std::multimap< int64_t, std::unique_ptr< Job >, std::greater< int64_t > > _ready_jobs
 A collection of jobs that are ready for execution. Jobs in it are ready to be scheduled, ordered by priority from _indexed_ranks.
 
std::unordered_map< uint32_t, ir::OperationIndex > _job_to_op
 Which job runs which op and function.
 
- Protected Attributes inherited from onert::exec::ExecutorBase
ExecObservers _observers
 
std::shared_ptr< ir::OperationIndexMap< int64_t > > _indexed_ranks
 
std::unique_ptr< compiler::LoweredGraph > _lowered_graph
 
backend::BackendContexts _backend_contexts
 
const ir::Graph & _graph
 
std::vector< backend::builtin::IOTensor * > _input_tensors
 
std::vector< backend::builtin::IOTensor * > _output_tensors
 
std::mutex _mutex
 
const util::TracingCtx * _tracing_ctx
 
ExecutionOptions _current_options
 

Detailed Description

Class to execute Graph in parallel.

Definition at line 33 of file ParallelExecutor.h.

Constructor & Destructor Documentation

◆ ParallelExecutor()

onert::exec::ParallelExecutor::ParallelExecutor ( std::unique_ptr< compiler::LoweredGraph > lowered_graph,
backend::BackendContexts &&  backend_contexts,
const compiler::TensorRegistries &  tensor_regs,
compiler::CodeMap &&  code_map,
const util::TracingCtx *  tracing_ctx 
)

Constructs a ParallelExecutor object.

Parameters
lowered_graph	LoweredGraph object
tensor_regs	Tensor registries that are currently used
code_map	ir::Operation and its code map

Definition at line 60 of file ParallelExecutor.cc.

65 : DataflowExecutor{std::move(lowered_graph), std::move(backend_contexts), tensor_regs,
66 std::move(code_map), tracing_ctx}
67{
68 VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
69}

References VERBOSE.

Member Function Documentation

◆ executeImpl()

void onert::exec::ParallelExecutor::executeImpl ( const ExecutionObservee &  subject )
override virtual

Reimplemented from onert::exec::DataflowExecutor.

Definition at line 71 of file ParallelExecutor.cc.

72{
73 bool dynamic_input_exists = hasDynamicInput();
74
75 // Init scheduler
76 // TODO Consider to have distinct backend set in GraphLowerInfo
 77  backend::BackendSet backends;
78 for (const auto &[idx, backend] : _lowered_graph->lower_info().operation)
79 backends.add(backend);
80
81 _scheduler = std::make_unique<ParallelScheduler>(backends);
82
83 assert(noWaitingJobs());
84
85 // Execution setup
86 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
87
88 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
89 {
90 VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
91 if (_input_info[i] == 0)
92 {
 93  emplaceToReadyJobs(i);
94 }
95 }
96 assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
97
98 VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
99
100 auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
101
102 subject.notifySubgraphBegin(profiling_subg_index);
103
104 while (true)
105 {
106 std::unique_lock<std::mutex> lock{_mu_jobs};
107
108 if (_ready_jobs.empty())
109 {
110 _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); });
111 // Check finish condition
112 if (_ready_jobs.empty() && noWaitingJobs())
113 {
114 break;
115 }
116 }
117
118 auto job = std::move(_ready_jobs.begin()->second);
119 _ready_jobs.erase(_ready_jobs.begin());
120
121 lock.unlock();
122
123 VERBOSE(ParallelExecutor) << "Assigning fn " << job->index() << std::endl;
124
125 auto job_index = job->index();
126 auto op_ind = _job_to_op[job_index];
127 const auto backend = _lowered_graph->lower_info().operation.at(op_ind);
128 auto setup = [&, op_ind, backend]() {
129 subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);
130 };
131 auto teardown = [&, job_index, op_ind, backend]() {
132 subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
133 notify(job_index);
134 };
135
136 job->fn_seq()->initRunning();
137
138 // dynamic tensor setting
139 bool handle_dynamic_tensor =
140 _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
141 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
142
143 _scheduler->assign(std::make_unique<HookFunction>(job->fn_seq(), setup, teardown), backend);
144 _finished_jobs[job_index] = std::move(job);
145 }
146
147 assert(noWaitingJobs());
148
149 // Wait for all the jobs done
150 _scheduler->finish();
151 subject.notifySubgraphEnd(profiling_subg_index);
152
153 // Reset input info for the next execution
 154  _input_info = _initial_input_info;
155}

References onert::exec::DataflowExecutor::_finished_jobs, onert::exec::ExecutorBase::_graph, onert::exec::DataflowExecutor::_initial_input_info, onert::exec::DataflowExecutor::_input_info, onert::exec::DataflowExecutor::_job_to_op, onert::exec::ExecutorBase::_lowered_graph, onert::exec::DataflowExecutor::_ready_jobs, onert::exec::ExecutorBase::_tracing_ctx, onert::exec::DataflowExecutor::_waiting_jobs, onert::exec::DataflowExecutor::emplaceToReadyJobs(), onert::util::TracingCtx::getSubgraphIndex(), onert::exec::ExecutorBase::hasDynamicInput(), notify(), onert::exec::ExecutionObservee::notifyJobBegin(), onert::exec::ExecutionObservee::notifyJobEnd(), onert::exec::ExecutionObservee::notifySubgraphBegin(), onert::exec::ExecutionObservee::notifySubgraphEnd(), onert::exec::DataflowExecutor::noWaitingJobs(), and VERBOSE.

◆ notify()

void onert::exec::ParallelExecutor::notify ( uint32_t  finished_job_id)
override protected virtual

Reimplemented from onert::exec::DataflowExecutor.

Definition at line 50 of file ParallelExecutor.cc.

51{
52 std::unique_lock<std::mutex> lock{_mu_jobs};
53
54 DataflowExecutor::notify(finished_job_id);
55
56 lock.unlock();
57 _cv_jobs.notify_all();
58}

References onert::exec::DataflowExecutor::notify().

Referenced by executeImpl().


The documentation for this class was generated from the following files:
ParallelExecutor.h
ParallelExecutor.cc