repype.task

repype.task.PendingReason

Reasons why a task is pending, or not pending at all.

alias of Literal['incomplete', 'pipeline', 'specification', '']

class repype.task.Task(path: PathLike, spec: dict, parent: Self | None = None)

Bases: object

Task in batch processing.

Each task can have a parent task, so each task is part of a task tree.

Parameters:
  • path – The path to the task directory.

  • spec – The task specification.

  • parent – The parent task of this task.

compute_sha(config: Config | None = None) → str

Compute the SHA-1 hash of the full task specification adapted to the config.
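The idea behind hashing a specification can be illustrated with the standard hashlib module. This is a minimal sketch that works on a plain dictionary instead of a Task and a Config. The JSON serialization with sorted keys is an assumption, not necessarily how repype serializes the specification.

```python
import hashlib
import json
from typing import Any, Mapping


def compute_sha(full_spec: Mapping[str, Any]) -> str:
    """Return the SHA-1 hex digest of a task specification.

    Assumption: the specification is JSON-serializable and is serialized
    with sorted keys, so that dictionary ordering does not affect the hash.
    """
    serialized = json.dumps(full_spec, sort_keys=True)
    return hashlib.sha1(serialized.encode('utf-8')).hexdigest()


spec_a = {'pipeline': 'my.pipeline', 'config': {'stage1': {'sigma': 2}}}
spec_b = {'config': {'stage1': {'sigma': 2}}, 'pipeline': 'my.pipeline'}
print(compute_sha(spec_a) == compute_sha(spec_b))  # True: key order is irrelevant
```

Sorting the keys makes the hash depend only on the content of the specification, which is what a change-detection hash needs.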

create_config() → Config

Creates an object which represents the hyperparameters of this task.

The hyperparameters are combined from three sources, in the following order, where the later sources take precedence:

  1. The hyperparameters of the parent task.

  2. The base_config file specified in the task specification (if any).

  3. The config section of the task specification (if any).

The hyperparameters can be adapted by making changes to the returned object before running the task, but the changes are not reflected in the task specification.
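The precedence rule above can be sketched with nested dictionaries. This is an illustration only: it assumes that hyperparameters behave like nested dicts, that the base_config file has already been loaded into a dict, and that sources are merged recursively rather than replaced wholesale. The helper names deep_merge and combine_hyperparameters are hypothetical.

```python
from typing import Any, Dict, Optional


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge two dicts; values from `override` take precedence."""
    result = dict(base)  # copy, so `base` is not modified
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def combine_hyperparameters(
    parent_config: Optional[Dict[str, Any]],
    base_config: Optional[Dict[str, Any]],
    spec_config: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Combine the three sources in order; later sources win (hypothetical helper)."""
    config: Dict[str, Any] = {}
    for source in (parent_config, base_config, spec_config):
        config = deep_merge(config, source or {})
    return config


parent = {'stage1': {'sigma': 1, 'mode': 'fast'}}
base = {'stage1': {'sigma': 2}}
spec = {'stage2': {'threshold': 0.5}}
print(combine_hyperparameters(parent, base, spec))
# {'stage1': {'sigma': 2, 'mode': 'fast'}, 'stage2': {'threshold': 0.5}}
```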

create_pipeline(*args, **kwargs) → Pipeline

Instantiates and returns the pipeline from the task specification.

Can be overridden in subclasses to create custom pipelines.

property data_filepath: Path

The path to the stored task data object of the task.

property digest: Mapping[str, Any] | None

Immutable full specification of the task completion (or None).

property digest_sha_filepath: Path

The path to the hash values of the task completion.

This contains the SHA-1 hashes of the full task specification adapted to the configuration which was used to complete the task, and the hashes of the stages of the pipeline.

property digest_task_filepath: Path

The path to the full task specification of the task completion.

This is the full task specification adapted to the configuration which was used to complete the task.

find_first_diverging_stage(pipeline: Pipeline, config: Config) → Stage | None

Find the first diverging stage of the task.

Stages are considered diverging if they are new, or if their implementation or hyperparameters have changed. Changes to the implementation or hyperparameters of a stage are detected by comparing the SHA-1 hashes of the stage's signature (stage.signature) and hyperparameters with the hashes stored for the previous task completion.

Parameters:
  • pipeline – The pipeline object.

  • config – The hyperparameters.

Returns:

The first diverging stage of the task, or None if there is no diverging stage.
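The divergence test can be sketched as a scan over the stages in pipeline order, comparing a freshly computed hash with the stored one. This sketch uses plain stage identifiers instead of Stage objects and assumes that signatures and hyperparameters are JSON-serializable; the helper sha1_of and the dictionary of stored hashes are hypothetical stand-ins for what repype keeps in the digest files.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional


def sha1_of(obj: Any) -> str:
    """SHA-1 of a JSON-serializable object (assumed serialization)."""
    return hashlib.sha1(json.dumps(obj, sort_keys=True).encode('utf-8')).hexdigest()


def find_first_diverging_stage(
    stage_ids: List[str],
    signatures: Dict[str, Any],
    config: Dict[str, Any],
    stored_hashes: Dict[str, str],
) -> Optional[str]:
    """Return the first stage whose hash differs from the stored one.

    A stage is diverging if it is new (no stored hash) or if the hash of
    its signature and hyperparameters has changed.
    """
    for stage_id in stage_ids:
        current = sha1_of([signatures[stage_id], config.get(stage_id, {})])
        if stored_hashes.get(stage_id) != current:
            return stage_id
    return None


stages = ['segmentation', 'features', 'classification']
sigs = {stage_id: 'v1' for stage_id in stages}
cfg = {'segmentation': {'sigma': 2}}
stored = {s: sha1_of([sigs[s], cfg.get(s, {})]) for s in stages}

print(find_first_diverging_stage(stages, sigs, cfg, stored))  # None
changed = {'segmentation': {'sigma': 2}, 'features': {'n': 5}}
print(find_first_diverging_stage(stages, sigs, changed, stored))  # features
```

Every stage after the first diverging one has to be recomputed as well, since it consumes outputs that may have changed; that is why only the first diverging stage is reported.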

find_pickup_task(pipeline: Pipeline, config: Config) → Dict[str, Self | Stage]

Find a previously completed task to pick up computations from.

Returns a dictionary with the following keys:

  • task: The task to pick up from, or None if there is no task to pick up from.

  • first_diverging_stage: The first stage of the pipeline which needs to run, or None if no further computations are required.

Parameters:
  • pipeline – The pipeline object.

  • config – The hyperparameters.

property full_spec: Dict[str, Any]

The full specification of the task, including the parent task specifications.

get_full_spec_with_config(config: Config) → Dict[str, Any]

Get the full specification of the task adapted to the config.

get_marginal_fields(pipeline: Pipeline) → FrozenSet[str]

Get the marginal fields from a pipeline.

The marginal fields are all outputs produced by marginal stages. Marginal stages are those stages which are listed in the marginal_stages property. Marginal fields are removed from the pipeline data objects before they are merged into the task data object, and when the results of the task are stored.

Parameters:

pipeline – The pipeline object.

Returns:

Set of marginal fields.
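As a sketch, the marginal fields are the union of the outputs of all marginal stages. Here a plain mapping from stage identifiers to output field names is a hypothetical stand-in for the Pipeline object, and the list of marginal stages is read from the marginal_stages field of the task specification, as the default marginal_stages property does.

```python
from typing import Dict, FrozenSet, Iterable, List


def get_marginal_fields(
    stage_outputs: Dict[str, List[str]],
    marginal_stages: Iterable[str],
) -> FrozenSet[str]:
    """Collect all outputs produced by the marginal stages."""
    marginal = set(marginal_stages)
    return frozenset(
        field
        for stage_id, outputs in stage_outputs.items()
        if stage_id in marginal
        for field in outputs
    )


spec = {'marginal_stages': ['preprocessing']}
outputs = {'preprocessing': ['filtered_image'], 'segmentation': ['mask']}
print(get_marginal_fields(outputs, spec.get('marginal_stages', [])))
# frozenset({'filtered_image'})
```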

property input_ids

The identifiers of the inputs of the task.

is_pending(pipeline: Pipeline, config: Config) → Literal['incomplete', 'pipeline', 'specification', '']

Tells whether the task needs to run, or whether the task is completed or not runnable.

Returns:

  • 'incomplete': The task needs to run because it is not completed.

  • 'pipeline': The task needs to run because the pipeline has changed.

  • 'specification': The task needs to run because the task specification or hyperparameters have changed.

  • '': The task is completed and does not need to run.

Return type:

Literal['incomplete', 'pipeline', 'specification', '']
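The returned reason can be consumed as follows. This sketch redefines the PendingReason alias locally so that it runs without repype, and the describe helper is hypothetical. Because the "completed" reason is the empty string, which is falsy in Python, a plain truth test on the return value of is_pending(pipeline, config) tells whether the task needs to run.

```python
from typing import Literal, get_args

# Local copy of the PendingReason alias documented above.
PendingReason = Literal['incomplete', 'pipeline', 'specification', '']


def describe(reason: PendingReason) -> str:
    """Turn a pending reason into a human-readable message (hypothetical helper)."""
    messages = {
        'incomplete': 'Task needs to run: it is not completed.',
        'pipeline': 'Task needs to run: the pipeline has changed.',
        'specification': 'Task needs to run: the specification or hyperparameters have changed.',
        '': 'Task is completed and does not need to run.',
    }
    return messages[reason]


for reason in get_args(PendingReason):
    needs_run = bool(reason)  # '' is falsy, every other reason is truthy
    print(repr(reason), needs_run, describe(reason))
```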

load(pipeline: Pipeline | None = None) → Dict[InputID, Dict[str, Any]]

Load the previously computed task data object.

To ensure consistency with the task specification, it is verified that the loaded data contains results for all input identifiers of the task, and no spurious identifiers. If the pipeline is not None, a check for consistency of the data with the pipeline is also performed. The loaded task data object is consistent with the pipeline if the data contains all fields which are not marginal according to the get_marginal_fields() method, and no additional fields.

Parameters:

pipeline – The pipeline object.

Returns:

The previously stored task data object.
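The two consistency checks described above can be sketched on plain dictionaries. The helper check_task_data is hypothetical, and raising ValueError is an assumption; the documentation does not state how load() reports an inconsistency. The expected fields stand for all pipeline fields that are not marginal.

```python
from typing import Any, Dict, FrozenSet, Iterable, Optional


def check_task_data(
    data: Dict[Any, Dict[str, Any]],
    input_ids: Iterable[Any],
    expected_fields: Optional[FrozenSet[str]] = None,
) -> None:
    """Raise ValueError if `data` is inconsistent with the task (hypothetical helper)."""
    # Results for all input identifiers, and no spurious identifiers.
    if set(data.keys()) != set(input_ids):
        raise ValueError('Input identifiers do not match the task specification')
    # Optional pipeline check: exactly the non-marginal fields, nothing else.
    if expected_fields is not None:
        for input_id, chunk in data.items():
            if set(chunk.keys()) != expected_fields:
                raise ValueError(f'Unexpected fields for input {input_id!r}')


data = {1: {'mask': 0, 'label': 1}, 2: {'mask': 0, 'label': 2}}
check_task_data(data, [1, 2], frozenset({'mask', 'label'}))  # consistent, no error
```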

property marginal_stages: Iterator[str]

The stages which are considered marginal.

Outputs of marginal stages are removed from the pipeline data objects when storing the results of the task. The default implementation reads the list of marginal stages from the marginal_stages field in the task specification.

Yields:

Stage identifiers corresponding to the marginal stages.

parent: Self | None

The parent task of this task.

property parents: Iterator[Self]

Generator which yields all parent tasks of this task, starting with the immediate parent.
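Walking up the task tree can be sketched with a minimal class that only keeps the parent link. The class Node is a hypothetical stand-in for Task; it shows how a parents generator and the root property relate to each other.

```python
from typing import Iterator, Optional


class Node:
    """Hypothetical stand-in for Task, keeping only the parent link."""

    def __init__(self, name: str, parent: Optional['Node'] = None):
        self.name = name
        self.parent = parent

    @property
    def parents(self) -> Iterator['Node']:
        """Yield all ancestors, starting with the immediate parent."""
        node = self.parent
        while node is not None:
            yield node
            node = node.parent

    @property
    def root(self) -> 'Node':
        """The root of the tree (the node itself if it has no parent)."""
        node = self
        while node.parent is not None:
            node = node.parent
        return node


root = Node('root')
child = Node('child', root)
leaf = Node('leaf', child)
print([p.name for p in leaf.parents])  # ['child', 'root']
print(leaf.root.name)  # root
```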

path: Path

The path to the task directory.

reset()

Reset the task by removing all stored data.

resolve_path(path: PathLike | None) → Path | None

Resolves a path relative to the task directory.

In addition, the following placeholders are replaced:

  • {DIRNAME}: The name of the task directory.

  • {ROOTDIR}: The root directory of the task tree.
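The placeholder substitution can be sketched with pathlib. This is a minimal sketch that assumes the placeholders are replaced textually before the path is joined with the task directory; the explicit task_path and root_path parameters are hypothetical, since the method reads them from the task itself. With pathlib, joining with an absolute path keeps only the absolute part, so a {ROOTDIR} path does not end up nested inside the task directory.

```python
import os
from pathlib import Path
from typing import Optional, Union


def resolve_path(
    task_path: Path,
    root_path: Path,
    path: Optional[Union[str, os.PathLike]],
) -> Optional[Path]:
    """Resolve `path` relative to `task_path`, replacing placeholders."""
    if path is None:
        return None
    text = str(path)
    text = text.replace('{DIRNAME}', task_path.name)  # name of the task directory
    text = text.replace('{ROOTDIR}', str(root_path))  # root directory of the task tree
    return task_path / text


task_dir = Path('/data/tasks/exp1')
root_dir = Path('/data/tasks')
print(resolve_path(task_dir, root_dir, 'results/{DIRNAME}.csv'))  # /data/tasks/exp1/results/exp1.csv
print(resolve_path(task_dir, root_dir, '{ROOTDIR}/shared.yml'))   # /data/tasks/shared.yml
```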

property root: Self

The root task of the task tree.

run(config: Config, pipeline: Pipeline | None = None, pickup: bool = True, strip_marginals: bool = True, status: Status | None = None) → Dict[InputID, Dict[str, Any]]

Run the task.

Parameters:
  • config – The hyperparameters to run the task with.

  • pipeline – The pipeline to run the task with. Defaults to create_pipeline().

  • pickup – If True, pick up computations from a previously completed task.

  • strip_marginals – If True, strip the marginal fields from the task data object before storing it.

  • status – The status object to update.

Raises:

AssertionError – If the task is not runnable.

property runnable: bool

True if the task is runnable, and False otherwise.

setup_callbacks(pipeline: Pipeline) → None

Add callbacks to the pipeline stages.

Callbacks are added to the pipeline stages for those stages and events for which a corresponding method is defined in the task object. The method name is constructed from the prefix on, the stage identifier, and the event name, separated by underscores. For example, the method on_stage1_start is called when the stage with the identifier stage1 starts.
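The naming convention can be sketched with getattr. This sketch only collects the matching methods instead of registering them with a pipeline; the helper collect_callbacks, the class MyTask, and the event name 'end' are hypothetical ('start' comes from the example above).

```python
from typing import Callable, Dict, Iterable, Tuple


def collect_callbacks(
    task: object,
    stage_ids: Iterable[str],
    events: Iterable[str],
) -> Dict[Tuple[str, str], Callable]:
    """Find methods named on_<stage>_<event> on the task object."""
    events = list(events)  # iterated once per stage
    callbacks = {}
    for stage_id in stage_ids:
        for event in events:
            method = getattr(task, f'on_{stage_id}_{event}', None)
            if callable(method):
                callbacks[(stage_id, event)] = method
    return callbacks


class MyTask:
    def on_stage1_start(self):
        print('stage1 started')


found = collect_callbacks(MyTask(), ['stage1', 'stage2'], ['start', 'end'])
print(sorted(found))  # [('stage1', 'start')]
```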

spec: Dict[str, Any]

The task specification.

store(pipeline: Pipeline, data: Dict[InputID, Dict[str, Any]], config: Config, times: Benchmark[float]) → None

Store the computed task data object.

Parameters:
  • pipeline – The pipeline used to compute data.

  • data – The task data object.

  • config – The hyperparameters used to compute data.

  • times – The run times of the pipeline stages.

strip_marginals(pipeline: Pipeline, data_chunk: Dict[InputID, Dict[str, Any]]) → Dict[str, Any]

Strip the marginal fields from the task data object.

Parameters:
  • pipeline – The pipeline object.

  • data_chunk – The task data object (not modified).

Returns:

Shallow copy of the task data object without the marginal fields.
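A shallow copy without the marginal fields can be sketched as a dictionary comprehension. In this sketch the data chunk is a single pipeline data object (field names to values) and the set of marginal fields is passed in directly; both simplifications are assumptions made so the example runs without a Pipeline.

```python
from typing import Any, Dict, FrozenSet


def strip_marginals(data_chunk: Dict[str, Any], marginal_fields: FrozenSet[str]) -> Dict[str, Any]:
    """Return a shallow copy of `data_chunk` without the marginal fields."""
    return {key: value for key, value in data_chunk.items() if key not in marginal_fields}


chunk = {'filtered_image': [[0, 1]], 'mask': [[1, 0]]}
stripped = strip_marginals(chunk, frozenset({'filtered_image'}))
print(stripped)                           # {'mask': [[1, 0]]}
print(stripped['mask'] is chunk['mask'])  # True: values are shared (shallow copy)
print('filtered_image' in chunk)          # True: the input is not modified
```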

property times: Benchmark[float]

The run times of the task completion.

property times_filepath: Path

The path to the CSV file with the run times of the task completion.

repype.task.TaskData

Task data object. A dictionary with input identifiers as keys and pipeline data objects as values.

alias of Dict[InputID, Dict[str, Any]]

repype.task.decode_input_ids(spec: InputID | List[InputID]) → List[InputID]

Convert a string of comma-separated integers (or ranges thereof) to a list of integers.

If spec is a list already, or a single input identifier, it is returned as is.
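Decoding such a string can be sketched as follows. This sketch assumes that ranges are written as first-last with both ends inclusive and that identifiers are non-negative integers; it handles strings and lists only, and leaves out the single-identifier case.

```python
from typing import List, Union


def decode_input_ids(spec: Union[str, List[int]]) -> List[int]:
    """Decode a string like '1-3, 5' into [1, 2, 3, 5]; lists pass through unchanged."""
    if isinstance(spec, list):
        return spec
    input_ids: List[int] = []
    for token in spec.split(','):
        token = token.strip()
        if '-' in token:
            # Assumed range syntax 'first-last', inclusive on both ends.
            first, last = (int(part) for part in token.split('-', 1))
            input_ids.extend(range(first, last + 1))
        else:
            input_ids.append(int(token))
    return input_ids


print(decode_input_ids('1-3, 5'))  # [1, 2, 3, 5]
```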

repype.task.load_from_module(name: str) → Any

Load an object from a module.
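Loading an object by name can be sketched with importlib. This sketch assumes the name is a dotted path whose last component is an attribute of the module named by the rest (for example 'package.module.attr'); nested attributes such as 'module.Class.method' are not handled here.

```python
import importlib
import math
import os
from typing import Any


def load_from_module(name: str) -> Any:
    """Load an object given its dotted path, e.g. 'package.module.attr'."""
    module_name, _, attr_name = name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


print(load_from_module('math.sqrt') is math.sqrt)          # True
print(load_from_module('os.path.join') is os.path.join)    # True
```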