repype.batch

class repype.batch.Batch(task_cls: ~typing.Type[~repype.task.Task] = <class 'repype.task.Task'>)

Bases: object

A collection of tasks to run. Each task is uniquely identified by its path.

Parameters:

task_cls – The class to use for tasks. Defaults to repype.task.Task.

async cancel() None

Cancel currently running tasks (if any).

context(path: PathLike) RunContext | None

Get a run context for a specific task.

Returns:

The run context for the task, or None if the task is not loaded.

property contexts: List[RunContext]

Get a list of run contexts for all tasks.

The list is sorted alphabetically by the task path.

load(root_path: PathLike) None

Load all tasks from a directory tree.

property pending: List[RunContext]

Get a list of run contexts for all pending tasks.

property resolved_tasks: Dict[Path, Task]

Get a dictionary of all tasks, indexed by their resolved path.
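
Indexing by resolved path means that two spellings of the same directory refer to the same entry. The following is a minimal sketch of that idea using only pathlib. The helper index_by_resolved_path and the string placeholders standing in for tasks are hypothetical and not part of repype.

```python
from pathlib import Path
from typing import Dict


def index_by_resolved_path(tasks: Dict[Path, str]) -> Dict[Path, str]:
    # Path.resolve() makes the path absolute and collapses ".." components,
    # so different spellings of the same directory produce the same key.
    return {path.resolve(): task for path, task in tasks.items()}


# Two spellings of the same (hypothetical) task directory.
tasks = {
    Path("experiments/run1"): "task A",
    Path("experiments/tmp/../run1"): "task A",
}
resolved = index_by_resolved_path(tasks)
```

In this sketch, resolved holds a single absolute key, because both relative spellings point to the same location.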

async run(contexts: List[RunContext] | None = None, status: Status | None = None) bool

Run all pending tasks (or a subset).

Each task is run in a separate process using run_task_process(). This ensures that each task runs in a clean environment and that no memory leaks from one task to the next.

Parameters:
  • contexts – List of run contexts to run. Defaults to all pending tasks.

  • status – The status object to update during task execution. Defaults to a new status object.

Returns:

True if all tasks completed successfully, and False otherwise.
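
The process-per-task idea can be illustrated without repype. The sketch below starts a fresh Python interpreter for each unit of work and reports success only if every exit code is 0. It is an illustration under stated assumptions, not the library's implementation: the names run_in_fresh_process and run_all are hypothetical, and plain code snippets stand in for tasks.

```python
import asyncio
import sys
from typing import List


async def run_in_fresh_process(code: str) -> int:
    # Start a new Python interpreter, so the work cannot share memory
    # with the parent process or with previously run snippets.
    process = await asyncio.create_subprocess_exec(sys.executable, "-c", code)
    return await process.wait()


async def run_all(snippets: List[str]) -> bool:
    # Run the snippets one after another and collect their exit codes.
    exit_codes = [await run_in_fresh_process(code) for code in snippets]
    # Succeed only if every process exited with code 0.
    return all(exit_code == 0 for exit_code in exit_codes)


# The second snippet exits with code 1, so the overall result is False.
ok = asyncio.run(run_all(["pass", "raise SystemExit(1)"]))
```

Because each snippet runs in its own interpreter, anything it allocates is released when that process exits.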

task(path: PathLike, spec: dict | None = None) Task | None

Retrieve a task by its path.

The task is loaded from the task specification if it has not been loaded before. Otherwise, the previously loaded task is returned. The task specification is either the spec argument or the task.yml file in the task directory. If both are available, the spec argument takes precedence.

The path argument is later used to:

  1. Identify the task using this method

  2. Establish parent relations, see repype.task.Task.parent

  3. Resolve filepaths, see repype.pipeline.Pipeline.resolve()
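
The lookup rules described for task() (reuse a previously loaded task, otherwise prefer an explicit specification over the file in the task directory) can be sketched as follows. This is a simplified stand-in, not repype's code: get_or_load, the cache dictionary, and the read_spec_file callable are hypothetical, and plain dictionaries take the place of Task objects.

```python
from pathlib import Path
from typing import Callable, Dict, Optional


def get_or_load(
    cache: Dict[Path, dict],
    path: Path,
    read_spec_file: Callable[[Path], dict],
    spec: Optional[dict] = None,
) -> dict:
    # An entry that was loaded before is returned as is.
    if path in cache:
        return cache[path]
    # An explicit spec wins; the file is only read when no spec is given.
    cache[path] = spec if spec is not None else read_spec_file(path / "task.yml")
    return cache[path]


cache: Dict[Path, dict] = {}
from_file = lambda spec_path: {"source": "file"}  # hypothetical file reader

first = get_or_load(cache, Path("a"), from_file, spec={"source": "argument"})
second = get_or_load(cache, Path("a"), from_file)  # served from the cache
other = get_or_load(cache, Path("b"), from_file)   # falls back to the file
```

The path serves as the cache key here, which mirrors its role as the identifier of a task.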

task_cls: Type[Task]

The class to use for tasks.

task_process: Process | None

The process running the current task.

tasks: Dict[Path, Task]

A dictionary of tasks, indexed by their path.

class repype.batch.RunContext(task: Task)

Bases: object

The pipeline and the hyperparameters used to run a task.

Parameters:

task – The task to run.

config: Config

The hyperparameters to run the task with. Defaults to task.create_config().

pending: Literal['incomplete', 'pipeline', 'specification', '']

Whether the task is pending and why, or an empty string if the task is not pending.

See repype.task.Task.is_pending() for possible values.
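
Since the attribute is a string whose empty value stands for "not pending", a plain truth test is enough to separate pending from non-pending entries. A minimal sketch, assuming that reading of the type annotation; ContextStub and select_pending are hypothetical stand-ins and not part of repype.

```python
from typing import List, Literal, NamedTuple

PendingReason = Literal["incomplete", "pipeline", "specification", ""]


class ContextStub(NamedTuple):
    # Hypothetical stand-in for a run context: a name and a pending reason.
    name: str
    pending: PendingReason


def select_pending(contexts: List[ContextStub]) -> List[ContextStub]:
    # The empty string is falsy, so a truth test keeps only pending entries.
    return [context for context in contexts if context.pending]


contexts = [
    ContextStub("a", "pipeline"),
    ContextStub("b", ""),
    ContextStub("c", "incomplete"),
]
pending_names = [context.name for context in select_pending(contexts)]
```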

pipeline: Pipeline

The pipeline to run the task with. Defaults to task.create_pipeline().

run(*args, **kwargs) Dict[InputID, Dict[str, Any]]

Run the task.

Parameters:
  • args – Additional arguments to pass to the task.

  • kwargs – Additional keyword arguments to pass to the task.

Returns:

The task data object returned by the task.

task: Task

The task to run.

repype.batch.run_task_process(rc, status) int

Run a task using specific RunContext and repype.status.Status objects inside a separate process.

Parameters:
  • rc – The RunContext object of the task to run.

  • status – The repype.status.Status object to update during task execution.

Returns:

0 upon successful completion, and 1 upon failure.
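
The 0/1 return convention matches the usual process exit codes. The sketch below shows one way to map the outcome of arbitrary work to such a code. It is an assumption about the general pattern, not the body of run_task_process(); run_guarded is a hypothetical name.

```python
import traceback
from typing import Callable


def run_guarded(work: Callable[[], None]) -> int:
    # Map the outcome of a callable to a process-style exit code.
    try:
        work()
    except Exception:
        # Report the error on stderr, then signal failure to the caller.
        traceback.print_exc()
        return 1
    return 0


succeeded = run_guarded(lambda: None)  # completes without an exception
failed = run_guarded(lambda: 1 / 0)    # raises ZeroDivisionError
```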