ONE - On-device Neural Engine
Loading...
Searching...
No Matches
onert::exec::ParallelExecutor Class Reference

Class to execute Graph in parallel. More...

#include <ParallelExecutor.h>

Collaboration diagram for onert::exec::ParallelExecutor:

Public Member Functions

 ParallelExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a ParallelExecutor object.
 
void executeImpl (const ExecutionObservee &subject) override
 
- Public Member Functions inherited from onert::exec::DataflowExecutor
 DataflowExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a DataflowExecutor object.
 
- Public Member Functions inherited from onert::exec::ExecutorBase
 ExecutorBase (std::unique_ptr< compiler::LoweredGraph > &&lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, const util::TracingCtx *tracing_ctx)
 Construct a new ExecutorBase object.
 
virtual ~ExecutorBase ()=default
 
const ir::Graph & graph () const final
 Returns graph object.
 
void execute (const std::vector< backend::IPortableTensor * > &inputs, const std::vector< backend::IPortableTensor * > &outputs, const ExecutionOptions &options) override
 Execute with given input/output tensors.
 
uint32_t inputSize () const override
 Get input size.
 
uint32_t outputSize () const override
 Get output size.
 
const ir::OperandInfo & inputInfo (uint32_t index) const override
 Get input info at index.
 
const ir::OperandInfo & outputInfo (uint32_t index) const override
 Get output info at index.
 
ir::Layout inputLayout (uint32_t index) const override
 Get input layout at index.
 
ir::Layout outputLayout (uint32_t index) const override
 Get output layout at index.
 
void setIndexedRanks (std::shared_ptr< ir::OperationIndexMap< int64_t > > ranks) final
 Set an ordering on operations.
 
void addObserver (std::unique_ptr< IExecutionObserver > ref)
 
backend::BackendContexts & getBackendContexts ()
 
const ExecutionOptions & currentOptions () const override
 Return current execution configuration.
 
- Public Member Functions inherited from onert::exec::IExecutor
 IExecutor ()=default
 Construct a new IExecutor object.
 
virtual ~IExecutor ()=default
 Destroy the IExecutor object.
 

Protected Member Functions

void notify (uint32_t finished_job_id) override
 
- Protected Member Functions inherited from onert::exec::DataflowExecutor
bool noWaitingJobs ()
 
int64_t calculateRank (const std::vector< ir::OperationIndex > &operations)
 
void emplaceToReadyJobs (const uint32_t &id)
 
- Protected Member Functions inherited from onert::exec::ExecutorBase
bool hasDynamicInput ()
 Returns true if any input tensor is dynamic; false if all are static tensors.
 

Additional Inherited Members

- Protected Attributes inherited from onert::exec::DataflowExecutor
compiler::CodeMap _code_map
 
std::vector< std::unique_ptr< Job > > _finished_jobs
 A vector of finished jobs for current execution After a run it has all the jobs of this execution for the next run.
 
std::vector< std::unique_ptr< Job > > _waiting_jobs
 A vector of waiting jobs for current execution All the jobs are moved from _finished_jobs to it when start a run.
 
std::vector< std::list< uint32_t > > _output_info
 Jobs' output info Used for notifying after finishing a job.
 
std::vector< uint32_t > _initial_input_info
 
std::vector< uint32_t > _input_info
 
std::multimap< int64_t, std::unique_ptr< Job >, std::greater< int64_t > > _ready_jobs
 A collection of jobs that are ready for execution Jobs in it are ready to be scheduled. Ordered by priority from _indexed_ranks
 
std::unordered_map< uint32_t, ir::OperationIndex > _job_to_op
 Which job runs which op and function.
 
- Protected Attributes inherited from onert::exec::ExecutorBase
ExecObservers _observers
 
std::shared_ptr< ir::OperationIndexMap< int64_t > > _indexed_ranks
 
std::unique_ptr< compiler::LoweredGraph > _lowered_graph
 
backend::BackendContexts _backend_contexts
 
const ir::Graph & _graph
 
std::vector< backend::builtin::IOTensor * > _input_tensors
 
std::vector< backend::builtin::IOTensor * > _output_tensors
 
std::mutex _mutex
 
const util::TracingCtx * _tracing_ctx
 
ExecutionOptions _current_options
 

Detailed Description

Class to execute Graph in parallel.

Definition at line 35 of file ParallelExecutor.h.

Constructor & Destructor Documentation

◆ ParallelExecutor()

onert::exec::ParallelExecutor::ParallelExecutor ( std::unique_ptr< compiler::LoweredGraph >  lowered_graph,
backend::BackendContexts &&  backend_contexts,
const compiler::TensorRegistries &  tensor_regs,
compiler::CodeMap &&  code_map,
const util::TracingCtx *  tracing_ctx 
)

Constructs a ParallelExecutor object.

Parameters
lowered_graph  LoweredGraph object
tensor_regs  Tensor registries that are currently used
code_map  ir::Operation and its code map

Definition at line 62 of file ParallelExecutor.cc.

