ONE - On-device Neural Engine
onert::exec::DataflowExecutor Class Reference

#include <DataflowExecutor.h>


Public Member Functions

 DataflowExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a DataflowExecutor object.
 
void executeImpl (const ExecutionObservee &subject) override
 
- Public Member Functions inherited from onert::exec::ExecutorBase
 ExecutorBase (std::unique_ptr< compiler::LoweredGraph > &&lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, const util::TracingCtx *tracing_ctx)
 Construct a new ExecutorBase object.
 
virtual ~ExecutorBase ()=default
 
const ir::Graph & graph () const final
 Returns graph object.
 
void execute (const std::vector< backend::IPortableTensor * > &inputs, const std::vector< backend::IPortableTensor * > &outputs, const ExecutionOptions &options) override
 Execute with given input/output tensors.
 
uint32_t inputSize () const override
 Get input size.
 
uint32_t outputSize () const override
 Get output size.
 
const ir::OperandInfo & inputInfo (uint32_t index) const override
 Get input info at index.
 
const ir::OperandInfo & outputInfo (uint32_t index) const override
 Get output info at index.
 
ir::Layout inputLayout (uint32_t index) const override
 Get input layout at index.
 
ir::Layout outputLayout (uint32_t index) const override
 Get output layout at index.
 
void setIndexedRanks (std::shared_ptr< ir::OperationIndexMap< int64_t > > ranks) final
 Set an ordering on operations.
 
void addObserver (std::unique_ptr< IExecutionObserver > ref)
 
backend::BackendContexts & getBackendContexts ()
 
const ExecutionOptions & currentOptions () const override
 Return current execution configuration.
 
- Public Member Functions inherited from onert::exec::IExecutor
 IExecutor ()=default
 Construct a new IExecutor object.
 
virtual ~IExecutor ()=default
 Destroy the IExecutor object.
 

Protected Member Functions

virtual void notify (uint32_t finished_job_id)
 
bool noWaitingJobs ()
 
int64_t calculateRank (const std::vector< ir::OperationIndex > &operations)
 
void emplaceToReadyJobs (const uint32_t &id)
 
- Protected Member Functions inherited from onert::exec::ExecutorBase
bool hasDynamicInput ()
 Returns true if any input tensor is dynamic; false if all are static tensors.
 

Protected Attributes

compiler::CodeMap _code_map
 
std::vector< std::unique_ptr< Job > > _finished_jobs
 A vector of finished jobs for the current execution. After a run it holds all the jobs of this execution, ready for the next run.
 
std::vector< std::unique_ptr< Job > > _waiting_jobs
 A vector of waiting jobs for the current execution. All jobs are moved here from _finished_jobs when a run starts.
 
std::vector< std::list< uint32_t > > _output_info
 Jobs' output info. Used to notify dependent jobs after a job finishes.
 
std::vector< uint32_t > _initial_input_info
 
std::vector< uint32_t > _input_info
 
std::multimap< int64_t, std::unique_ptr< Job >, std::greater< int64_t > > _ready_jobs
 A collection of jobs that are ready for execution. Jobs in it are ready to be scheduled, ordered by priority from _indexed_ranks.
 
std::unordered_map< uint32_t, ir::OperationIndex > _job_to_op
 Which job runs which op and function.
 
- Protected Attributes inherited from onert::exec::ExecutorBase
ExecObservers _observers
 
std::shared_ptr< ir::OperationIndexMap< int64_t > > _indexed_ranks
 
std::unique_ptr< compiler::LoweredGraph > _lowered_graph
 
backend::BackendContexts _backend_contexts
 
const ir::Graph & _graph
 
std::vector< backend::builtin::IOTensor * > _input_tensors
 
std::vector< backend::builtin::IOTensor * > _output_tensors
 
std::mutex _mutex
 
const util::TracingCtx * _tracing_ctx
 
ExecutionOptions _current_options
 

Detailed Description

A dataflow-style executor. Each operation of the lowered graph is wrapped in a Job; a job becomes ready once every job producing one of its inputs has finished, and ready jobs are scheduled in descending order of the rank taken from _indexed_ranks.

Definition at line 37 of file DataflowExecutor.h.

Constructor & Destructor Documentation

◆ DataflowExecutor()

onert::exec::DataflowExecutor::DataflowExecutor ( std::unique_ptr< compiler::LoweredGraph >  lowered_graph,
 backend::BackendContexts &&  backend_contexts,
 const compiler::TensorRegistries &  tensor_regs,
 compiler::CodeMap &&  code_map,
 const util::TracingCtx *  tracing_ctx 
)

Constructs a DataflowExecutor object.

Parameters
 lowered_graph  LoweredGraph object
 tensor_regs  Tensor registries that are currently used
 code_map  ir::Operation and its code map

Definition at line 79 of file DataflowExecutor.cc.

  : ExecutorBase{std::move(lowered_graph), std::move(backend_contexts), tensor_regs, tracing_ctx},
    _code_map{std::move(code_map)}
{
  VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;

  // Assign jobs: convert OperationIndex to job index (uint32_t)
  uint32_t next_job_index = 0;
  std::unordered_map<ir::OperationIndex, uint32_t> op_to_job;
  const auto &operations = _lowered_graph->graph().operations();
  operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &) {
    VERBOSE(DataflowExecutor) << "Create a job " << next_job_index << " with Operation " << op_ind
                              << std::endl;
    _finished_jobs.emplace_back(
      std::make_unique<Job>(next_job_index, _code_map.at(op_ind).fn_seq.get()));
    op_to_job[op_ind] = next_job_index++;
  });

  _waiting_jobs.resize(next_job_index);
  _output_info.resize(next_job_index);
  _initial_input_info.resize(next_job_index, 0);

  operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &op) {
    auto job_index = op_to_job[op_ind];
    for (auto &&output : op.getOutputs())
    {
      // Update output and input info
      operations.iterate([&](const ir::OperationIndex &op_cur_ind, const ir::IOperation &op_cur) {
        if (op_cur.getInputs().contains(output))
        {
          auto dep_index = op_to_job[op_cur_ind];
          ++_initial_input_info[dep_index];
          _output_info[job_index].push_back(dep_index);
        }
      });
    }
  });
  for (const auto &[op_ind, job_ind] : op_to_job)
    _job_to_op.emplace(job_ind, op_ind);

  _input_info = _initial_input_info;
}

References _code_map, _finished_jobs, _initial_input_info, _input_info, _job_to_op, onert::exec::ExecutorBase::_lowered_graph, _output_info, _waiting_jobs, onert::ir::OperandIndexSequence::contains(), onert::ir::IOperation::getInputs(), onert::ir::IOperation::getOutputs(), and VERBOSE.
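To make the two iterate passes above concrete, here is a minimal, self-contained sketch (toy jobs and hypothetical names, not the onert API) of the same bookkeeping: each producer records which jobs consume its outputs, and each consumer's initial input count is the number of producers it waits on.

// Minimal sketch (hypothetical names, not the onert API) of the
// dependency bookkeeping the constructor builds.
#include <cstdint>
#include <iostream>
#include <list>
#include <vector>

int main()
{
  // Toy graph: jobs 0 and 1 both feed job 2; job 2 feeds job 3.
  // consumers[p] lists the jobs that consume an output of job p,
  // playing the role of _output_info.
  std::vector<std::list<uint32_t>> consumers = {{2}, {2}, {3}, {}};

  // initial_input_info[j] counts how many producer jobs must finish
  // before job j becomes ready, playing the role of _initial_input_info.
  std::vector<uint32_t> initial_input_info(consumers.size(), 0);
  for (const auto &deps : consumers)
    for (auto dep : deps)
      ++initial_input_info[dep];

  for (uint32_t j = 0; j < initial_input_info.size(); ++j)
    std::cout << "job " << j << " waits on " << initial_input_info[j]
              << " producer(s)\n"; // jobs 0 and 1 start at 0, job 2 at 2, job 3 at 1
}

In the real constructor the consumer relation is discovered by checking, for every output operand, which other operations list that operand among their inputs.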

Member Function Documentation

◆ calculateRank()

int64_t onert::exec::DataflowExecutor::calculateRank ( const std::vector< ir::OperationIndex > &  operations)
protected

Definition at line 28 of file DataflowExecutor.cc.

{
  int64_t rank = 0;
  if (!_indexed_ranks)
  {
    return rank;
  }
  for (const auto &operation_idx : operations)
  {
    auto it = _indexed_ranks->find(operation_idx);
    if (it == _indexed_ranks->end())
    {
      assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
             operations.size() == 1);
      // run Permute ASAP for next operations to be ready for other backends
      return std::numeric_limits<int64_t>::max();
    }
    else
    {
      rank += it->second;
    }
  }
  return rank;
}

References onert::exec::ExecutorBase::_graph, onert::exec::ExecutorBase::_indexed_ranks, onert::util::ObjectManager< Index, Object >::at(), and onert::ir::Graph::operations().

Referenced by emplaceToReadyJobs().
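As a rough illustration of the ranking rule (toy data and a hypothetical calc_rank helper, not onert types): the rank of a job is the sum of the ranks of its operations, and an operation with no rank entry is promoted to the highest possible priority, mirroring the Permute special case.

// Toy illustration (hypothetical calc_rank, not onert) of the rank rule.
#include <cstdint>
#include <iostream>
#include <limits>
#include <unordered_map>
#include <vector>

// Sum the ranks of the given operations; an unranked operation makes the
// whole job highest priority, like the Permute case above.
int64_t calc_rank(const std::unordered_map<uint32_t, int64_t> &indexed_ranks,
                  const std::vector<uint32_t> &operations)
{
  int64_t rank = 0;
  for (auto op : operations)
  {
    auto it = indexed_ranks.find(op);
    if (it == indexed_ranks.end())
      return std::numeric_limits<int64_t>::max();
    rank += it->second;
  }
  return rank;
}

int main()
{
  std::unordered_map<uint32_t, int64_t> ranks = {{0, 10}, {1, 3}};
  std::cout << calc_rank(ranks, {0, 1}) << "\n"; // 13
  std::cout << calc_rank(ranks, {2}) << "\n";    // INT64_MAX (unranked -> run ASAP)
}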

◆ emplaceToReadyJobs()

void onert::exec::DataflowExecutor::emplaceToReadyJobs ( const uint32_t &  id)
protected

Definition at line 53 of file DataflowExecutor.cc.

{
  auto &job = _waiting_jobs[id];
  assert(job != nullptr);
  auto rank = calculateRank({_job_to_op[job->index()]});
  _ready_jobs.emplace(rank, std::move(job));
}

References _job_to_op, _ready_jobs, _waiting_jobs, and calculateRank().

Referenced by executeImpl(), onert::exec::ParallelExecutor::executeImpl(), and notify().
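Because _ready_jobs is a std::multimap keyed with std::greater<int64_t>, begin() always refers to the ready job with the highest rank, so executeImpl() pops jobs in priority order; equal ranks coexist because a multimap allows duplicate keys. A small sketch with toy job names (not onert types):

// Sketch of the ready-job ordering (toy job names, not onert types).
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

int main()
{
  // Keyed by rank, ordered descending: highest rank first.
  std::multimap<int64_t, std::string, std::greater<int64_t>> ready_jobs;
  ready_jobs.emplace(3, "conv");
  ready_jobs.emplace(7, "matmul");
  ready_jobs.emplace(7, "add"); // duplicate ranks are allowed

  while (!ready_jobs.empty())
  {
    auto it = ready_jobs.begin(); // highest remaining rank
    std::cout << it->second << " (rank " << it->first << ")\n";
    ready_jobs.erase(it);
  }
  // Prints: matmul (7), add (7), conv (3)
}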

◆ executeImpl()

void onert::exec::DataflowExecutor::executeImpl ( const ExecutionObservee subject)
overridevirtual

Implements onert::exec::ExecutorBase.

Reimplemented in onert::exec::ParallelExecutor.

Definition at line 126 of file DataflowExecutor.cc.

{
  assert(noWaitingJobs());

  bool dynamic_input_exists = hasDynamicInput();

  // Execution setup
  _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs

  for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
  {
    if (_input_info[i] == 0)
    {
      emplaceToReadyJobs(i);
    }
  }
  assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs

  auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);

  subject.notifySubgraphBegin(profiling_subg_index);

  while (!_ready_jobs.empty())
  {
    auto job = std::move((_ready_jobs.begin())->second);
    _ready_jobs.erase(_ready_jobs.begin());
    auto job_index = job->index();
    VERBOSE(DataflowExecutor) << "Run job " << job_index << std::endl;

    auto op_ind = _job_to_op[job_index];
    const auto backend = _lowered_graph->lower_info().operation.at(op_ind);

    subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);

    job->fn_seq()->initRunning();

    // check if FunctionSequence needs to handle dynamic tensor
    bool handle_dynamic_tensor =
      _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
    job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);

    job->run();

    subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
    notify(job_index);
    _finished_jobs[job_index] = std::move(job);
  }
  assert(noWaitingJobs());

  subject.notifySubgraphEnd(profiling_subg_index);

  // Reset input info for the next execution
  _input_info = _initial_input_info;
}

References _finished_jobs, onert::exec::ExecutorBase::_graph, _initial_input_info, _input_info, _job_to_op, onert::exec::ExecutorBase::_lowered_graph, _ready_jobs, onert::exec::ExecutorBase::_tracing_ctx, _waiting_jobs, emplaceToReadyJobs(), onert::util::TracingCtx::getSubgraphIndex(), onert::exec::ExecutorBase::hasDynamicInput(), notify(), onert::exec::ExecutionObservee::notifyJobBegin(), onert::exec::ExecutionObservee::notifyJobEnd(), onert::exec::ExecutionObservee::notifySubgraphBegin(), onert::exec::ExecutionObservee::notifySubgraphEnd(), noWaitingJobs(), and VERBOSE.
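Putting the pieces together, the loop above is dependency-counter driven scheduling: seed the ready set with jobs whose input count is zero, always run the highest-rank ready job, and after each job decrement the counters of its consumers (the role of notify()). A condensed, self-contained sketch with toy data and none of the observer or dynamic-shape handling:

// Condensed sketch of dependency-counter scheduling (toy data, not onert).
#include <cstdint>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <vector>

int main()
{
  // Toy dependency graph: jobs 0 and 1 feed job 2; job 2 feeds job 3.
  std::vector<std::list<uint32_t>> output_info = {{2}, {2}, {3}, {}};
  std::vector<uint32_t> input_info = {0, 0, 2, 1};
  std::vector<int64_t> rank = {5, 9, 4, 1}; // pretend scheduler ranks

  std::multimap<int64_t, uint32_t, std::greater<int64_t>> ready_jobs;

  // Seed with jobs that have no unfinished producers (input count == 0).
  for (uint32_t i = 0; i < input_info.size(); ++i)
    if (input_info[i] == 0)
      ready_jobs.emplace(rank[i], i);

  while (!ready_jobs.empty())
  {
    auto [r, job] = *ready_jobs.begin(); // highest-rank ready job
    ready_jobs.erase(ready_jobs.begin());
    std::cout << "run job " << job << " (rank " << r << ")\n";

    // The notify() step: a consumer becomes ready once its last producer finishes.
    for (auto consumer : output_info[job])
      if (--input_info[consumer] == 0)
        ready_jobs.emplace(rank[consumer], consumer);
  }
  // Prints jobs in the order 1, 0, 2, 3.
}

The printed order shows that rank only breaks ties among jobs that are ready at the same time; dependency order still dominates.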

◆ notify()

void onert::exec::DataflowExecutor::notify ( uint32_t  finished_job_id)
protectedvirtual

Reimplemented in onert::exec::ParallelExecutor.

Definition at line 61 of file DataflowExecutor.cc.

{
  for (auto &&id : _output_info[finished_job_id])
  {
    assert(_input_info[id] > 0);
    auto count = --_input_info[id];
    if (count == 0) // No unfinished producer jobs left, ready for execution
    {
      emplaceToReadyJobs(id);
    }
  }
}

References _input_info, _output_info, and emplaceToReadyJobs().

Referenced by executeImpl(), and onert::exec::ParallelExecutor::notify().

◆ noWaitingJobs()

bool onert::exec::DataflowExecutor::noWaitingJobs ( )
protected

Definition at line 73 of file DataflowExecutor.cc.

{
  return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
                     [](const std::unique_ptr<Job> &job) { return job == nullptr; });
}

References _waiting_jobs.

Referenced by executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

Field Documentation

◆ _code_map

compiler::CodeMap onert::exec::DataflowExecutor::_code_map
protected

Definition at line 64 of file DataflowExecutor.h.

Referenced by DataflowExecutor().

◆ _finished_jobs

std::vector<std::unique_ptr<Job> > onert::exec::DataflowExecutor::_finished_jobs
protected

A vector of finished jobs for the current execution. After a run it holds all the jobs of this execution, ready for the next run.

Definition at line 69 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _initial_input_info

std::vector<uint32_t> onert::exec::DataflowExecutor::_initial_input_info
protected

◆ _input_info

std::vector<uint32_t> onert::exec::DataflowExecutor::_input_info
protected

◆ _job_to_op

std::unordered_map<uint32_t, ir::OperationIndex> onert::exec::DataflowExecutor::_job_to_op
protected

Which job runs which op and function.

Definition at line 90 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), emplaceToReadyJobs(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _output_info

std::vector<std::list<uint32_t> > onert::exec::DataflowExecutor::_output_info
protected

Jobs' output info. Used to notify dependent jobs after a job finishes.

Definition at line 79 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), and notify().

◆ _ready_jobs

std::multimap<int64_t, std::unique_ptr<Job>, std::greater<int64_t> > onert::exec::DataflowExecutor::_ready_jobs
protected

A collection of jobs that are ready for execution. Jobs in it are ready to be scheduled, ordered by priority from _indexed_ranks.

Definition at line 87 of file DataflowExecutor.h.

Referenced by emplaceToReadyJobs(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _waiting_jobs

std::vector<std::unique_ptr<Job> > onert::exec::DataflowExecutor::_waiting_jobs
protected

A vector of waiting jobs for the current execution. All jobs are moved here from _finished_jobs when a run starts.

Definition at line 74 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), emplaceToReadyJobs(), executeImpl(), onert::exec::ParallelExecutor::executeImpl(), and noWaitingJobs().


The documentation for this class was generated from the following files:
DataflowExecutor.h
DataflowExecutor.cc