repype.task
- repype.task.PendingReason
Reasons why a task is pending, or not pending at all.
alias of Literal['incomplete', 'pipeline', 'specification', '']
- class repype.task.Task(path: PathLike, spec: dict, parent: Self | None = None)
Bases: object
Task in batch processing.
Each task can have a parent task, so each task is part of a task tree.
- Parameters:
path – The path to the task directory.
spec – The task specification.
parent – The parent task of this task.
- compute_sha(config: Config | None = None) str
Compute the SHA-1 hash of the full task specification adopted for the config.
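As a rough illustration of hashing a specification, the sketch below serializes a dictionary to JSON and hashes the result with SHA-1. The helper name `sha1_of_spec` and the sorted-key JSON serialization are assumptions made for this example; how repype actually serializes the specification is not stated here.

```python
import hashlib
import json


def sha1_of_spec(full_spec: dict) -> str:
    """Hypothetical helper: hash a specification dictionary with SHA-1."""
    # Assumption: sorted-key JSON serves as the canonical form, so that the
    # key order of the dictionary does not affect the resulting hash.
    canonical = json.dumps(full_spec, sort_keys=True)
    return hashlib.sha1(canonical.encode('utf-8')).hexdigest()


# Two specifications with the same content but a different key order.
spec_a = {'config': {'stage1': {'sigma': 2.0}}, 'marginal_stages': ['stage2']}
spec_b = {'marginal_stages': ['stage2'], 'config': {'stage1': {'sigma': 2.0}}}

# A specification with one changed hyperparameter.
spec_c = {'config': {'stage1': {'sigma': 3.0}}, 'marginal_stages': ['stage2']}

sha_a = sha1_of_spec(spec_a)
sha_b = sha1_of_spec(spec_b)
sha_c = sha1_of_spec(spec_c)
```

With this scheme, `sha_a` and `sha_b` are identical, while `sha_c` differs because a hyperparameter changed.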
- create_config() Config
Creates an object that represents the hyperparameters of this task.
The hyperparameters are combined from three sources, in the following order, where the later sources take precedence:
The hyperparameters of the parent task.
The base_config file specified in the task specification (if any).
The config section of the task specification (if any).
The hyperparameters can be adjusted by making changes to the returned object before running the task, but such changes are not reflected in the task specification.
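The precedence order described above can be sketched with plain dictionaries. This is a minimal illustration, not the library's implementation: the helper `combine_hyperparameters` is hypothetical, the hyperparameter names are made up, and a shallow merge of flat keys is assumed.

```python
def combine_hyperparameters(parent: dict, base_config: dict, config_section: dict) -> dict:
    """Hypothetical helper: combine three sources, later ones take precedence."""
    combined = {}
    for source in (parent, base_config, config_section):
        # Assumption: a shallow merge, where later values overwrite earlier ones.
        combined.update(source)
    return combined


# Made-up hyperparameters for the three sources.
parent = {'sigma': 1.0, 'threshold': 0.5}
base_config = {'sigma': 2.0}
config_section = {'threshold': 0.8, 'min_size': 10}

combined = combine_hyperparameters(parent, base_config, config_section)
```

Here `sigma` comes from the base configuration file, while `threshold` and `min_size` come from the config section, because each later source overrides the earlier ones.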
- create_pipeline(*args, **kwargs) Pipeline
Instantiates and returns the pipeline from the task specification.
Can be overridden in subclasses to create custom pipelines.
- property digest: Mapping[str, Any] | None
Immutable full specification of the task completion (or None).
- property digest_sha_filepath: Path
The path to the hash values of the task completion.
This contains the SHA-1 hashes of the full task specification adopted for the configuration which was used to complete the task, and the hashes of the stages of the pipeline.
- property digest_task_filepath: Path
The path to the full task specification of the task completion.
This is the full task specification adopted for the configuration which was used to complete the task.
- find_first_diverging_stage(pipeline: Pipeline, config: Config) Stage | None
Find the first diverging stage of the task.
Stages are considered diverging if they are new, or if their implementation or hyperparameters have changed. Changes of the implementation or hyperparameters of a stage are detected by comparing the SHA-1 hashes of the stage.signature and hyperparameters of the stage.
- Parameters:
pipeline – The pipeline object.
config – The hyperparameters.
- Returns:
The first diverging stage of the task, or None if there is no diverging stage.
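The comparison described above can be sketched as a scan over the stages in pipeline order. The function below is a simplified stand-in: it works on stage identifiers and precomputed hash strings, and the names `first_diverging_stage`, `stored_hashes`, and `current_hashes` are assumptions of this example.

```python
from typing import Dict, List, Optional


def first_diverging_stage(
    stage_ids: List[str],
    stored_hashes: Dict[str, str],
    current_hashes: Dict[str, str],
) -> Optional[str]:
    """Hypothetical helper: return the first new or changed stage, or None."""
    for stage_id in stage_ids:
        # A stage without a stored hash is new, hence diverging.
        if stage_id not in stored_hashes:
            return stage_id
        # A stage whose hash differs has a changed implementation or hyperparameters.
        if stored_hashes[stage_id] != current_hashes[stage_id]:
            return stage_id
    return None


stage_ids = ['load', 'segment', 'measure']
stored_hashes = {'load': 'a1', 'segment': 'b2'}
current_hashes = {'load': 'a1', 'segment': 'b3', 'measure': 'c4'}

diverging = first_diverging_stage(stage_ids, stored_hashes, current_hashes)
unchanged = first_diverging_stage(['load'], stored_hashes, current_hashes)
```

In this example, `diverging` is `'segment'` because its hash changed, and `unchanged` is `None` because the only stage considered matches its stored hash.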
- find_pickup_task(pipeline: Pipeline, config: Config) Dict[str, Self | Stage]
Find a previously completed task to pick up computations from.
Returns a dictionary with the following keys:
task: The task to pick up from, or None if there is no task to pick up from.
first_diverging_stage: The first stage of the pipeline which needs to run, or None if no further computations are required.
- Parameters:
pipeline – The pipeline object.
config – The hyperparameters.
- property full_spec: Dict[str, Any]
The full specification of the task, including the parent task specifications.
- get_full_spec_with_config(config: Config) Dict[str, Any]
Get the full specification of the task adopted for the config.
- get_marginal_fields(pipeline: Pipeline) FrozenSet[str]
Get the marginal fields from a pipeline.
The marginal fields are all outputs produced by marginal stages. Marginal stages are those stages which are listed in the marginal_stages property. Marginal fields are removed from the pipeline data objects before merging them into the task data object, and when storing the results of the task.
- Parameters:
pipeline – The pipeline object.
- Returns:
Set of marginal fields.
- property input_ids
The identifiers of the inputs of the task.
- is_pending(pipeline: Pipeline, config: Config) Literal['incomplete', 'pipeline', 'specification', '']
Tells whether the task needs to run, or whether the task is completed or not runnable.
- Returns:
"incomplete": The task needs to run because it is not completed.
"pipeline": The task needs to run because the pipeline has changed.
"specification": The task needs to run because the task specification or hyperparameters have changed.
"": The task is completed and does not need to run.
- Return type:
PendingReason
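Because the "not pending" case is the empty string, the return value can be used directly in a truth test. The sketch below only handles the four documented values and does not call repype; the names `MESSAGES`, `describe_pending`, and `needs_run` are hypothetical.

```python
from typing import Literal

# Mirrors the documented alias; redefined here so the sketch is self-contained.
PendingReason = Literal['incomplete', 'pipeline', 'specification', '']

MESSAGES = {
    'incomplete': 'needs to run: the task is not completed',
    'pipeline': 'needs to run: the pipeline has changed',
    'specification': 'needs to run: the specification or hyperparameters have changed',
    '': 'nothing to do',
}


def describe_pending(reason: PendingReason) -> str:
    """Hypothetical helper: map a pending reason to a human-readable message."""
    return MESSAGES[reason]


def needs_run(reason: PendingReason) -> bool:
    """The empty string is falsy, the three other values are truthy."""
    return bool(reason)
```

A caller could therefore write `if reason: ...` to decide whether a task has to be run, and use the string itself to report why.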
- load(pipeline: Pipeline | None = None) Dict[InputID, Dict[str, Any]]
Load the previously computed task data object.
To ensure consistency with the task specification, it is verified that the loaded data contains results for all input identifiers of the task, and no spurious identifiers. If the pipeline is not None, a check for consistency of the data with the pipeline is also performed. The loaded task data object is consistent with the pipeline if the data contains all fields which are not marginal according to the get_marginal_fields() method, and no additional fields.
- Parameters:
pipeline – The pipeline object.
- Returns:
The previously stored task data object.
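The two consistency checks can be sketched with set comparisons. This is an illustration under assumptions: the helper `is_consistent` is hypothetical, the task data object is modelled as a plain nested dictionary, and `pipeline_fields` stands for all fields the pipeline would produce.

```python
from typing import Any, Dict, Iterable


def is_consistent(
    data: Dict[Any, Dict[str, Any]],
    input_ids: Iterable[Any],
    pipeline_fields: Iterable[str],
    marginal_fields: Iterable[str],
) -> bool:
    """Hypothetical helper: check loaded data against inputs and pipeline fields."""
    # Check 1: results for all input identifiers, and no spurious identifiers.
    if set(data.keys()) != set(input_ids):
        return False
    # Check 2: exactly the non-marginal fields are present for each input.
    expected = set(pipeline_fields) - set(marginal_fields)
    return all(set(fields.keys()) == expected for fields in data.values())


data = {
    'img1': {'segmentation': 1, 'count': 3},
    'img2': {'segmentation': 2, 'count': 5},
}

ok = is_consistent(data, ['img1', 'img2'], ['denoised', 'segmentation', 'count'], ['denoised'])
missing_input = is_consistent(data, ['img1', 'img2', 'img3'], ['segmentation', 'count'], [])
extra_field = is_consistent(data, ['img1', 'img2'], ['segmentation'], [])
```

Only the first call reports consistent data; the second lacks results for `img3`, and the third finds the additional field `count`.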
- property marginal_stages: Iterator[str]
The stages which are considered marginal.
Outputs of marginal stages are removed from the pipeline data objects when storing the results of the task. The default implementation reads the list of marginal stages from the marginal_stages field in the task specification.
- Yields:
Stage identifiers corresponding to the marginal stages.
- property parents: Iterator[Self]
Generator which yields all parent tasks of this task, starting with the immediate parent.
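A generator of this kind can be sketched with a minimal stand-in class. `Node` below is hypothetical and only carries a name and a parent link; it is not the `Task` class itself.

```python
from typing import Iterator, Optional


class Node:
    """Hypothetical stand-in for a task with an optional parent."""

    def __init__(self, name: str, parent: Optional['Node'] = None):
        self.name = name
        self.parent = parent

    @property
    def parents(self) -> Iterator['Node']:
        # Walk up the tree, starting with the immediate parent.
        node = self.parent
        while node is not None:
            yield node
            node = node.parent


root = Node('root')
child = Node('child', parent=root)
grandchild = Node('grandchild', parent=child)

parent_names = [node.name for node in grandchild.parents]
```

For `grandchild`, the generator yields `child` first and `root` last; for `root`, it yields nothing.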
- reset()
Reset the task by removing all stored data.
- resolve_path(path: PathLike | None) Path | None
Resolves a path relative to the task directory.
In addition, the following placeholders are replaced:
{DIRNAME}: The name of the task directory.
{ROOTDIR}: The root directory of the task tree.
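The placeholder substitution can be sketched with plain string replacement. This is a guess at the behaviour, not the library's code: the helper `resolve_sketch` is hypothetical, the directories are made up, and `PurePosixPath` is used so the example does not touch the file system.

```python
from pathlib import PurePosixPath
from typing import Optional


def resolve_sketch(
    path: Optional[str],
    task_dir: PurePosixPath,
    root_dir: PurePosixPath,
) -> Optional[PurePosixPath]:
    """Hypothetical helper: replace placeholders, then resolve against task_dir."""
    if path is None:
        return None
    text = path.replace('{DIRNAME}', task_dir.name)
    text = text.replace('{ROOTDIR}', str(root_dir))
    # Joining with an absolute path discards task_dir, so absolute paths are kept.
    return task_dir / text


task_dir = PurePosixPath('/data/project/task1')
root_dir = PurePosixPath('/data/project')

relative = resolve_sketch('results/{DIRNAME}.csv', task_dir, root_dir)
rooted = resolve_sketch('{ROOTDIR}/shared/model.bin', task_dir, root_dir)
nothing = resolve_sketch(None, task_dir, root_dir)
```

Under these assumptions, `relative` ends up inside the task directory, `rooted` points below the root directory, and `None` is passed through unchanged.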
- run(config: Config, pipeline: Pipeline | None = None, pickup: bool = True, strip_marginals: bool = True, status: Status | None = None) Dict[InputID, Dict[str, Any]]
Run the task.
- Parameters:
config – The hyperparameters to run the task with.
pipeline – The pipeline to run the task with. Defaults to create_pipeline().
pickup – If True, pick up computations from a previously completed task.
strip_marginals – If True, strip the marginal fields from the task data object before storing it.
status – The status object to update.
- Raises:
AssertionError – If the task is not runnable.
- setup_callbacks(pipeline: Pipeline) None
Add callbacks to the pipeline stages.
Callbacks are added to the pipeline stages for those stages and events for which there is a corresponding method defined in the task object. The method name is constructed from the stage identifier and the event name, separated by underscores. For example, the method on_stage1_start is called when the stage with the identifier stage1 starts.
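The name-based lookup can be sketched with `getattr`. Everything apart from the method name `on_stage1_start` is an assumption of this example: the helper `find_callbacks`, the event name `end`, and the argument-free callback signature are hypothetical.

```python
from typing import Callable, Dict, Iterable, Tuple


class MyTask:
    """Hypothetical task-like object defining one callback method."""

    def __init__(self):
        self.log = []

    def on_stage1_start(self):
        # Assumption: no arguments; the real callback signature may differ.
        self.log.append('stage1 started')


def find_callbacks(
    task: object,
    stage_ids: Iterable[str],
    events: Iterable[str] = ('start', 'end'),
) -> Dict[Tuple[str, str], Callable]:
    """Hypothetical helper: collect methods named on_<stage>_<event>."""
    events = tuple(events)
    callbacks = {}
    for stage_id in stage_ids:
        for event in events:
            method = getattr(task, f'on_{stage_id}_{event}', None)
            if callable(method):
                callbacks[(stage_id, event)] = method
    return callbacks


task = MyTask()
callbacks = find_callbacks(task, ['stage1', 'stage2'])

# Simulate the start of stage1 by invoking the matching callback.
callbacks[('stage1', 'start')]()
```

Only the pair `('stage1', 'start')` is found, since `MyTask` defines no other matching method.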
- store(pipeline: Pipeline, data: Dict[InputID, Dict[str, Any]], config: Config, times: Benchmark[float]) None
Store the computed task data object.
- Parameters:
pipeline – The pipeline used to compute data.
data – The task data object.
config – The hyperparameters used to compute data.
times – The run times of the pipeline stages.
- strip_marginals(pipeline: Pipeline, data_chunk: Dict[InputID, Dict[str, Any]]) Dict[str, Any]
Strip the marginal fields from the task data object.
- Parameters:
pipeline – The pipeline object.
data_chunk – The task data object (not modified).
- Returns:
Shallow copy of the task data object without the marginal fields.
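The effect of stripping marginal fields can be sketched with a dictionary comprehension. The helper `strip_fields` is hypothetical and takes the marginal fields directly instead of a pipeline; the field names are made up.

```python
from typing import Any, Dict, FrozenSet


def strip_fields(
    data_chunk: Dict[Any, Dict[str, Any]],
    marginal_fields: FrozenSet[str],
) -> Dict[Any, Dict[str, Any]]:
    """Hypothetical helper: copy the data without the marginal fields."""
    # Shallow copy: new dictionaries are created, the field values are shared.
    return {
        input_id: {
            field: value
            for field, value in fields.items()
            if field not in marginal_fields
        }
        for input_id, fields in data_chunk.items()
    }


segmentation = [[0, 1], [1, 1]]
data_chunk = {'img1': {'denoised': [[0.1, 0.9]], 'segmentation': segmentation}}

stripped = strip_fields(data_chunk, frozenset({'denoised'}))
```

The input `data_chunk` keeps its `denoised` field, while `stripped` contains only `segmentation`, which is the same list object as in the input.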
- repype.task.TaskData
Task data object. A dictionary with input objects as keys and pipeline data objects as values.