67 : DataflowExecutor{std::move(lowered_graph), std::move(backend_contexts), tensor_regs,
68 std::move(code_map), tracing_ctx}
69{
70 VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
71}
DataflowExecutor(std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
Constructs a DataflowExecutor object.
#define VERBOSE(name, lv)
Definition Log.h:71

References VERBOSE.

Member Function Documentation

◆ executeImpl()

void onert::exec::ParallelExecutor::executeImpl ( const ExecutionObservee &  subject)
overridevirtual

Reimplemented from onert::exec::DataflowExecutor.

Definition at line 73 of file ParallelExecutor.cc.

74{
75 bool dynamic_input_exists = hasDynamicInput();
76
77 // Init scheduler
77 // TODO Consider to have distinct backend set in GraphLowerInfo
78 util::BackendSet backends;
80 for (const auto &[idx, backend] : _lowered_graph->lower_info().operation)
81 backends.add(backend);
82
83 _scheduler = std::make_unique<ParallelScheduler>(backends);
84
85 assert(noWaitingJobs());
86
87 // Execution setup
88 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
89
90 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
91 {
92 VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
93 if (_input_info[i] == 0)
94 {
95 emplaceToReadyJobs(i);
96 }
97 }
98 assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
99
100 VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
101
102 auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
103
104 subject.notifySubgraphBegin(profiling_subg_index);
105
106 while (true)
107 {
108 std::unique_lock<std::mutex> lock{_mu_jobs};
109
110 if (_ready_jobs.empty())
111 {
112 _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); });
113 // Check finish condition
114 if (_ready_jobs.empty() && noWaitingJobs())
115 {
116 break;
117 }
118 }
119
120 auto job = std::move(_ready_jobs.begin()->second);
121 _ready_jobs.erase(_ready_jobs.begin());
122
123 lock.unlock();
124
125 VERBOSE(ParallelExecutor) << "Assigning fn " << job->index() << std::endl;
126
127 auto job_index = job->index();
128 auto op_ind = _job_to_op[job_index];
129 const auto backend = _lowered_graph->lower_info().operation.at(op_ind);
130 auto setup = [&, op_ind, backend]() {
131 subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);
132 };
133 auto teardown = [&, job_index, op_ind, backend]() {
134 subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
135 notify(job_index);
136 };
137
138 job->fn_seq()->initRunning();
139
140 // dynamic tensor setting
141 bool handle_dynamic_tensor =
142 _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
143 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
144
145 _scheduler->assign(std::make_unique<HookFunction>(job->fn_seq(), setup, teardown), backend);
146 _finished_jobs[job_index] = std::move(job);
147 }
148
149 assert(noWaitingJobs());
150
151 // Wait for all the jobs done
152 _scheduler->finish();
153 subject.notifySubgraphEnd(profiling_subg_index);
154
155 // Reset input info for the next execution
156 _input_info = _initial_input_info;
157}
std::multimap< int64_t, std::unique_ptr< Job >, std::greater< int64_t > > _ready_jobs
A collection of jobs that are ready for execution Jobs in it are ready to be scheduled....
std::vector< uint32_t > _input_info
std::vector< std::unique_ptr< Job > > _waiting_jobs
A vector of waiting jobs for current execution All the jobs are moved from _finished_jobs to it when ...
std::vector< std::unique_ptr< Job > > _finished_jobs
A vector of finished jobs for current execution After a run it has all the jobs of this execution for...
std::vector< uint32_t > _initial_input_info
void emplaceToReadyJobs(const uint32_t &id)
std::unordered_map< uint32_t, ir::OperationIndex > _job_to_op
Which job runs which op and function.
std::unique_ptr< compiler::LoweredGraph > _lowered_graph
bool hasDynamicInput()
Returns true if any input tensor is dynamic; false if all are static tensors.
const util::TracingCtx * _tracing_ctx
const ir::Graph & _graph
void notify(uint32_t finished_job_id) override
ir::SubgraphIndex getSubgraphIndex(const ir::Graph *g) const
Get subgraph index of a graph.
Definition TracingCtx.h:61
util::Set< const backend::Backend * > BackendSet
Definition BackendSet.h:35

References onert::exec::DataflowExecutor::_finished_jobs, onert::exec::ExecutorBase::_graph, onert::exec::DataflowExecutor::_initial_input_info, onert::exec::DataflowExecutor::_input_info, onert::exec::DataflowExecutor::_job_to_op, onert::exec::ExecutorBase::_lowered_graph, onert::exec::DataflowExecutor::_ready_jobs, onert::exec::ExecutorBase::_tracing_ctx, onert::exec::DataflowExecutor::_waiting_jobs, onert::exec::DataflowExecutor::emplaceToReadyJobs(), onert::util::TracingCtx::getSubgraphIndex(), onert::exec::ExecutorBase::hasDynamicInput(), notify(), onert::exec::ExecutionObservee::notifyJobBegin(), onert::exec::ExecutionObservee::notifyJobEnd(), onert::exec::ExecutionObservee::notifySubgraphBegin(), onert::exec::ExecutionObservee::notifySubgraphEnd(), onert::exec::DataflowExecutor::noWaitingJobs(), and VERBOSE.

◆ notify()

void onert::exec::ParallelExecutor::notify ( uint32_t  finished_job_id)
overrideprotectedvirtual

Reimplemented from onert::exec::DataflowExecutor.

Definition at line 52 of file ParallelExecutor.cc.

53{
54 std::unique_lock<std::mutex> lock{_mu_jobs};
55
56 DataflowExecutor::notify(finished_job_id);
57
58 lock.unlock();
59 _cv_jobs.notify_all();
60}
virtual void notify(uint32_t finished_job_id)

References onert::exec::DataflowExecutor::notify().

Referenced by executeImpl().


The documentation for this class was generated from the following files: