repype.stage

class repype.stage.Stage

Bases: object

A pipeline stage.

Each stage can be controlled by a separate set of hyperparameters. Those hyperparameters reside in namespaces, which are uniquely associated with the corresponding pipeline stages.

Each stage must declare the pipeline fields it requires as input and the output fields it produces. These declarations are used by the repype.pipeline.create_pipeline() function to automatically determine the stage order, and by the repype.pipeline.Pipeline.get_extra_stages() method to determine which additional stages must be executed. The field input_id is provided by the pipeline itself via the repype.pipeline.Pipeline.process() method.

Parameters:
  • id – The stage identifier, used as the hyperparameter namespace. Defaults to the result of the suggest_stage_id() function.

  • inputs – List of fields read by this stage.

  • consumes – List of fields consumed by this stage (read and cannot be used by subsequent stages).

  • outputs – List of fields produced by this stage.
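How declared fields can determine a stage order may be sketched as follows. This is a minimal standalone illustration and not repype's implementation: StageSpec and order_stages are hypothetical names, the field names are made up, and the handling of consumes is left out.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class StageSpec:
    """Hypothetical stand-in that only carries the declared fields of a stage."""
    id: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


def order_stages(stages: List[StageSpec], available: Set[str]) -> List[str]:
    """Repeatedly pick the first stage whose inputs are all available."""
    available = set(available)
    remaining = list(stages)
    ordered = []
    while remaining:
        ready = [s for s in remaining if set(s.inputs) <= available]
        if not ready:
            raise ValueError('Unsatisfiable inputs: ' + ', '.join(s.id for s in remaining))
        stage = ready[0]
        ordered.append(stage.id)
        available |= set(stage.outputs)  # outputs become available to later stages
        remaining.remove(stage)
    return ordered


stages = [
    StageSpec('segment', inputs=['smoothed'], outputs=['mask']),
    StageSpec('smooth', inputs=['input_id'], outputs=['smoothed']),
    StageSpec('measure', inputs=['mask'], outputs=['area']),
]
order = order_stages(stages, available={'input_id'})
print(order)  # ['smooth', 'segment', 'measure']
```

The stages are listed out of order on purpose: the order is recovered from the declared fields alone, starting from input_id, which the pipeline provides.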

__str__() str

Get a brief string representation of the stage (this is the stage identifier).

add_callback(event: Literal['start', 'end', 'skip', 'after'], callback: StageCallback) None

Add a callback for the specified event.

callback(event: Literal['start', 'end', 'skip', 'after'], **kwargs) None

Call the callbacks for the specified event.

Parameters:
  • event – The event for which to call the callbacks.

  • **kwargs – The keyword arguments to pass to the callbacks.
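The interplay of add_callback(), callback(), and remove_callback() can be pictured with a small registry. The class below is a hypothetical stand-in written for this illustration, assuming that callbacks receive the stage, the event, and the keyword arguments; it is not the code used by repype.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

EVENTS = ('start', 'end', 'skip', 'after')


class CallbackRegistry:
    """Hypothetical stand-in for the callback handling of a stage."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callable]] = defaultdict(list)

    def add_callback(self, event: str, callback: Callable) -> None:
        assert event in EVENTS, f'Unknown event: {event}'
        self._callbacks[event].append(callback)

    def remove_callback(self, event: str, callback: Callable) -> None:
        self._callbacks[event].remove(callback)

    def callback(self, event: str, **kwargs) -> None:
        # The registry passes itself in place of the stage object.
        for cb in list(self._callbacks[event]):
            cb(self, event, **kwargs)


log = []


def on_start(stage, event, **kwargs):
    log.append((event, kwargs['input_id']))


registry = CallbackRegistry()
registry.add_callback('start', on_start)
registry.callback('start', input_id='image-1')
registry.remove_callback('start', on_start)
registry.callback('start', input_id='image-2')  # no callback left, nothing is logged
print(log)  # [('start', 'image-1')]
```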

configure(pipeline: Pipeline, input_id: InputID, *args, **kwargs) dict

Returns the rules for adapting hyperparameters based on the input data.

Sometimes it is necessary to automatically adapt hyperparameters based on the input data. For cases where linear adaptation is suitable, this method can be overridden to return the rules that specify how to adapt the hyperparameters. The rules are then applied by the repype.pipeline.Pipeline.configure() method.

The rules must be specified by the following structure:

{
    'key': [
        factor,
        default_user_factor,
    ],
}

The rules are resolved by mapping the above structure to the arguments of the repype.pipeline.create_config_entry() function. In this example, two new hyperparameters are created:

  1. The hyperparameter AF_key is created and defaults to the value of default_user_factor.

  2. The hyperparameter key is created and defaults to the value of the hyperparameter AF_key times the value of factor.
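The two steps above amount to simple arithmetic, which the following sketch works through. It uses a plain dict in place of the real configuration object, and resolve_rule as well as the key radius are hypothetical names chosen for illustration; the actual resolution is done by repype.pipeline.create_config_entry().

```python
def resolve_rule(config: dict, key: str, factor: float, default_user_factor: float) -> None:
    """Create `AF_<key>` and `key` in a plain dict unless they are already set."""
    af_key = f'AF_{key}'
    config.setdefault(af_key, default_user_factor)   # step 1
    config.setdefault(key, config[af_key] * factor)  # step 2


# No user settings: both hyperparameters take their defaults.
config = {}
resolve_rule(config, 'radius', factor=20.0, default_user_factor=0.5)
print(config)  # {'AF_radius': 0.5, 'radius': 10.0}

# The user overrides the factor: `radius` follows it.
user_config = {'AF_radius': 0.25}
resolve_rule(user_config, 'radius', factor=20.0, default_user_factor=0.5)
print(user_config)  # {'AF_radius': 0.25, 'radius': 5.0}
```

In this reading, factor carries the dependence on the input data, while AF_radius is the knob that stays meaningful across inputs.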

In addition, a third element can be added to the list to further constrain the resulting values:

{
    'key': [
        factor,
        default_user_factor,
        {
            'type': 'float',
            'min': 0.0,
            'max': 1.0,
        },
    ],
}

Parameters:
  • pipeline – The pipeline object that this stage is a part of.

  • input_id – The identifier of the input data to adapt the hyperparameters for.

  • *args – Sequential arguments passed to Pipeline.configure.

  • **kwargs – Keyword arguments passed to Pipeline.configure.
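The optional third element of a rule constrains the computed value. One plausible reading is sketched below, assuming that type names a conversion and that min and max clamp the result; these semantics, the CONVERTERS table, the constrain helper, and the key threshold are assumptions made for this illustration and are not taken from repype.

```python
from typing import Any, Dict, Optional

CONVERTERS = {'float': float, 'int': int}  # assumed meaning of the 'type' entry


def constrain(value: Any, spec: Optional[Dict[str, Any]] = None) -> Any:
    """Convert a value and clamp it to [min, max] (assumed semantics)."""
    spec = spec or {}
    if 'type' in spec:
        value = CONVERTERS[spec['type']](value)
    if 'min' in spec:
        value = max(value, spec['min'])
    if 'max' in spec:
        value = min(value, spec['max'])
    return value


rules = {'threshold': [0.5, 3, {'type': 'float', 'min': 0.0, 'max': 1.0}]}
factor, default_user_factor, spec = rules['threshold']

raw = factor * default_user_factor  # 1.5, outside the permitted range
threshold = constrain(raw, spec)
print(threshold)  # 1.0
```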

consumes: Collection[str] = []

List of fields consumed by this stage (read and cannot be used by subsequent stages).

enabled_by_default: bool = True

Whether the stage is enabled by default.

The default value can be overridden by the enabled hyperparameter of the stage.

id: str

The stage identifier.

inputs: Collection[str] = []

List of fields read by this stage.

outputs: Collection[str] = []

List of fields produced by this stage.

process(pipeline: Pipeline, config: Config, status: Status | None = None, **inputs) Dict[str, Any]

Processes the input fields of this stage of the pipeline.

This method implements a stage of the pipeline with the provided inputs and configuration parameters. It then returns the outputs produced by the stage.

Parameters:
  • pipeline – The pipeline object that this stage is a part of.

  • config – The hyperparameters to be used for this stage.

  • status – A status object to report the progress of the computations.

  • **inputs – The fields of the pipeline read by this stage. Each key-value pair represents an input field and the corresponding value.

Returns:

A dictionary containing the outputs of this stage. Each key-value pair in the dictionary represents an output field and the corresponding value.

Raises:

NotImplementedError – If the method is not implemented by the subclass.
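The contract of process() (input fields in as keyword arguments, output fields out as a dictionary) can be illustrated with a standalone class. In real code the stage would derive from repype.stage.Stage; here ThresholdStage is a plain hypothetical class, config is a plain dict, and cutoff is a made-up hyperparameter, so that the snippet runs without repype.

```python
from typing import Any, Dict, List


class ThresholdStage:
    """Stand-in for illustration; a real stage would derive from repype.stage.Stage."""

    id = 'threshold'
    inputs = ['values']
    outputs = ['mask']

    def process(self, pipeline, config: Dict[str, Any], status=None, **inputs) -> Dict[str, Any]:
        cutoff = config.get('cutoff', 0.5)  # hypothetical hyperparameter
        values: List[float] = inputs['values']
        # The returned keys correspond to the declared output fields.
        return {'mask': [v > cutoff for v in values]}


stage = ThresholdStage()
result = stage.process(pipeline=None, config={'cutoff': 0.3}, values=[0.1, 0.4, 0.9])
print(result)  # {'mask': [False, True, True]}
```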

remove_callback(event: Literal['start', 'end', 'skip', 'after'], callback: StageCallback) None

Remove a callback for the specified event.

run(pipeline: Pipeline, input_id: InputID, data: Dict[str, Any], config: Config, status: Status | None = None, **kwargs) float

Run this stage of the pipeline by calling process(), if the stage is enabled.

The stage is enabled if the enabled hyperparameter is set to True, or the enabled hyperparameter is not set and enabled_by_default is True.

Parameters:
  • pipeline – The pipeline object that this stage is a part of.

  • input_id – The identifier of the input data to be processed.

  • data – The pipeline data object to be used for this stage. This is a dictionary that contains all available fields of the pipeline. The output fields of this stage are added to this dictionary.

  • config – The hyperparameters to be used for this stage.

  • status – A status object to report the progress of the computations.

Returns:

The duration of the stage run in seconds, if the stage is enabled, and 0 otherwise.
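The enabled rule and the return value described above can be condensed into a few lines. This is a sketch under stated assumptions, not the library's run(): is_enabled and run_stage are hypothetical helpers, config is a plain dict, and callbacks are ignored.

```python
import time
from typing import Any, Callable, Dict, List


def is_enabled(config: Dict[str, Any], enabled_by_default: bool) -> bool:
    """An explicit `enabled` hyperparameter wins; otherwise the default applies."""
    return bool(config.get('enabled', enabled_by_default))


def run_stage(process: Callable[..., Dict[str, Any]], inputs: List[str],
              data: Dict[str, Any], config: Dict[str, Any],
              enabled_by_default: bool = True) -> float:
    if not is_enabled(config, enabled_by_default):
        return 0
    start = time.perf_counter()
    outputs = process(**{name: data[name] for name in inputs})
    data.update(outputs)  # output fields are added to the pipeline data
    return time.perf_counter() - start


def double(x):
    return {'y': x * 2}


data = {'x': 2}
duration = run_stage(double, ['x'], data, config={})
print(data)  # {'x': 2, 'y': 4}

skipped = run_stage(double, ['x'], {'x': 5}, config={'enabled': False})
print(skipped)  # 0
```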

property sha: str

Get an SHA-1 hash which represents the implementation of this stage.

The restrictions of the signature property apply.

property signature: str

Get a serializable representation of the implementation of the stage.

The signature contains the attributes and the methods of the stage. Methods are represented by their bytecode. Callables other than the direct methods of the object are not taken into account. If any of those change, consider incrementing a signature_bump attribute.
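The idea of hashing attributes together with method bytecode can be sketched with the standard library. The helpers signature_of and sha_of below are hypothetical and only approximate the description above; the serialization format repype actually uses is not shown in this reference.

```python
import hashlib
import json
import types


def signature_of(cls: type) -> str:
    """Serialize the attributes and the method bytecode of a class (illustrative only)."""
    entries = {}
    for name, value in sorted(vars(cls).items()):
        if name.startswith('__'):
            continue  # skip dunder entries such as __module__ and __dict__
        if isinstance(value, types.FunctionType):
            entries[name] = value.__code__.co_code.hex()  # bytecode of the method
        else:
            entries[name] = repr(value)
    return json.dumps(entries, sort_keys=True)


def sha_of(cls: type) -> str:
    return hashlib.sha1(signature_of(cls).encode('utf8')).hexdigest()


class AddStage:
    signature_bump = 0

    def process(self, x):
        return x + 1


class MulStage:
    signature_bump = 0

    def process(self, x):
        return x * 2


print(sha_of(AddStage) == sha_of(AddStage))  # True
print(sha_of(AddStage) == sha_of(MulStage))  # False
```

A helper function called from process() would not appear in such a signature, which is the situation the signature_bump attribute is meant for.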

skip(pipeline: Pipeline, input_id: InputID, data: Dict[str, Any], config: Config, status: Status | None = None, **kwargs) None

Skips this stage of the pipeline.

Parameters:
  • pipeline – The pipeline object that this stage is a part of.

  • input_id – The identifier of the input data to be processed.

  • data – The pipeline data object. This is a dictionary that contains all available fields of the pipeline.

  • config – The hyperparameters for this stage.

  • status – A status object to report the progress of the computations.

class repype.stage.StageCallback(*args, **kwargs)

Bases: Protocol

Stage callback protocol.

__call__(stage: Stage, event: Literal['start', 'end', 'skip', 'after'], pipeline: Pipeline, input_id: InputID, data: Dict[str, Any], config: Config, status: Status | None, **kwargs) None

Parameters:
  • stage – The stage that triggered the event.

  • event – The event that triggered the callback.

  • pipeline – The pipeline object that the stage is a part of.

  • input_id – The identifier of the input data being processed.

  • data – The current pipeline data object.

  • config – The hyperparameters to be used for this stage.

  • status – A status object to report the progress of the computations.

  • **kwargs – The keyword arguments passed to repype.pipeline.Pipeline.process().
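A function that matches this protocol might look like the sketch below. The function record_event and the placeholder arguments are hypothetical; the call at the end only mimics what a stage would pass and does not involve repype.

```python
from typing import Any, Dict, List, Optional, Tuple

events: List[Tuple[str, str, List[str]]] = []


def record_event(stage: Any, event: str, pipeline: Any, input_id: Any,
                 data: Dict[str, Any], config: Any, status: Optional[Any],
                 **kwargs) -> None:
    """Record which event fired for which input, and the fields available at that time."""
    events.append((event, str(input_id), sorted(data)))


# Manual invocation with placeholder objects, mimicking what a stage would pass.
record_event(stage='threshold', event='end', pipeline=None, input_id='image-1',
             data={'values': [0.1, 0.9], 'mask': [False, True]}, config={}, status=None)
print(events)  # [('end', 'image-1', ['mask', 'values'])]
```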

repype.stage.suggest_stage_id(class_name: str) str

Suggests a stage identifier based on a class name.

This function validates the class_name, then tokenizes it. Tokens are grouped if they are consecutive and alphanumeric, but do not start with numbers. The function then converts the tokens to lowercase, removes underscores, and joins them with hyphens.

Example

>>> from repype.stage import suggest_stage_id
>>> print(suggest_stage_id('TheGreatMapperStage'))
the-great-mapper
>>> print(suggest_stage_id('TheGreat123PCMapper'))
the-great-123-pc-mapper

Parameters:
  • class_name – The name of the class to suggest a configuration namespace for.

Returns:

A string of hyphen-separated tokens from the class name.

Raises:

AssertionError – If the class name is not valid.
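For intuition, the documented examples can be reproduced with a short regular-expression sketch. This is an approximation written for this reference and not the library's code: the function name suggest_stage_id_sketch is hypothetical, the validation via str.isidentifier() is an assumption, and dropping a trailing Stage is inferred from the first example above.

```python
import re


def suggest_stage_id_sketch(class_name: str) -> str:
    # Assumed validation; the real function may apply different rules.
    assert class_name.isidentifier(), f'Not a valid class name: {class_name!r}'
    # Inferred from the documented example: a trailing "Stage" is dropped.
    if class_name.endswith('Stage') and class_name != 'Stage':
        class_name = class_name[:-len('Stage')]
    # Tokens: acronyms, capitalized words, lowercase words, digit runs.
    # Underscores match none of the alternatives and are therefore skipped.
    tokens = re.findall(r'[A-Z]+(?![a-z])|[A-Z][a-z]*|[a-z]+|[0-9]+', class_name)
    return '-'.join(token.lower() for token in tokens)


print(suggest_stage_id_sketch('TheGreatMapperStage'))  # the-great-mapper
print(suggest_stage_id_sketch('TheGreat123PCMapper'))  # the-great-123-pc-mapper
```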