ONE - On-device Neural Engine
onert::exec::DataflowExecutor Class Reference

#include <DataflowExecutor.h>

Collaboration diagram for onert::exec::DataflowExecutor:

Public Member Functions

 DataflowExecutor (std::unique_ptr< compiler::LoweredGraph > lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, compiler::CodeMap &&code_map, const util::TracingCtx *tracing_ctx)
 Constructs a DataflowExecutor object.
 
void executeImpl (const ExecutionObservee &subject) override
 
- Public Member Functions inherited from onert::exec::ExecutorBase
 ExecutorBase (std::unique_ptr< compiler::LoweredGraph > &&lowered_graph, backend::BackendContexts &&backend_contexts, const compiler::TensorRegistries &tensor_regs, const util::TracingCtx *tracing_ctx)
 Construct a new ExecutorBase object.
 
virtual ~ExecutorBase ()=default
 
const ir::Graph & graph () const final
 Returns graph object.
 
void execute (const std::vector< backend::IPortableTensor * > &inputs, const std::vector< backend::IPortableTensor * > &outputs, const ExecutionOptions &options) override
 Execute with given input/output tensors.
 
uint32_t inputSize () const override
 Get input size.
 
uint32_t outputSize () const override
 Get output size.
 
const ir::OperandInfo & inputInfo (uint32_t index) const override
 Get input info at index.
 
const ir::OperandInfo & outputInfo (uint32_t index) const override
 Get output info at index.
 
ir::Layout inputLayout (uint32_t index) const override
 Get input layout at index.
 
ir::Layout outputLayout (uint32_t index) const override
 Get output layout at index.
 
void setIndexedRanks (std::shared_ptr< ir::OperationIndexMap< int64_t > > ranks) final
 Set an ordering on operations.
 
void addObserver (std::unique_ptr< IExecutionObserver > ref)
 
backend::BackendContexts & getBackendContexts ()
 
const ExecutionOptions & currentOptions () const override
 Return current execution configuration.
 
- Public Member Functions inherited from onert::exec::IExecutor
 IExecutor ()=default
 Construct a new IExecutor object.
 
virtual ~IExecutor ()=default
 Destroy the IExecutor object.
 

Protected Member Functions

virtual void notify (uint32_t finished_job_id)
 
bool noWaitingJobs ()
 
int64_t calculateRank (const std::vector< ir::OperationIndex > &operations)
 
void emplaceToReadyJobs (const uint32_t &id)
 
- Protected Member Functions inherited from onert::exec::ExecutorBase
bool hasDynamicInput ()
 Returns true if any input tensor is dynamic; false if all are static tensors.
 

Protected Attributes

compiler::CodeMap _code_map
 
std::vector< std::unique_ptr< Job > > _finished_jobs
 A vector of finished jobs for the current execution. After a run it holds all the jobs of this execution for the next run.
 
std::vector< std::unique_ptr< Job > > _waiting_jobs
 A vector of waiting jobs for the current execution. All the jobs are moved here from _finished_jobs when a run starts.
 
std::vector< std::list< uint32_t > > _output_info
 Jobs' output info. Used for notifying dependent jobs when a job finishes.
 
std::vector< uint32_t > _initial_input_info
 
std::vector< uint32_t > _input_info
 
std::multimap< int64_t, std::unique_ptr< Job >, std::greater< int64_t > > _ready_jobs
 A collection of jobs that are ready for execution. Jobs in it are ready to be scheduled, ordered by priority from _indexed_ranks.
 
std::unordered_map< uint32_t, ir::OperationIndex > _job_to_op
 Maps each job index to the operation (and its function) it runs.
 
- Protected Attributes inherited from onert::exec::ExecutorBase
ExecObservers _observers
 
std::shared_ptr< ir::OperationIndexMap< int64_t > > _indexed_ranks
 
std::unique_ptr< compiler::LoweredGraph > _lowered_graph
 
backend::BackendContexts _backend_contexts
 
const ir::Graph & _graph
 
std::vector< backend::builtin::IOTensor * > _input_tensors
 
std::vector< backend::builtin::IOTensor * > _output_tensors
 
std::mutex _mutex
 
const util::TracingCtx * _tracing_ctx
 
ExecutionOptions _current_options
 

Detailed Description

Definition at line 35 of file DataflowExecutor.h.

Constructor & Destructor Documentation

◆ DataflowExecutor()

onert::exec::DataflowExecutor::DataflowExecutor ( std::unique_ptr< compiler::LoweredGraph >  lowered_graph,
backend::BackendContexts &&  backend_contexts,
const compiler::TensorRegistries &  tensor_regs,
compiler::CodeMap &&  code_map,
const util::TracingCtx *  tracing_ctx 
)

Constructs a DataflowExecutor object.

