repype.pipeline
- class repype.pipeline.Pipeline(stages: Iterable[Stage] = [], scopes: Dict[str, Path] = {})
Bases: object
Defines a processing pipeline.
This class defines a processing pipeline that consists of multiple stages. Each stage performs a specific operation on the fields of the pipeline (i.e., the input data, or the data computed by a previous stage). The pipeline processes the input data by executing the stage.process() method of each stage successively.
In addition, the pipeline can be configured to use different scopes for resolving file paths using the resolve() method (e.g., logs, debug information, or results to be written).
- append(stage: Stage, after: str | int | None = None) int
Adds a stage to the pipeline.
By default, the stage is appended to the end of the pipeline. If after is given, the stage is instead inserted after the stage with the given ID or index.
- Returns:
The index of the added stage.
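The insertion rule can be sketched with a plain list. This is a minimal stand-in and not repype's implementation: the helper `append_stage` and the use of bare stage IDs in place of Stage objects are assumptions made for illustration.

```python
from typing import List, Optional, Union


def append_stage(stages: List[str], stage: str,
                 after: Optional[Union[str, int]] = None) -> int:
    """Insert `stage` into `stages` and return its index (hypothetical helper)."""
    if after is None:
        # Default: append to the end of the pipeline.
        stages.append(stage)
        return len(stages) - 1
    # `after` is either a stage ID (str) or a position (int).
    after_idx = stages.index(after) if isinstance(after, str) else after
    stages.insert(after_idx + 1, stage)
    return after_idx + 1


stages = ["load", "segment"]
idx_end = append_stage(stages, "export")                 # appended at the end
idx_mid = append_stage(stages, "denoise", after="load")  # inserted after "load"
```

In this sketch the returned index always refers to the position of the new stage after insertion.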
- configure(base_config: Config, input_id: InputID, *args, **kwargs) Config
Automatically adapts hyperparameters by applying linear adaptation rules.
The hyperparameters are configured by the create_config_entry() function, and the arguments (rules) are determined by calling stage.configure() for each stage.
- Parameters:
base_config – The base hyperparameters to be used (not modified).
input_id – The identifier of the input data to adapt the hyperparameters for.
*args – Sequential arguments passed to stage.configure().
**kwargs – Keyword arguments passed to stage.configure().
- Returns:
The adapted hyperparameters.
- find(stage_id: str, not_found_dummy: Any = inf) Stage
Returns the position of the stage identified by stage_id.
Returns not_found_dummy if the stage is not found.
- get_extra_stages(first_stage: str | None, last_stage: str | None, available_inputs: Iterable[str]) List[str]
Returns the additional stages that must be executed in order to process the pipeline from first_stage to last_stage.
- Parameters:
first_stage – The ID of the first stage to be executed (or None to start with the first).
last_stage – The ID of the last stage to be executed (or None to end with the last).
available_inputs – The stage inputs (pipeline fields) that are already available (e.g., from previous computations).
- Returns:
The IDs of the additional stages that must be executed in order to process the pipeline from first_stage to last_stage.
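One way to picture this dependency resolution is the sketch below. It is an illustration under stated assumptions, not the library's code: `StageSpec` is a hypothetical stand-in that only records a stage ID and the fields it reads and writes, and `extra_stages` is a hypothetical free function.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Sequence, Tuple


@dataclass(frozen=True)
class StageSpec:
    """Hypothetical stand-in for a stage: an ID plus the fields it reads and writes."""
    id: str
    inputs: Tuple[str, ...] = ()
    outputs: Tuple[str, ...] = ()


def extra_stages(stages: Sequence[StageSpec], first_stage: Optional[str],
                 last_stage: Optional[str],
                 available_inputs: Iterable[str]) -> List[str]:
    ids = [s.id for s in stages]
    lo = 0 if first_stage is None else ids.index(first_stage)
    hi = len(stages) - 1 if last_stage is None else ids.index(last_stage)
    selected = stages[lo:hi + 1]

    # Map each field to the stage that produces it.
    producer = {field: s for s in stages for field in s.outputs}
    required = {f for s in selected for f in s.inputs}
    available = set(available_inputs) | {f for s in selected for f in s.outputs}

    extra: List[str] = []
    while required - available:
        # Pick a missing field deterministically and schedule its producer.
        field = sorted(required - available)[0]
        stage = producer[field]
        required |= set(stage.inputs)
        available |= set(stage.outputs)
        extra.append(stage.id)
    return extra


stages = [
    StageSpec("load", (), ("image",)),
    StageSpec("denoise", ("image",), ("clean",)),
    StageSpec("segment", ("clean",), ("mask",)),
    StageSpec("measure", ("mask", "image"), ("stats",)),
]
needed = extra_stages(stages, "segment", None, available_inputs=[])
needed_with_cache = extra_stages(stages, "segment", None, available_inputs=["image"])
```

In this sketch, supplying more fields through available_inputs reduces the number of earlier stages that have to be re-run.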
- property persistent_fields: FrozenSet[str]
Lists all fields that are produced by the pipeline, minus those which are consumed.
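As a rough sketch, this is a set difference. The representation of a stage as an `(outputs, consumes)` pair is an assumption made only for this example, as is the helper name `persistent_fields_sketch`.

```python
from typing import FrozenSet, Iterable, Tuple


def persistent_fields_sketch(
    stages: Iterable[Tuple[Iterable[str], Iterable[str]]]
) -> FrozenSet[str]:
    """Fields produced by any stage, minus fields consumed by any stage."""
    produced, consumed = set(), set()
    for outputs, consumes in stages:
        produced |= set(outputs)
        consumed |= set(consumes)
    return frozenset(produced - consumed)


fields = persistent_fields_sketch([
    (["image"], []),        # first stage produces "image"
    (["mask"], ["image"]),  # second stage produces "mask" and consumes "image"
])
```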
- process(input_id: InputID | None, config: Config, first_stage: str | None = None, last_stage: str | None = None, data: Dict[str, Any] | None = None, status: Status | None = None, **kwargs) Tuple[Dict[str, Any], Config, Dict[str, float]]
Processes the input data identified by the input_id using this pipeline.
The stage.process() method of each stage of the pipeline is executed successively.
- Parameters:
input_id – The identifier of the input data to be processed. Can be None if and only if data is not None (then the input_id is deduced from data).
config – The hyperparameters to be used.
first_stage – The ID of the first stage to run (defaults to the first). Earlier stages may still be required to run due to pipeline fields consumed by stages, marginal fields, or if data is None.
last_stage – The ID of the last stage to run (defaults to the last).
data – The pipeline data object from previous processing.
status – A status object to report the progress of the computations.
- Returns:
Tuple (data, config, times), where data is the pipeline data object comprising all final and intermediate results, config contains the hyperparameters that were finally used, and times is a dictionary with the run time of each individual pipeline stage (in seconds).
- Raises:
StageError – If an error occurs during the run of a pipeline stage.
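The successive execution, the per-stage timing, and the error wrapping can be sketched as follows. Everything here is a simplified stand-in rather than repype's code: stages are modelled as `(id, callable)` pairs, and `SketchStageError` and `run_stages` are hypothetical names. Configuration handling and the first_stage and last_stage selection are left out.

```python
import time
from typing import Any, Callable, Dict, List, Tuple

StageFunc = Callable[[Dict[str, Any]], Dict[str, Any]]


class SketchStageError(Exception):
    """Hypothetical analogue of StageError, carrying the ID of the failed stage."""

    def __init__(self, stage_id: str):
        super().__init__(stage_id)
        self.stage_id = stage_id


def run_stages(stages: List[Tuple[str, StageFunc]],
               data: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, float]]:
    times: Dict[str, float] = {}
    for stage_id, func in stages:
        t0 = time.perf_counter()
        try:
            # Each stage reads fields from `data` and contributes new fields.
            data.update(func(data))
        except Exception as error:
            raise SketchStageError(stage_id) from error
        times[stage_id] = time.perf_counter() - t0  # run time in seconds
    return data, times


data, times = run_stages(
    [
        ("double", lambda d: {"doubled": d["input"] * 2}),
        ("increment", lambda d: {"result": d["doubled"] + 1}),
    ],
    {"input": 20},
)
```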
- resolve(scope: str, input_id: InputID | None = None) Path | None
Resolves the path of a file based on the given scope and input identifier.
Returns None if the input_id is None, or the scope is not defined.
- class repype.pipeline.ProcessingControl(first_stage: str | None = None, last_stage: str | None = None)
Bases: object
Class used to control the processing of stages in a pipeline.
This class keeps track of the first and last stages of a pipeline, and determines whether a given stage should be processed based on its position in the pipeline.
- Parameters:
first_stage – The first stage of the pipeline. Processing starts from this stage. If None, processing starts from the beginning.
last_stage – The last stage of the pipeline. Processing ends after this stage. If None, processing goes until the end.
- first_stage: str | None
The first stage of the pipeline. Processing starts from this stage. If None, processing starts from the beginning.
- last_stage: str | None
The last stage of the pipeline. Processing ends after this stage. If None, processing goes until the end.
- step(stage: str) bool
Determines whether the given stage should be processed.
If the stage is the first stage of the pipeline, processing starts. If the stage is the last stage of the pipeline, processing ends after this stage. The attribute started is updated accordingly.
- Parameters:
stage – The stage to check.
- Returns:
True if the stage should be processed, False otherwise.
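The described behaviour amounts to a small state machine. The class below is a hypothetical re-implementation written from the description above. In particular, the initial value of `started` when first_stage is None is an assumption.

```python
from typing import Optional


class ControlSketch:
    """Hypothetical re-implementation of the behaviour described for ProcessingControl."""

    def __init__(self, first_stage: Optional[str] = None,
                 last_stage: Optional[str] = None):
        self.first_stage = first_stage
        self.last_stage = last_stage
        # Assumption: without an explicit first stage, processing is active from the start.
        self.started = first_stage is None

    def step(self, stage: str) -> bool:
        if stage == self.first_stage:
            self.started = True   # processing starts with this stage
        do_step = self.started
        if stage == self.last_stage:
            self.started = False  # processing ends after this stage
        return do_step


ctrl = ControlSketch(first_stage="denoise", last_stage="segment")
decisions = [ctrl.step(s) for s in ["load", "denoise", "segment", "measure"]]
```

Here `decisions` marks only "denoise" and "segment" for processing, since the last stage itself is still processed before `started` is reset.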
- exception repype.pipeline.StageError(stage: Stage)
Bases: Exception
An error raised when a stage fails to execute.
- repype.pipeline.create_config_entry(config: Config, key: str, factor: Real, default_user_factor: Real, type: Type[Real] | None = None, min: Real | None = None, max: Real | None = None) None
Creates the hyperparameter key in the config if it does not exist yet.
The value of the hyperparameter is set to factor times the value of the hyperparameter AF_key, where AF_key is the hyperparameter with the same key but prefixed with AF_.
In addition, the value of the hyperparameter is updated according to the following rules:
If type is not None, the value is converted to the specified type.
If min is not None, the value is set to the maximum of the value and min.
If max is not None, the value is set to the minimum of the value and max.
See also
This function is used by the Pipeline.configure() method to automatically configure the hyperparameters of the pipeline.
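The linear rule and the three post-processing steps can be sketched on a plain dictionary. This is not the library's implementation. The helper `config_entry_sketch` is hypothetical, a `dict` stands in for Config, and the parameters `value_type`, `lower`, and `upper` correspond to type, min, and max (renamed here to avoid shadowing Python built-ins). The fallback of AF_key to default_user_factor when AF_key is unset is an assumption, since the text above does not state how default_user_factor is used.

```python
from numbers import Real
from typing import Callable, Dict, Optional


def config_entry_sketch(config: Dict[str, Real], key: str, factor: Real,
                        default_user_factor: Real,
                        value_type: Optional[Callable[[Real], Real]] = None,
                        lower: Optional[Real] = None,
                        upper: Optional[Real] = None) -> None:
    # Assumption: AF_<key> falls back to default_user_factor when it is not set.
    user_factor = config.get(f"AF_{key}", default_user_factor)
    # Create the entry only if it does not exist yet.
    config.setdefault(key, factor * user_factor)
    if value_type is not None:
        config[key] = value_type(config[key])  # convert to the requested type
    if lower is not None:
        config[key] = max(config[key], lower)  # enforce the lower bound
    if upper is not None:
        config[key] = min(config[key], upper)  # enforce the upper bound


config = {"AF_radius": 0.5}
config_entry_sketch(config, "radius", factor=30, default_user_factor=1,
                    value_type=int, lower=3, upper=10)
```

In this example the raw value is 30 times 0.5, which is converted to an integer and then clamped to the upper bound.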
- repype.pipeline.create_pipeline(stages: Sequence[Stage], *args, pipeline_cls: Type[Pipeline] = Pipeline, **kwargs) Pipeline
Creates and returns a new pipeline configured for the given stages.
- Parameters:
stages – The stages of the pipeline; their order is determined automatically.
pipeline_cls – The class to be used for the pipeline.
- Returns:
Object of the pipeline_cls class.
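How an order could be derived automatically is not specified here. One plausible approach, shown purely as an assumption, is a topological sort in which every stage comes after the stages that produce its inputs. The helper `order_stages` and the `(id, inputs, outputs)` triples are hypothetical. The sketch uses `graphlib` from the Python standard library (Python 3.9 or later).

```python
from graphlib import TopologicalSorter
from typing import List, Sequence, Tuple


def order_stages(stages: Sequence[Tuple[str, Sequence[str], Sequence[str]]]) -> List[str]:
    """Order (id, inputs, outputs) triples so that producers precede consumers."""
    # Map each field to the ID of the stage that produces it.
    producer = {field: stage_id for stage_id, _, outputs in stages for field in outputs}
    # For each stage, collect the stages it depends on (its predecessors).
    graph = {
        stage_id: {producer[field] for field in inputs if field in producer}
        for stage_id, inputs, _ in stages
    }
    return list(TopologicalSorter(graph).static_order())


ordered = order_stages([
    ("segment", ["clean"], ["mask"]),
    ("load", [], ["image"]),
    ("denoise", ["image"], ["clean"]),
])
```

With a dependency chain like this one the resulting order is unique. When several stages are independent of each other, a topological sort may return any valid order.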