Pipeline Stages¶
Overview¶
A PipelineStage implements a single calculation step within a wider pipeline. Each type of analysis stage is represented by a subclass of PipelineStage.
The base class handles the connections between different pipeline stages, and the execution of the stages within a workflow system (Parsl or CWL), potentially in parallel (with MPI).
Subclasses must:
- define their name
- define their inputs and outputs
- provide a “run” method which does the actual execution of the pipeline step.
They must use base class methods within the run method to find their input and output file paths. They can optionally use further methods in this class to open and prepare those files too.
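For example, a minimal stage might look like the sketch below. The stage name, tags, and HDFFile type are illustrative: file-type classes are normally defined by your own pipeline package, and a bare placeholder stands in for one here.

from ceci import PipelineStage

# Placeholder file type for this sketch; real pipelines define proper
# data-type classes for each kind of file they read and write.
class HDFFile:
    suffix = 'hdf5'

class ShearCuts(PipelineStage):
    name = 'ShearCuts'
    inputs = [('shear_catalog', HDFFile)]
    outputs = [('cut_catalog', HDFFile)]

    def run(self):
        # Base class methods map tags to the actual file paths
        input_path = self.get_input('shear_catalog')
        output_path = self.get_output('cut_catalog')
        # ... read the input, apply the cuts, write the output ...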
Inputs/Outputs and Tags¶
The I/O system for Ceci uses the concept of “tags”. A tag is a string which corresponds to a single input or output file. Using it allows us to easily connect together pipeline stages by matching output tags from earlier stages to input tags for later ones. Tags must be unique across a pipeline.
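For instance, if one stage declares a tag among its outputs and a later stage declares the same tag among its inputs, the pipeline connects the two automatically. A sketch, reusing the placeholder HDFFile type from above (stage and tag names are hypothetical):

class TomographyStage(PipelineStage):
    name = 'TomographyStage'
    inputs = [('shear_catalog', HDFFile)]
    outputs = [('tomography_catalog', HDFFile)]

class CorrelationStage(PipelineStage):
    name = 'CorrelationStage'
    # The matching tag wires this stage to TomographyStage's output
    inputs = [('tomography_catalog', HDFFile)]
    outputs = [('correlation_functions', HDFFile)]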
Configuration Parameters¶
Every stage has an additional implicit input: a configuration file in YAML format.
Stage classes can define a dictionary as a class variable listing the parameters they need from that configuration file and their types, or giving default values to use if a parameter is not found in the file.
Here is an example:
class MyStage(PipelineStage):
    ...
    config_options = {
        'T_cut': float,
        's2n_cut': float,
        'delta_gamma': float,
        'max_rows': 0,
        'chunk_rows': 10000,
        'zbin_edges': [float],
    }
Some parameters, like T_cut, have been given just a Python type, indicating that there is no default value for them and the user must specify a value of type “float” in the parameter file. Others, like max_rows, have a default value that will be used if the parameter is not otherwise specified.
The parameter file will automatically be read and the results put into a dictionary that the stage can access via self.config, for example: cut = self.config['T_cut'].
More complicated parameter types, such as dictionaries, can also be used, but they cannot currently be specified in the config_options dictionary, so the system will not automatically check for their presence in the parameter file - you will have to do that yourself.
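As a sketch of what the corresponding parameter file might contain for the example above, with each stage's options listed under the stage's name (the values here are made up):

MyStage:
    T_cut: 0.5
    s2n_cut: 10.0
    delta_gamma: 0.02
    zbin_edges: [0.3, 0.6, 0.9, 1.2]
    # max_rows and chunk_rows are omitted, so their defaults
    # (0 and 10000) will be used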
Parameters can also be overridden when running a stage on its own on the command line (see “Execution”, below) by passing them as flags: --T_cut=0.4
Pipeline Methods¶
The full set of pipeline methods is documented below. Of particular note are the methods described here, which are designed to be used by subclasses.
Return the path to input or output files:
self.get_input(tag)
self.get_output(tag)
Get the base class to find and open an input or output file for you, optionally returning a wrapper class instead of the file:
self.open_input(tag, wrapper=False, **kwargs)
self.open_output(tag, wrapper=False, **kwargs)
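For example, for an HDF5 input the returned object is typically the underlying open file (the group and column names here are hypothetical):

def run(self):
    # Returns e.g. an open h5py.File for HDF5 data; wrapper=True
    # would return the file-type wrapper class instead
    f = self.open_input('shear_catalog')
    ra = f['shear/ra'][:]
    f.close()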
Look for a section tagged “config” in a YAML input file and read it. If the config_options class variable exists in the class, this also checks that those options are set, or applies any supplied defaults.
self.get_config()
MPI attributes for parallelization:
self.rank
self.size
self.comm
If the code is not being run in parallel, comm will be None, rank will be 0, and size will be 1.
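A simple sketch splitting a loop across processes using these attributes (process_item is a hypothetical per-item function):

for i, item in enumerate(items):
    # Round-robin assignment of work; with no MPI this reduces to
    # rank 0 of size 1 handling every item
    if i % self.size == self.rank:
        process_item(item)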
I/O tools for reading data in chunks, split up according to MPI rank if running in parallel:
self.iterate_fits(tag, hdunum, cols, chunk_rows)
self.iterate_hdf(tag, group_name, cols, chunk_rows)
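For example, a sketch assuming hypothetical group and column names, and assuming each iteration yields the start and end row indices of the chunk plus a dictionary mapping column names to arrays:

for start, end, data in self.iterate_hdf('shear_catalog', 'shear', ['g1', 'g2'], 10000):
    # Under MPI the chunks are divided among the ranks
    g1 = data['g1']
    g2 = data['g2']
    # ... process rows start:end ...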
Execution¶
Pipeline stages can be automatically run as part of a pipeline, or manually run on the command line, using the syntax:
python </path/to/pipeline_implementation.py> <StageName> --<input_name1>=</path/to/input1.dat>
--<input_name2>=</path/to/input2.dat> --<output_name1>=</path/to/output1.dat>
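For example, running the hypothetical ShearCuts stage from earlier, supplying the configuration file (an implicit input) and overriding one parameter (all paths here are made up):

python my_pipeline.py ShearCuts --shear_catalog=./shear.hdf5 --config=./config.yml --cut_catalog=./cut.hdf5 --T_cut=0.4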
API¶
The complete pipeline stage API is below - methods not described above are mostly used internally by the pipeline system.