Parameters
lowered_graph	LoweredGraph object
tensor_regs	Tensor registries that are currently used
code_map	ir::Operation and its code map

Definition at line 77 of file DataflowExecutor.cc.

82 : ExecutorBase{std::move(lowered_graph), std::move(backend_contexts), tensor_regs, tracing_ctx},
83 _code_map{std::move(code_map)}
84{
85 VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
86
87 // Assign jobs convert OperationIndex to job index(uint32_t)
88 uint32_t next_job_index = 0;
89 std::unordered_map<ir::OperationIndex, uint32_t> op_to_job;
90 const auto &operations = _lowered_graph->graph().operations();
91 operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &) {
92 VERBOSE(DataflowExecutor) << "Create a job " << next_job_index << " with Operation " << op_ind
93 << std::endl;
94 _finished_jobs.emplace_back(
95 std::make_unique<Job>(next_job_index, _code_map.at(op_ind).fn_seq.get()));
96 op_to_job[op_ind] = next_job_index++;
97 });
98
99 _waiting_jobs.resize(next_job_index);
100 _output_info.resize(next_job_index);
101 _initial_input_info.resize(next_job_index, 0);
102
103 operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &op) {
104 auto job_index = op_to_job[op_ind];
105 for (auto &&output : op.getOutputs())
106 {
107 // Update output and input info
108 operations.iterate([&](const ir::OperationIndex &op_cur_ind, const ir::IOperation &op_cur) {
109 if (op_cur.getInputs().contains(output))
110 {
111 auto dep_index = op_to_job[op_cur_ind];
112 ++_initial_input_info[dep_index];
113 _output_info[job_index].push_back(dep_index);
114 }
115 });
116 }
117 });
118 for (const auto &[op_ind, job_ind] : op_to_job)
119 _job_to_op.emplace(job_ind, op_ind);
120
121 _input_info = _initial_input_info;
122}

References _code_map, _finished_jobs, _initial_input_info, _input_info, _job_to_op, onert::exec::ExecutorBase::_lowered_graph, _output_info, _waiting_jobs, onert::ir::OperandIndexSequence::contains(), onert::ir::IOperation::getInputs(), onert::ir::IOperation::getOutputs(), and VERBOSE.

Member Function Documentation

◆ calculateRank()

int64_t onert::exec::DataflowExecutor::calculateRank ( const std::vector< ir::OperationIndex > &  operations)
protected

Definition at line 26 of file DataflowExecutor.cc.

27{
28 int64_t rank = 0;
29 if (!_indexed_ranks)
30 {
31 return rank;
32 }
33 for (const auto &operation_idx : operations)
34 {
35 auto it = _indexed_ranks->find(operation_idx);
36 if (it == _indexed_ranks->end())
37 {
38 assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
39 operations.size() == 1);
40 // run Permute ASAP for next operations to be ready for other backends
41 return std::numeric_limits<int64_t>::max();
42 }
43 else
44 {
45 rank += it->second;
46 }
47 }
48 return rank;
49}

References onert::exec::ExecutorBase::_graph, onert::exec::ExecutorBase::_indexed_ranks, onert::util::ObjectManager< Index, Object >::at(), and onert::ir::Graph::operations().

Referenced by emplaceToReadyJobs().

◆ emplaceToReadyJobs()

void onert::exec::DataflowExecutor::emplaceToReadyJobs ( const uint32_t &  id)
protected

Definition at line 51 of file DataflowExecutor.cc.

52{
53 auto &job = _waiting_jobs[id];
54 assert(job != nullptr);
55 auto rank = calculateRank({_job_to_op[job->index()]});
56 _ready_jobs.emplace(rank, std::move(job));
57}

References _job_to_op, _ready_jobs, _waiting_jobs, and calculateRank().

Referenced by executeImpl(), onert::exec::ParallelExecutor::executeImpl(), and notify().

◆ executeImpl()

void onert::exec::DataflowExecutor::executeImpl ( const ExecutionObservee &  subject)
overridevirtual

Implements onert::exec::ExecutorBase.

Reimplemented in onert::exec::ParallelExecutor.

Definition at line 124 of file DataflowExecutor.cc.

