repype.pipeline

class repype.pipeline.Pipeline(stages: Iterable[Stage] = [], scopes: Dict[str, Path] = {})

Bases: object

Defines a processing pipeline.

This class defines a processing pipeline that consists of multiple stages. Each stage performs a specific operation on the fields of the pipeline (i.e. the input data, or the data computed by a previous stage). The pipeline processes the input data by executing the stage.process() method of each stage successively.

In addition, the pipeline can be configured with different scopes (e.g., for logs, debug information, or results to be written), which the resolve() method uses to resolve file paths.

append(stage: Stage, after: str | int | None = None) int

Adds a stage to the pipeline.

By default, the stage is appended to the end of the pipeline. If after is given, the stage is instead inserted after the stage with the given ID or index.

Returns:

The index of the added stage.
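The insertion rule can be illustrated on a plain list. This is a minimal sketch of the documented semantics, not repype's implementation: `append_after` is a hypothetical helper, and plain strings stand in for Stage objects and their IDs.

```python
from typing import List, Optional, Union


def append_after(stages: List[str], stage: str,
                 after: Optional[Union[str, int]] = None) -> int:
    """Insert `stage` into `stages` and return its index (hypothetical helper)."""
    if after is None:
        # Default: append to the end of the pipeline.
        stages.append(stage)
        return len(stages) - 1
    # `after` is either a stage ID (str) or a position (int).
    position = stages.index(after) if isinstance(after, str) else after
    stages.insert(position + 1, stage)
    return position + 1


stages = ["load", "segment", "export"]
index_end = append_after(stages, "report")                  # appended at the end
index_mid = append_after(stages, "denoise", after="load")   # placed after the ID "load"
index_pos = append_after(stages, "filter", after=1)         # placed after position 1
```

In each case the returned value is the position at which the new stage ends up.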

configure(base_config: Config, input_id: InputID, *args, **kwargs) Config

Automatically adapts hyperparameters by applying linear adaptation rules.

The hyperparameters are configured by the create_config_entry() function, and the arguments (rules) are determined by calling stage.configure() for each stage.

Parameters:
  • base_config – The base hyperparameters to be used (not modified).

  • input_id – The identifier of the input data to adapt the hyperparameters for.

  • *args – Sequential arguments passed to stage.configure().

  • **kwargs – Keyword arguments passed to stage.configure().

Returns:

The adapted hyperparameters.

property fields: FrozenSet[str]

Lists all fields that are produced by the pipeline.

find(stage_id: str, not_found_dummy: Any = inf) Stage

Returns the position of the stage identified by stage_id.

Returns not_found_dummy if the stage is not found.

get_extra_stages(first_stage: str | None, last_stage: str | None, available_inputs: Iterable[str]) List[str]

Returns the stages that must be executed in addition in order to process the pipeline from first_stage to last_stage.

Parameters:
  • first_stage – The ID of the first stage to be executed (or None to start with the first).

  • last_stage – The ID of the last stage to be executed (or None to end with the last).

  • available_inputs – The stage inputs (pipeline fields) that are already available (e.g., from previous computations).

Returns:

The IDs of the stages that must be executed in addition in order to process the pipeline from first_stage to last_stage.
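One way to picture this is as a backward walk over the stages that precede first_stage. The sketch below is an assumption about the idea, not repype's algorithm: the `STAGES` table, its field names, and `extra_stages` are hypothetical, and consumed fields are ignored for brevity.

```python
from typing import Dict, Iterable, List, Optional, Set, Tuple

# Hypothetical stage table in pipeline order: ID -> (inputs, outputs).
STAGES: Dict[str, Tuple[Set[str], Set[str]]] = {
    "load": (set(), {"image"}),
    "denoise": ({"image"}, {"clean"}),
    "segment": ({"clean"}, {"mask"}),
    "export": ({"mask"}, set()),
}


def extra_stages(first_stage: Optional[str], last_stage: Optional[str],
                 available_inputs: Iterable[str]) -> List[str]:
    ids = list(STAGES)
    start = 0 if first_stage is None else ids.index(first_stage)
    stop = len(ids) - 1 if last_stage is None else ids.index(last_stage)
    given = set(available_inputs)

    # Inputs of the selected stages that are neither given nor produced in range.
    available = set(given)
    missing: Set[str] = set()
    for stage_id in ids[start:stop + 1]:
        inputs, outputs = STAGES[stage_id]
        missing |= inputs - available
        available |= outputs

    # Walk backwards and pull in every earlier stage that produces a missing field.
    extra: List[str] = []
    for stage_id in reversed(ids[:start]):
        inputs, outputs = STAGES[stage_id]
        if outputs & missing:
            extra.append(stage_id)
            missing = (missing - outputs) | (inputs - given)
    return extra[::-1]  # report in pipeline order


nothing_available = extra_stages("segment", None, [])
clean_available = extra_stages("segment", None, ["clean"])
image_available = extra_stages("segment", "segment", ["image"])
```

With nothing available, both earlier stages are needed; if the field `clean` is already present, none are; if only `image` is present, just `denoise` has to run again.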

property persistent_fields: FrozenSet[str]

Lists all fields that are produced by the pipeline, minus those which are consumed.
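The relation between the two properties is a set difference. The following sketch assumes that each stage exposes the fields it produces and the fields it consumes; `ToyStage` and its attribute names are hypothetical stand-ins, not the library's Stage class.

```python
from typing import FrozenSet, NamedTuple


class ToyStage(NamedTuple):
    """Hypothetical stand-in for a stage."""
    id: str
    outputs: FrozenSet[str]   # fields the stage produces
    consumes: FrozenSet[str]  # fields the stage uses up


stages = [
    ToyStage("load", outputs=frozenset({"image"}), consumes=frozenset()),
    ToyStage("segment", outputs=frozenset({"mask"}), consumes=frozenset({"image"})),
]

# All produced fields, and those that remain after consumption.
fields = frozenset().union(*(stage.outputs for stage in stages))
consumed = frozenset().union(*(stage.consumes for stage in stages))
persistent_fields = fields - consumed
```

Here `image` is produced by the first stage but used up by the second, so only `mask` persists.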

process(input_id: InputID | None, config: Config, first_stage: str | None = None, last_stage: str | None = None, data: Dict[str, Any] | None = None, status: Status | None = None, **kwargs) Tuple[Dict[str, Any], Config, Dict[str, float]]

Processes the input data identified by the input_id using this pipeline.

The stage.process() method of each stage of the pipeline is executed successively.

Parameters:
  • input_id – The identifier of the input data to be processed. Can be None if and only if data is not None (then the input_id is deduced from data).

  • config – The hyperparameters to be used.

  • first_stage – The ID of the first stage to run (defaults to the first). Earlier stages may still be required to run due to pipeline fields consumed by stages, marginal fields, or if data is None.

  • last_stage – The ID of the last stage to run (defaults to the last).

  • data – The pipeline data object from previous processing.

  • status – A status object to report the progress of the computations.

Returns:

Tuple (data, config, times), where data is the pipeline data object comprising all final and intermediate results, config holds the hyperparameters that were ultimately used, and times is a dictionary with the run time of each individual pipeline stage (in seconds).

Raises:

StageError – If an error occurs during the run of a pipeline stage.
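The overall control flow (run the stages in order, time each one, wrap failures) can be sketched without the library. Everything below is a toy under stated assumptions: `run_stages` and `ToyStageError` are hypothetical, stages are reduced to `(id, function)` pairs, and stage selection via first_stage and last_stage is left out.

```python
import time
from typing import Any, Callable, Dict, List, Tuple

StageFunc = Callable[[Dict[str, Any]], None]


class ToyStageError(Exception):
    """Hypothetical stand-in for StageError: remembers the failing stage."""

    def __init__(self, stage_id: str):
        super().__init__(stage_id)
        self.stage = stage_id


def run_stages(stages: List[Tuple[str, StageFunc]], config: Dict[str, Any]
               ) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, float]]:
    data: Dict[str, Any] = {}
    times: Dict[str, float] = {}
    for stage_id, func in stages:
        start = time.perf_counter()
        try:
            func(data)  # each stage reads and writes fields of `data`
        except Exception as error:
            raise ToyStageError(stage_id) from error
        times[stage_id] = time.perf_counter() - start  # seconds
    return data, config, times


def load(data: Dict[str, Any]) -> None:
    data["numbers"] = [1, 2, 3]


def total(data: Dict[str, Any]) -> None:
    data["sum"] = sum(data["numbers"])


data, config, times = run_stages([("load", load), ("total", total)], config={})

# A failing stage surfaces as an error that names the stage.
try:
    run_stages([("boom", lambda data: 1 / 0)], config={})
except ToyStageError as error:
    failed_stage = error.stage
```

The returned triple mirrors the documented shape: the data object, the configuration, and one timing entry per executed stage.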

resolve(scope: str, input_id: InputID | None = None) Path | None

Resolves the path of a file based on the given scope and input identifier.

Returns None if the input_id is None, or the scope is not defined.

scopes: Dict[str, Path]

The scopes used to resolve file paths.

stage(stage_id: str) Stage | None

Returns the stage identified by stage_id, or None if there is none.

stages: List[Stage]

The stages of the pipeline.

class repype.pipeline.ProcessingControl(first_stage: str | None = None, last_stage: str | None = None)

Bases: object

Class used to control the processing of stages in a pipeline.

This class keeps track of the first and last stages of a pipeline, and determines whether a given stage should be processed based on its position in the pipeline.

Parameters:
  • first_stage – The first stage of the pipeline. Processing starts from this stage. If None, processing starts from the beginning.

  • last_stage – The last stage of the pipeline. Processing ends after this stage. If None, processing goes until the end.

first_stage: str | None

The first stage of the pipeline. Processing starts from this stage. If None, processing starts from the beginning.

last_stage: str | None

The last stage of the pipeline. Processing ends after this stage. If None, processing goes until the end.

started: bool

Indicates whether processing has started (and not ended yet).

step(stage: str) bool

Determines whether the given stage should be processed.

If the stage is the first stage of the pipeline, processing starts. If the stage is the last stage of the pipeline, processing ends after this stage. The attribute started is updated accordingly.

Parameters:

stage – The stage to check.

Returns:

True if the stage should be processed, False otherwise.
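The start and stop behavior described above amounts to a small state machine. The class below restates it as a sketch and is not repype's code; `ToyControl` is a hypothetical name, and the initial value of `started` when first_stage is None is an assumption.

```python
from typing import Optional


class ToyControl:
    """Hypothetical restatement of the documented behavior."""

    def __init__(self, first_stage: Optional[str] = None,
                 last_stage: Optional[str] = None):
        self.first_stage = first_stage
        self.last_stage = last_stage
        # Assumption: without a first stage, processing starts at the beginning.
        self.started = first_stage is None

    def step(self, stage: str) -> bool:
        if not self.started and stage == self.first_stage:
            self.started = True
        do_process = self.started
        if stage == self.last_stage:
            # The last stage itself is still processed; later stages are not.
            self.started = False
        return do_process


control = ToyControl(first_stage="b", last_stage="c")
decisions = [control.step(stage) for stage in ["a", "b", "c", "d"]]

# Without bounds, every stage is processed.
unbounded = ToyControl()
all_processed = all(unbounded.step(stage) for stage in ["a", "b", "c"])
```

Calling `step()` once per stage, in pipeline order, yields True exactly for the stages from first_stage through last_stage.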

exception repype.pipeline.StageError(stage: Stage)

Bases: Exception

An error raised when a stage fails to execute.

stage: Stage

The stage that failed to execute.

repype.pipeline.create_config_entry(config: Config, key: str, factor: Real, default_user_factor: Real, type: Type[Real] | None = None, min: Real | None = None, max: Real | None = None) None

Creates the hyperparameter key in the config if it does not exist yet.

The value of the hyperparameter is set to factor times the value of the hyperparameter AF_key, i.e. the hyperparameter with the same key prefixed with AF_.

In addition, the value of the hyperparameter is updated according to the following rules:

  • If type is not None, the value is converted to the specified type.

  • If min is not None, the value is set to the maximum of the value and min.

  • If max is not None, the value is set to the minimum of the value and max.
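The rules above can be sketched on a plain dictionary. This is an illustration under assumptions, not the library's function: `toy_config_entry` is hypothetical, the parameters are renamed (`as_type`, `lower`, `upper`) to avoid shadowing built-ins, a flat dict replaces the Config object, and `default_user_factor` is assumed to be the fallback used when the AF_ entry is absent, which the text above does not state.

```python
from typing import Any, Callable, Dict, Optional


def toy_config_entry(config: Dict[str, Any], key: str, factor: float,
                     default_user_factor: float,
                     as_type: Optional[Callable[[Any], Any]] = None,
                     lower: Optional[float] = None,
                     upper: Optional[float] = None) -> None:
    # Create the entry only if it does not exist yet.
    # Assumption: default_user_factor stands in for a missing AF_<key>.
    user_factor = config.get("AF_" + key, default_user_factor)
    config.setdefault(key, factor * user_factor)

    value = config[key]
    if as_type is not None:
        value = as_type(value)     # convert to the requested type
    if lower is not None:
        value = max(value, lower)  # enforce the lower bound
    if upper is not None:
        value = min(value, upper)  # enforce the upper bound
    config[key] = value


config = {"AF_radius": 3.0}
toy_config_entry(config, "radius", factor=1.5, default_user_factor=1.0,
                 as_type=int, lower=1, upper=3)

fallback = {}
toy_config_entry(fallback, "sigma", factor=0.5, default_user_factor=1.0, lower=2)
```

In the first call, 1.5 times 3.0 gives 4.5, which is converted to the integer 4 and then capped at 3. In the second call, no AF_sigma exists, so the assumed fallback yields 0.5, which is raised to the lower bound 2.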

See also

This function is used by the Pipeline.configure() method to automatically configure the hyperparameters of the pipeline.

repype.pipeline.create_pipeline(stages: Sequence[Stage], *args, pipeline_cls: Type[Pipeline] = Pipeline, **kwargs) Pipeline

Creates and returns a new pipeline configured for the given stages.

Parameters:
  • stages – The stages of the pipeline; the order is determined automatically.

  • pipeline_cls – The class to be used for the pipeline.

Returns:

Object of the pipeline_cls class.
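How an order might be derived automatically can be sketched with a topological sort. The text above does not say how repype determines the order, so this is only an assumption: stages are ordered so that every field is produced before it is used. The `stages` table and its field names are hypothetical; `graphlib` is part of the standard library from Python 3.9.

```python
from graphlib import TopologicalSorter

# Hypothetical stages, deliberately listed out of order.
stages = {
    "export": {"inputs": {"mask"}, "outputs": set()},
    "segment": {"inputs": {"image"}, "outputs": {"mask"}},
    "load": {"inputs": set(), "outputs": {"image"}},
}

# Map each field to the stage that produces it.
producer = {
    field: stage_id
    for stage_id, spec in stages.items()
    for field in spec["outputs"]
}

# Each stage depends on the producers of its inputs.
graph = {
    stage_id: {producer[field] for field in spec["inputs"] if field in producer}
    for stage_id, spec in stages.items()
}

order = list(TopologicalSorter(graph).static_order())
```

For this chain of dependencies the only valid order is `load`, `segment`, `export`.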