ONE - On-device Neural Engine
Loading...
Searching...
No Matches
WorkflowRunner.WorkflowRunner Class Reference

Public Member Functions

 __init__ (self, path)
 
 run (self, working_dir, verbose=False)
 

Data Fields

 json_contents
 
 adj
 
 workflow_sequence
 

Static Public Attributes

str WORKFLOWS_K = 'workflows'
 
str DEPENDENCIES_K = 'run-after'
 
str CFG_REFERENCE_K = 'cfg-reference'
 
str WORKFLOW_STEPS_K = 'steps'
 
str ONE_CMD_TOOL_K = 'one-cmd'
 
str COMMANDS_K = 'commands'
 

Protected Member Functions

 _check_cycle (self)
 
 _verify_workflow (self, json_contents)
 

Detailed Description

Definition at line 26 of file WorkflowRunner.py.

Constructor & Destructor Documentation

◆ __init__()

WorkflowRunner.WorkflowRunner.__init__ (   self,
  path 
)

Definition at line 34 of file WorkflowRunner.py.

34 def __init__(self, path):
35 try:
36 with open(path) as f:
37 self.json_contents = json.load(f)
38 except FileNotFoundError:
39 raise FileNotFoundError("Not found given workflow file")
40 except json.decoder.JSONDecodeError:
41 raise ImportError("Invalid workflow file")
42
43 self._verify_workflow(self.json_contents)
44
45 workflows = self.json_contents[self.WORKFLOWS_K]
46 self.adj = dict.fromkeys(workflows, [])
47 # decide the order according to the dependencies of each workflow.
48 helper = TopologicalSortHelper(workflows)
49 for workflow_k in workflows:
50 workflow = self.json_contents[workflow_k]
51 if self.DEPENDENCIES_K in workflow:
52 for previous_workflow in workflow[self.DEPENDENCIES_K]:
53 helper.add_edge(previous_workflow, workflow_k)
54 self.adj[previous_workflow].append(workflow_k)
55 self.workflow_sequence = helper.sort()
56
57 self._check_cycle()
58

Member Function Documentation

◆ _check_cycle()

WorkflowRunner.WorkflowRunner._check_cycle (   self)
protected

Definition at line 59 of file WorkflowRunner.py.

59 def _check_cycle(self):
60 pos = dict()
61 index = 0
62 workflow_num = len(self.workflow_sequence)
63 # number the order
64 for seq_idx in range(workflow_num):
65 pos[self.workflow_sequence[seq_idx]] = index
66 index += 1
67
68 for seq_idx in range(workflow_num):
69 first_wf = self.workflow_sequence[seq_idx]
70 for adj_wf in self.adj[first_wf]:
71 first_pos = 0 if first_wf not in pos else pos[first_wf]
72 second_pos = 0 if adj_wf not in pos else pos[adj_wf]
73 if (first_pos > second_pos):
74 raise RuntimeError("Workflows should not have a cycle")
75

References WorkflowRunner.WorkflowRunner.adj, and WorkflowRunner.WorkflowRunner.workflow_sequence.

◆ _verify_workflow()

WorkflowRunner.WorkflowRunner._verify_workflow (   self,
  json_contents 
)
protected

Definition at line 76 of file WorkflowRunner.py.

76 def _verify_workflow(self, json_contents):
77 # workflow file should have WORKFLOWS_K
78 if not self.WORKFLOWS_K in json_contents:
79 raise ValueError("Not found \"" + self.WORKFLOWS_K +
80 "\" key in workflow file")
81
82 workflows = json_contents[self.WORKFLOWS_K]
83 # workflow file should have keys listed in WORKFLOWS_K
84 for workflow_k in workflows:
85 if not workflow_k in json_contents:
86 raise ValueError("Not found " + workflow_k + " key listed in \"" +
87 self.WORKFLOWS_K + "\"")
88
89 # each workflow should have either WORKFLOW_STEPS_K or CFG_REFERENCE_K
90 for workflow_k in workflows:
91 if not self.WORKFLOW_STEPS_K in json_contents[
92 workflow_k] and not self.CFG_REFERENCE_K in json_contents[workflow_k]:
93 raise ValueError("Each workflow should have either \"" +
94 self.WORKFLOW_STEPS_K + "\" or \"" +
95 self.CFG_REFERENCE_K + "\"")
96 for workflow_k in workflows:
97 if self.WORKFLOW_STEPS_K in json_contents[
98 workflow_k] and self.CFG_REFERENCE_K in json_contents[workflow_k]:
99 raise ValueError("\"" + self.WORKFLOW_STEPS_K + "\" and \"" +
100 self.CFG_REFERENCE_K + "\" are exclusive key")
101
102 # each step should have ONE_CMD_TOOL_K and COMMANDS_K
103 for workflow_k in workflows:
104 workflow = json_contents[workflow_k]
105 if self.WORKFLOW_STEPS_K in workflow:
106 step_keys = workflow[self.WORKFLOW_STEPS_K]
107 for step_k in step_keys:
108 step = workflow[step_k]
109 if not self.ONE_CMD_TOOL_K in step or not self.COMMANDS_K in step:
110 raise ValueError("Each step should have \"" +
111 self.ONE_CMD_TOOL_K + "\"" + " and \"" +
112 self.COMMANDS_K + "\"")
113

