repype.stage
- class repype.stage.Stage
Bases: object
A pipeline stage.
Each stage can be controlled by a separate set of hyperparameters. Those hyperparameters reside in namespaces, which are uniquely associated with the corresponding pipeline stages.
Each stage must declare the pipeline fields it requires as input and the output fields it produces. These declarations are used by the repype.pipeline.create_pipeline() function to automatically determine the stage order, and by the repype.pipeline.Pipeline.get_extra_stages() method to determine which additional stages must be executed. The field input_id is provided by the pipeline itself via the repype.pipeline.Pipeline.process() method.
- Parameters:
id – The stage identifier, used as the hyperparameter namespace. Defaults to the result of the suggest_stage_id() function.
inputs – List of fields read by this stage.
consumes – List of fields consumed by this stage (read and cannot be used by subsequent stages).
outputs – List of fields produced by this stage.
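To illustrate how declared inputs and outputs can determine an execution order, here is a minimal sketch. It is not the implementation of repype.pipeline.create_pipeline(): the helper order_stages and the stand-in stage objects are hypothetical, and the handling of consumes is left out for brevity. It only assumes that each stage exposes id, inputs, and outputs as documented above, and that input_id is available from the start.

```python
from graphlib import TopologicalSorter
from types import SimpleNamespace


def order_stages(stages, available=('input_id',)):
    """Order stages so that every field is produced before it is read (sketch)."""
    # Map each output field to the id of the stage that produces it.
    producers = {field: stage.id for stage in stages for field in stage.outputs}
    # For each stage, collect the ids of the stages it depends on.
    graph = {}
    for stage in stages:
        graph[stage.id] = {
            producers[field] for field in stage.inputs if field not in available
        }
    by_id = {stage.id: stage for stage in stages}
    return [by_id[stage_id] for stage_id in TopologicalSorter(graph).static_order()]


# Hypothetical stand-ins for stages, deliberately listed out of order.
stages = [
    SimpleNamespace(id='segment', inputs=['smoothed'], outputs=['mask']),
    SimpleNamespace(id='load', inputs=['input_id'], outputs=['image']),
    SimpleNamespace(id='smooth', inputs=['image'], outputs=['smoothed']),
]
ordered_ids = [stage.id for stage in order_stages(stages)]
print(ordered_ids)
```

A field that no stage produces raises a KeyError in this sketch; a real pipeline would presumably report such a configuration error more helpfully.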
- add_callback(event: Literal['start', 'end', 'skip', 'after'], callback: StageCallback) → None
Add a callback for the specified event.
- callback(event: Literal['start', 'end', 'skip', 'after'], **kwargs) → None
Call the callbacks for the specified event.
- Parameters:
event – The event for which to call the callbacks.
**kwargs – The keyword arguments to pass to the callbacks.
- configure(pipeline: ~repype.typing...pipeline.Pipeline, input_id: ~repype.typing.InputID, *args, **kwargs) → dict
Returns the rules for adapting hyperparameters to the input data.
Sometimes it is necessary to adapt hyperparameters automatically based on the input data. Where a linear adaptation is suitable, this method can be overridden to return rules that specify how to adapt the hyperparameters. The rules are then applied by the repype.pipeline.Pipeline.configure() method.
The rules must be specified using the following structure:
    {
        'key': [
            factor,
            default_user_factor,
        ],
    }
The rules are resolved by mapping the above structure to the arguments of the repype.pipeline.create_config_entry() function. In this example, two new hyperparameters are created:
The hyperparameter AF_key is created and defaults to the value of default_user_factor.
The hyperparameter key is created and defaults to the value of the hyperparameter AF_key times the value of factor.
In addition, a third element can be added to the list to further constrain the resulting values:
    {
        'key': [
            factor,
            default_user_factor,
            {
                type: 'float',
                min: 0.0,
                max: 1.0,
            },
        ],
    }
- Parameters:
pipeline – The pipeline object that this stage is a part of.
input_id – The identifier of the input data to adapt the hyperparameters for.
*args – Sequential arguments passed to Pipeline.configure.
**kwargs – Keyword arguments passed to Pipeline.configure.
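The rule structure described for configure() can be made concrete with a small sketch. The function resolve_rules below is hypothetical and does not call repype.pipeline.create_config_entry(); it only mirrors the documented defaults (AF_key defaults to default_user_factor, key defaults to AF_key times factor). Treating min and max as clamping bounds is an assumption, the type entry is ignored, and the hyperparameter name sigma is made up for the example.

```python
def resolve_rules(rules: dict, config: dict) -> dict:
    """Apply linear adaptation rules to a plain dict of hyperparameters (sketch)."""
    resolved = dict(config)
    for key, rule in rules.items():
        factor, default_user_factor = rule[0], rule[1]
        constraints = rule[2] if len(rule) > 2 else {}
        # AF_<key> defaults to default_user_factor unless the user has set it.
        user_factor = resolved.setdefault(f'AF_{key}', default_user_factor)
        # <key> defaults to AF_<key> * factor unless the user has set it.
        if key not in resolved:
            value = user_factor * factor
            # Assumption: 'min' and 'max' clamp the resulting value.
            if 'min' in constraints:
                value = max(value, constraints['min'])
            if 'max' in constraints:
                value = min(value, constraints['max'])
            resolved[key] = value
    return resolved


# factor=0.25 stands for a quantity derived from the input data.
rules = {'sigma': [0.25, 2.0, {'min': 0.0, 'max': 1.0}]}
defaults = resolve_rules(rules, {})
overridden = resolve_rules(rules, {'AF_sigma': 8.0})
print(defaults, overridden)
```

With no user settings, AF_sigma takes the default user factor and sigma follows from it; when the user raises AF_sigma, the derived value is clamped by the assumed max bound.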
- consumes: Collection[str] = []
List of fields consumed by this stage (read and cannot be used by subsequent stages).
- enabled_by_default: bool = True
Whether the stage is enabled by default.
The default value can be overridden by the enabled hyperparameter of the stage.
- inputs: Collection[str] = []
List of fields read by this stage.
- outputs: Collection[str] = []
List of fields produced by this stage.
- process(pipeline: ~repype.typing...pipeline.Pipeline, config: ~repype.config.Config, status: ~repype.status.Status | None = None, **inputs) → Dict[str, Any]
Processes the input fields of this stage of the pipeline.
This method implements a stage of the pipeline with the provided inputs and configuration parameters. It then returns the outputs produced by the stage.
- Parameters:
pipeline – The pipeline object that this stage is a part of.
config – The hyperparameters to be used for this stage.
status – A status object to report the progress of the computations.
**inputs – The fields of the pipeline read by this stage. Each key-value pair represents an input field and the corresponding value.
- Returns:
A dictionary containing the outputs of this stage. Each key-value pair in the dictionary represents an output field and the corresponding value.
- Raises:
NotImplementedError – If the method is not implemented by the subclass.
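As a rough illustration of the process() contract, the sketch below shows a stage that reads one field and returns one output field. To stay runnable without repype installed, ScaleStage is a plain stand-in class rather than a subclass of repype.stage.Stage, and a plain dict stands in for repype.config.Config; the field names image and scaled and the hyperparameter factor are hypothetical.

```python
class ScaleStage:
    """Stand-in for a stage; a real one would subclass repype.stage.Stage."""

    inputs = ['image']    # fields read by this stage
    outputs = ['scaled']  # fields produced by this stage

    def process(self, pipeline, config, status=None, **inputs):
        # Each declared input field arrives as a keyword argument.
        factor = config.get('factor', 2)
        # The returned dictionary maps each output field to its value.
        return {'scaled': [value * factor for value in inputs['image']]}


stage = ScaleStage()
result = stage.process(pipeline=None, config={'factor': 3}, image=[1, 2, 3])
print(result)
```

The keys of the returned dictionary match the declared outputs, which is what allows the pipeline to add them to its data object.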
- remove_callback(event: Literal['start', 'end', 'skip', 'after'], callback: StageCallback) → None
Remove a callback for the specified event.
- run(pipeline: ~repype.typing...pipeline.Pipeline, input_id: ~repype.typing.InputID, data: ~typing.Dict[str, ~typing.Any], config: ~repype.config.Config, status: ~repype.status.Status | None = None, **kwargs) → float
Run this stage of the pipeline by calling process(), if the stage is enabled.
The stage is enabled if the enabled hyperparameter is set to True, or if the enabled hyperparameter is not set and enabled_by_default is True.
- Parameters:
pipeline – The pipeline object that this stage is a part of.
input_id – The identifier of the input data to be processed.
data – The pipeline data object to be used for this stage. This is a dictionary that contains all available fields of the pipeline. The output fields of this stage are added to this dictionary.
config – The hyperparameters to be used for this stage.
status – A status object to report the progress of the computations.
- Returns:
The duration of the stage run in seconds, if the stage is enabled, and 0 otherwise.
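The enabling rule and the return value of run() can be sketched as follows. This is an illustration under stated assumptions, not the library's code: is_enabled and run_sketch are hypothetical helpers, a plain dict stands in for the configuration object, and callbacks and status reporting are omitted.

```python
import time


def is_enabled(config: dict, enabled_by_default: bool = True) -> bool:
    # Use 'enabled' if it is set; otherwise fall back to enabled_by_default.
    return bool(config.get('enabled', enabled_by_default))


def run_sketch(process, input_names, data, config, enabled_by_default=True) -> float:
    """Run process() on the declared inputs and return the duration in seconds."""
    if not is_enabled(config, enabled_by_default):
        return 0.0
    start = time.perf_counter()
    inputs = {name: data[name] for name in input_names}
    # The outputs of the stage are added to the pipeline data dictionary.
    data.update(process(config=config, **inputs))
    return time.perf_counter() - start


def double(config, image):
    return {'doubled': [2 * value for value in image]}


data = {'image': [1, 2, 3]}
elapsed = run_sketch(double, ['image'], data, config={})
skipped = run_sketch(double, ['image'], {'image': []}, config={'enabled': False})
print(data, skipped)
```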
- property sha: str
Get an SHA-1 hash which represents the implementation of this stage.
The restrictions of the signature property apply.
- property signature: str
Get a serializable representation of the implementation of the stage.
The signature contains the attributes and the methods of the stage. Methods are represented by their bytecode. Callables other than the direct methods of the object are not taken into account. If any of those change, consider incrementing a signature_bump attribute.
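The idea of a bytecode-based signature and its hash can be sketched with the standard library. The helpers sketch_signature and sketch_sha are hypothetical and only approximate the description above; the actual serialization format used by repype is not shown in this reference. The example also shows why bumping a signature_bump attribute changes the hash even when the method bytecode is unchanged.

```python
import hashlib
import json


def sketch_signature(obj) -> str:
    """Serialize class attributes and method bytecode of obj (illustrative only)."""
    entries = {}
    for name, value in sorted(vars(type(obj)).items()):
        if name.startswith('__'):
            continue  # skip dunder entries such as __module__ and __doc__
        if callable(value):
            # Methods are represented by their raw bytecode.
            entries[name] = value.__code__.co_code.hex()
        else:
            entries[name] = repr(value)
    return json.dumps(entries, sort_keys=True)


def sketch_sha(obj) -> str:
    return hashlib.sha1(sketch_signature(obj).encode('utf-8')).hexdigest()


class Doubler:
    signature_bump = 0

    def process(self, x):
        return x * 2


class DoublerBumped:
    signature_bump = 1

    def process(self, x):
        return x * 2


print(sketch_sha(Doubler()))
print(sketch_sha(DoublerBumped()))
```

Because only the direct methods are inspected, a change inside a helper function called by process() would leave this hash untouched, which is the situation the signature_bump attribute is meant to cover.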
- skip(pipeline: ~repype.typing...pipeline.Pipeline, input_id: ~repype.typing.InputID, data: ~typing.Dict[str, ~typing.Any], config: ~repype.config.Config, status: ~repype.status.Status | None = None, **kwargs) → None
Skips this stage of the pipeline.
- Parameters:
pipeline – The pipeline object that this stage is a part of.
input_id – The identifier of the input data to be processed.
data – The pipeline data object. This is a dictionary that contains all available fields of the pipeline.
config – The hyperparameters for this stage.
status – A status object to report the progress of the computations.
- class repype.stage.StageCallback(*args, **kwargs)
Bases: Protocol
Stage callback protocol.
- __call__(stage: Stage, event: Literal['start', 'end', 'skip', 'after'], pipeline: Pipeline, input_id: InputID, data: Dict[str, Any], config: Config, status: Status | None, **kwargs) → None
- Parameters:
stage – The stage that triggered the event.
event – The event that triggered the callback.
pipeline – The pipeline object that the stage is a part of.
input_id – The identifier of the input data being processed.
data – The current pipeline data object.
config – The hyperparameters to be used for this stage.
status – A status object to report the progress of the computations.
**kwargs – The keyword arguments passed to repype.pipeline.Pipeline.process().
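A callable that matches the documented __call__ signature might look like the sketch below. The function record_event and the events list are hypothetical, and the callback is invoked directly with placeholder values so the example runs without repype; in practice it would be registered on a stage with add_callback('start', record_event).

```python
events = []


def record_event(stage, event, pipeline, input_id, data, config, status, **kwargs):
    """Record which event fired, for which input, and which fields were present."""
    events.append((event, input_id, sorted(data)))


# Direct invocation with placeholder arguments, for illustration only.
record_event(
    stage=None,
    event='start',
    pipeline=None,
    input_id='sample-1',
    data={'image': [1, 2, 3]},
    config={},
    status=None,
)
print(events)
```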
- repype.stage.suggest_stage_id(class_name: str) → str
Suggests a stage identifier based on a class name.
This function validates the class_name, then tokenizes it. Tokens are grouped if they are consecutive and alphanumeric, but do not start with numbers. The function then converts the tokens to lowercase, removes underscores, and joins them with hyphens.
Example
    >>> from repype.stage import suggest_stage_id
    >>> print(suggest_stage_id('TheGreatMapperStage'))
    the-great-mapper
    >>> print(suggest_stage_id('TheGreat123PCMapper'))
    the-great-123-pc-mapper
- Parameters:
class_name – The name of the class to suggest a configuration namespace for.
- Returns:
A string of hyphen-separated tokens from the class name.
- Raises:
AssertionError – If the class name is not valid.
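For readers who want a feel for the tokenization, here is a regex-based approximation. It is not the implementation of suggest_stage_id(): the function sketch_stage_id is hypothetical, the validity check via str.isidentifier() is an assumption, and dropping a trailing "Stage" token is inferred only from the first documented example. It is written to reproduce the two examples shown above and may differ from the library on other names.

```python
import re

# Capitalized words, runs of capitals, runs of digits, or runs of lowercase letters.
_TOKEN = re.compile(r'[A-Z][a-z]+|[A-Z]+(?![a-z])|[0-9]+|[a-z]+')


def sketch_stage_id(class_name: str) -> str:
    # Assumption: a valid class name is a valid Python identifier.
    assert class_name.isidentifier(), f'not a valid class name: {class_name!r}'
    # Underscores match no token pattern, so they are dropped here.
    tokens = [token.lower() for token in _TOKEN.findall(class_name)]
    # Inferred from the documented example: a trailing 'stage' token is dropped.
    if len(tokens) > 1 and tokens[-1] == 'stage':
        tokens = tokens[:-1]
    return '-'.join(tokens)


print(sketch_stage_id('TheGreatMapperStage'))
print(sketch_stage_id('TheGreat123PCMapper'))
```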