125{
126 assert(noWaitingJobs());
127
128 bool dynamic_input_exists = hasDynamicInput();
129
130 // Execution setup
131 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
132
133 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
134 {
135 if (_input_info[i] == 0)
136 {
137 emplaceToReadyJobs(i);
138 }
139 }
140 assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
141
142 auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
143
144 subject.notifySubgraphBegin(profiling_subg_index);
145
146 while (!_ready_jobs.empty())
147 {
148 auto job = std::move((_ready_jobs.begin())->second);
149 _ready_jobs.erase(_ready_jobs.begin());
150 auto job_index = job->index();
151 VERBOSE(DataflowExecutor) << "Run job " << job_index << std::endl;
152
153 auto op_ind = _job_to_op[job_index];
154 const auto backend = _lowered_graph->lower_info().operation.at(op_ind);
155
156 subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);
157
158 job->fn_seq()->initRunning();
159
160 // check if FunctionSequence needs to handle dynamic tensor
161 bool handle_dynamic_tensor =
162 _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
163 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
164
165 job->run();
166
167 subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
168 notify(job_index);
169 _finished_jobs[job_index] = std::move(job);
170 }
171 assert(noWaitingJobs());
172
173 subject.notifySubgraphEnd(profiling_subg_index);
174
175 // Reset input info for the next execution
176 _input_info = _initial_input_info;
177}

References _finished_jobs, onert::exec::ExecutorBase::_graph, _initial_input_info, _input_info, _job_to_op, onert::exec::ExecutorBase::_lowered_graph, _ready_jobs, onert::exec::ExecutorBase::_tracing_ctx, _waiting_jobs, emplaceToReadyJobs(), onert::util::TracingCtx::getSubgraphIndex(), onert::exec::ExecutorBase::hasDynamicInput(), notify(), onert::exec::ExecutionObservee::notifyJobBegin(), onert::exec::ExecutionObservee::notifyJobEnd(), onert::exec::ExecutionObservee::notifySubgraphBegin(), onert::exec::ExecutionObservee::notifySubgraphEnd(), noWaitingJobs(), and VERBOSE.

◆ notify()

void onert::exec::DataflowExecutor::notify ( uint32_t  finished_job_id)
protectedvirtual

Reimplemented in onert::exec::ParallelExecutor.

Definition at line 59 of file DataflowExecutor.cc.

60{
61 for (auto &&id : _output_info[finished_job_id])
62 {
63 assert(_input_info[id] > 0);
64 auto count = --_input_info[id];
65 if (count == 0) // No dependent jobs left, ready for execution
66 {
67 emplaceToReadyJobs(id);
68 }
69 }
70}

References _input_info, _output_info, and emplaceToReadyJobs().

Referenced by executeImpl(), and onert::exec::ParallelExecutor::notify().

◆ noWaitingJobs()

bool onert::exec::DataflowExecutor::noWaitingJobs ( )
protected

Definition at line 71 of file DataflowExecutor.cc.

72{
73 return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
74 [](const std::unique_ptr<Job> &job) { return job == nullptr; });
75}

References _waiting_jobs.

Referenced by executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

Field Documentation

◆ _code_map

compiler::CodeMap onert::exec::DataflowExecutor::_code_map
protected

Definition at line 62 of file DataflowExecutor.h.

Referenced by DataflowExecutor().

◆ _finished_jobs

std::vector<std::unique_ptr<Job> > onert::exec::DataflowExecutor::_finished_jobs
protected

A vector of finished jobs for the current execution. After a run it holds all the jobs of this execution for the next run.

Definition at line 67 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _initial_input_info

std::vector<uint32_t> onert::exec::DataflowExecutor::_initial_input_info
protected

◆ _input_info

std::vector<uint32_t> onert::exec::DataflowExecutor::_input_info
protected

◆ _job_to_op

std::unordered_map<uint32_t, ir::OperationIndex> onert::exec::DataflowExecutor::_job_to_op
protected

Maps each job index to the operation (and its function) it runs.

Definition at line 88 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), emplaceToReadyJobs(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _output_info

std::vector<std::list<uint32_t> > onert::exec::DataflowExecutor::_output_info
protected

Jobs' output info. Used for notifying dependent jobs when a job finishes.

Definition at line 77 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), and notify().

◆ _ready_jobs

std::multimap<int64_t, std::unique_ptr<Job>, std::greater<int64_t> > onert::exec::DataflowExecutor::_ready_jobs
protected

A collection of jobs that are ready for execution. Jobs in it are ready to be scheduled, ordered by priority from _indexed_ranks.

Definition at line 85 of file DataflowExecutor.h.

Referenced by emplaceToReadyJobs(), executeImpl(), and onert::exec::ParallelExecutor::executeImpl().

◆ _waiting_jobs

std::vector<std::unique_ptr<Job> > onert::exec::DataflowExecutor::_waiting_jobs
protected

A vector of waiting jobs for the current execution. All the jobs are moved here from _finished_jobs when a run starts.

Definition at line 72 of file DataflowExecutor.h.

Referenced by DataflowExecutor(), emplaceToReadyJobs(), executeImpl(), onert::exec::ParallelExecutor::executeImpl(), and noWaitingJobs().


The documentation for this class was generated from the following files:
DataflowExecutor.h
DataflowExecutor.cc