References WorkflowRunner.WorkflowRunner.CFG_REFERENCE_K, WorkflowRunner.WorkflowRunner.COMMANDS_K, WorkflowRunner.WorkflowRunner.ONE_CMD_TOOL_K, WorkflowRunner.WorkflowRunner.WORKFLOW_STEPS_K, and WorkflowRunner.WorkflowRunner.WORKFLOWS_K.

◆ run()

WorkflowRunner.WorkflowRunner.run (   self,
  working_dir,
  verbose = False 
)

Definition at line 114 of file WorkflowRunner.py.

114 def run(self, working_dir, verbose=False):
115 # run workflows in sequence
116 for workflow_k in self.workflow_sequence:
117 workflow = self.json_contents[workflow_k]
118 if self.WORKFLOW_STEPS_K in workflow:
119 steps = workflow[self.WORKFLOW_STEPS_K]
120 for step_k in steps:
121 step = workflow[step_k]
122 commands = step[self.COMMANDS_K]
123 driver_name = step[self.ONE_CMD_TOOL_K]
124 option_builder = OptionBuilder(driver_name)
125 options = option_builder.build(commands)
126 # get the absolute path of the caller
127 driver_path = os.path.join(working_dir, driver_name)
128 cmd = [driver_path] + options
129 oneutils.run(cmd)
130 elif self.CFG_REFERENCE_K in workflow:
131 cfg_path = workflow[self.CFG_REFERENCE_K]['path']
132 runner = CfgRunner(cfg_path)
133 runner.run(working_dir, verbose)
void run(std::ofstream &os, const circle::Model *model)

References WorkflowRunner.WorkflowRunner.CFG_REFERENCE_K, WorkflowRunner.WorkflowRunner.COMMANDS_K, WorkflowRunner.WorkflowRunner.json_contents, WorkflowRunner.WorkflowRunner.ONE_CMD_TOOL_K, WorkflowRunner.WorkflowRunner.workflow_sequence, and WorkflowRunner.WorkflowRunner.WORKFLOW_STEPS_K.

Field Documentation

◆ adj

WorkflowRunner.WorkflowRunner.adj

Definition at line 46 of file WorkflowRunner.py.

Referenced by WorkflowRunner.WorkflowRunner._check_cycle().

◆ CFG_REFERENCE_K

str WorkflowRunner.WorkflowRunner.CFG_REFERENCE_K = 'cfg-reference'
static

◆ COMMANDS_K

str WorkflowRunner.WorkflowRunner.COMMANDS_K = 'commands'
static

◆ DEPENDENCIES_K

str WorkflowRunner.WorkflowRunner.DEPENDENCIES_K = 'run-after'
static

Definition at line 28 of file WorkflowRunner.py.

◆ json_contents

WorkflowRunner.WorkflowRunner.json_contents

Definition at line 37 of file WorkflowRunner.py.

Referenced by WorkflowRunner.WorkflowRunner.run().

◆ ONE_CMD_TOOL_K

str WorkflowRunner.WorkflowRunner.ONE_CMD_TOOL_K = 'one-cmd'
static

◆ workflow_sequence

WorkflowRunner.WorkflowRunner.workflow_sequence

◆ WORKFLOW_STEPS_K

str WorkflowRunner.WorkflowRunner.WORKFLOW_STEPS_K = 'steps'
static

◆ WORKFLOWS_K

str WorkflowRunner.WorkflowRunner.WORKFLOWS_K = 'workflows'
static

Definition at line 27 of file WorkflowRunner.py.

Referenced by WorkflowRunner.WorkflowRunner._verify_workflow().


The documentation for this class was generated from the following file: