tmlib.workflow package

Module contents

TissueMAPS workflow.

A workflow is a sequence of distributed computational tasks (steps). Each step represents a collection of batch jobs that can be processed in parallel and consists of the following phases:

  • init: Partitioning of the computational task into smaller batches based on user-provided arguments.
  • run: Scheduled parallel processing of individual batches according to user-defined resource allocation.
  • collect (optional): Postprocessing of results obtained by individual batch jobs.

A step is implemented as a subpackage of tmlib.workflow containing three modules:

  • args: Must implement BatchArguments and SubmissionArguments and decorate them with register_step_batch_args and register_step_submission_args, respectively. These classes provide step-specific arguments that control the partitioning of the given computational task into separate batch jobs and the amount of computational resources that should be allocated to each batch job.
  • api: Must implement WorkflowStepAPI and decorate it with register_step_api. This class provides the application programming interface (API) with methods for creation and management of batch jobs. The methods create_batches, run_job and collect_job_output implemented by derived classes are responsible for the step-specific processing behaviour and are controlled via the batch and submission arguments described above.
  • cli: Must implement WorkflowStepCLI. This class provides the command line interface (CLI) and main entry points for the program. It supplies high-level methods for the different phases, which delegate processing to the respective API methods.

This implementation automatically registers a step and makes it available for integration into larger workflows. To this end, steps are further combined into abstract task collections referred to as stages. A stage bundles one or more steps into a logical processing unit, taking potential dependencies between them into account.

A Workflow is dynamically built from steps based on a WorkflowDescription - provided by users as a mapping in YAML format. The workflow structure, i.e. the sequence of stages and steps and their interdependencies, is defined by its type, which is determined based on an implementation of WorkflowDependencies. To make a type available for use, the derived class must be registered via register_workflow_type. As an example serves the “canonical” type declared by CanonicalWorkflowDependencies.
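The user-provided YAML mapping decodes to a nested structure whose keys follow the constructor parameters of WorkflowDescription, WorkflowStageDescription and WorkflowStepDescription documented below. The following sketch shows such a mapping as a plain Python dict; the stage is shortened to a single one, with step names taken from the “canonical” workflow type:

```python
# Illustrative mapping in the shape a user-provided YAML workflow
# description would decode to; only one stage is shown for brevity.
description = {
    'type': 'canonical',
    'stages': [
        {
            'name': 'image_conversion',
            'mode': 'sequential',     # steps run one after another
            'active': True,
            'steps': [
                {'name': 'metaextract', 'active': True},
                {'name': 'metaconfig', 'active': True},
                {'name': 'imextract', 'active': True},
            ],
        },
    ],
}

stage_names = [s['name'] for s in description['stages']]
print(stage_names)  # ['image_conversion']
```

A YAML document with the same keys decodes to exactly this structure and can then be used to instantiate a WorkflowDescription.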

tmlib.workflow.climethod(help, **kwargs)

Method decorator that flags a method for use in the command line interface and provides descriptions of the method's arguments, which are required for parsing them via the command line.

The decorator further constructs the docstring for the decorated method.

Parameters:

help: str

brief description of the method

**kwargs: Dict[str, tmlib.workflow.args.Argument]

descriptors for each argument of the method

Returns:

unbound method

Raises:

TypeError

when registered function is not a method

TypeError

when the class of the registered method is not derived from WorkflowStepCLI

TypeError

when the value specified by a keyword argument doesn’t have type Argument

ValueError

when the key of a keyword argument doesn’t match a parameter of the method
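The documented contract of climethod can be sketched with a minimal, self-contained reimplementation. The real decorator lives in tmlib.workflow; the Argument stand-in below models only the type and help parameters, and the CLI class and its info method are hypothetical:

```python
import inspect

class Argument(object):
    # Minimal stand-in for tmlib.workflow.args.Argument (illustrative only).
    def __init__(self, type, help):
        self.type = type
        self.help = help

def climethod(help, **kwargs):
    # Sketch of the documented behaviour: each keyword value must be an
    # Argument and each key must match a parameter of the decorated method.
    def decorator(func):
        params = inspect.signature(func).parameters
        for name, value in kwargs.items():
            if not isinstance(value, Argument):
                raise TypeError(
                    'Value of keyword "%s" must have type Argument.' % name)
            if name not in params:
                raise ValueError(
                    'Keyword "%s" does not match a parameter of "%s".'
                    % (name, func.__name__))
        # Construct the docstring for the decorated method.
        lines = [help, '', 'Parameters', '----------']
        for name, arg in kwargs.items():
            lines.append('%s: %s' % (name, arg.type.__name__))
            lines.append('    %s' % arg.help)
        func.__doc__ = '\n'.join(lines)
        func.args = kwargs
        return func
    return decorator

class MyStepCLI(object):
    # Hypothetical step CLI; would derive from WorkflowStepCLI.
    @climethod('prints the description of a given batch job',
               job_id=Argument(type=int, help='one-based job identifier'))
    def info(self, job_id):
        return 'job %d' % job_id
```

A subparser for info with a --job_id option can then be generated from the attached argument descriptors.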

tmlib.workflow.get_step_api(name)

Gets the step-specific implementation of the WorkflowStepAPI class.

Parameters:

name: str

name of the step

Returns:

classobj

API class

tmlib.workflow.get_step_args(name)

Gets step-specific implementations of ArgumentCollection classes.

Parameters:

name: str

name of the step

Returns:

Tuple[Union[tmlib.workflow.args.BatchArguments, tmlib.workflow.args.SubmissionArguments]]

classes for batch and submission arguments

tmlib.workflow.get_step_information(name)

Gets the full name of the given step and a brief description.

Parameters:

name: str

name of the step

Returns:

Tuple[str]

full name and brief description

tmlib.workflow.register_step_api(name)

Class decorator to register a derived class of WorkflowStepAPI as a step API.

Parameters:

name: str

name of the corresponding workflow step

Returns:

classobj

Raises:

TypeError

when decorated class is not derived from WorkflowStepAPI

tmlib.workflow.register_step_batch_args(name)

Class decorator to register a derived class of BatchArguments for a workflow step to use it via the command line or within a workflow.

Parameters:

name: str

name of the corresponding workflow step

Returns:

classobj

Raises:

TypeError

when decorated class is not derived from BatchArguments

tmlib.workflow.register_step_submission_args(name)

Class decorator to register a derived class of SubmissionArguments for a workflow step to use it via the command line or within a workflow.

Parameters:

name: str

name of the corresponding workflow step

Returns:

classobj

Raises:

TypeError

when decorated class is not derived from SubmissionArguments

tmlib.workflow.register_workflow_type(name)

Class decorator to register a derived class of WorkflowDependencies.

Parameters:

name: str

name of the type of workflow

Returns:

classobj

Raises:

TypeError

when decorated class is not derived from WorkflowDependencies
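The register_* decorators and get_* lookup functions above all follow the same registry pattern, sketched here in a self-contained form. The real decorator and base class live in tmlib.workflow; the names below are stand-ins:

```python
# Module-level registry mapping step names to API classes.
_step_api_registry = {}

class WorkflowStepAPI(object):
    """Stand-in for tmlib.workflow.api.WorkflowStepAPI."""

def register_step_api(name):
    # Sketch of the documented contract: only subclasses of
    # WorkflowStepAPI may be registered.
    def decorator(cls):
        if not issubclass(cls, WorkflowStepAPI):
            raise TypeError(
                'Registered class must be derived from WorkflowStepAPI.')
        _step_api_registry[name] = cls
        return cls
    return decorator

def get_step_api(name):
    """Counterpart lookup, as in tmlib.workflow.get_step_api."""
    return _step_api_registry[name]

@register_step_api('mystep')
class MyStepAPI(WorkflowStepAPI):
    """API class for the hypothetical step "mystep"."""
```

After registration, get_step_api('mystep') returns MyStepAPI; the batch-argument, submission-argument and workflow-type registries work analogously.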

Subpackages

Submodules

tmlib.workflow.api module

class tmlib.workflow.api.BasicWorkflowStepAPI

Bases: object

Abstract base class for cluster routines.

datetimestamp
Returns:

str

datetime stamp in the form “year-month-day_hour:minute:second”

timestamp
Returns:

str

time stamp in the form “hour:minute:second”

class tmlib.workflow.api.WorkflowStepAPI(experiment_id)

Bases: tmlib.workflow.api.BasicWorkflowStepAPI

Abstract base class for API classes, which provide methods for large-scale image processing on a batch cluster.

Each workflow step must implement this class and decorate it with register_step_api to register it for use within a Workflow.

Parameters:

experiment_id: int

ID of the processed experiment

Attributes

experiment_id: int

ID of the processed experiment

workflow_location: str

absolute path to the location where workflow-related data should be stored
batches_location

str: location where job description files are stored

collect_job_output(batch)

Collects the output of jobs and fuses them if necessary.

Parameters:

batch: dict

job description

create_collect_batch(args)

Creates a job description with information required for processing of the batch job of the optional collect phase.

This method returns an empty dictionary. In case the step implements the collect phase and arguments need to be passed to collect_job_output, the derived class can override this method.

Parameters:

args: tmlib.workflow.args.BatchArguments

an instance of a step-specific implementation BatchArguments

Returns:

dict

job description

create_collect_job(user_name, job_collection, verbosity, duration='06:00:00')

Creates job for the “collect” phase of the step.

Parameters:

user_name: str

name of the submitting user

job_collection: tmlib.workflow.jobs.CollectPhase

empty collection of collect jobs that should be populated

verbosity: int

logging verbosity for jobs

duration: str, optional

computational time that should be allocated for a single job; in HH:MM:SS format (default: "06:00:00")

Returns:

tmlib.workflow.jobs.CollectJob

collect job

create_collect_phase(submission_id, parent_id)

Creates a job collection for the “collect” phase of the step.

Parameters:

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

Returns:

tmlib.workflow.jobs.CollectPhase

collection of “collect” jobs

create_init_job(user_name, job_collection, batch_args, verbosity, duration='12:00:00')

Creates job for the “init” phase of the step.

Parameters:

user_name: str

name of the submitting user

job_collection: tmlib.workflow.jobs.InitPhase

empty collection of init jobs that should be populated

batch_args: tmlib.workflow.args.BatchArguments

step-specific implementation of BatchArguments

duration: str, optional

computational time that should be allocated for the job in HH:MM:SS format (default: "12:00:00")

verbosity: int

logging verbosity for job

Returns:

tmlib.workflow.jobs.InitPhase

init job

create_init_phase(submission_id, parent_id)

Creates a job collection for the “init” phase of the step.

Parameters:

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

Returns:

tmlib.workflow.jobs.InitPhase

collection of “init” jobs

create_run_batches(args)

Creates job descriptions with information required for processing of individual batch jobs of the parallel run phase.

Each batch is a mapping that must provide at least the following key-value pair:

  • “id”: one-based job identifier number (int)

Additional key-value pairs may be provided, depending on requirements of the step.

Parameters:

args: tmlib.workflow.args.BatchArguments

an instance of a step-specific implementation BatchArguments

Returns:

Union[List[dict], generator]

job descriptions
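A derived API class could partition its input as follows. Only the one-based "id" key is part of the documented contract; the "image_files" key, the batch_size attribute and the file list are hypothetical illustrations:

```python
class MyStepAPI(object):
    # Stand-in for a step-specific WorkflowStepAPI subclass.

    def create_run_batches(self, args):
        # "args" would be a step-specific BatchArguments instance;
        # its "batch_size" attribute is a hypothetical batch argument.
        files = ['a.png', 'b.png', 'c.png', 'd.png', 'e.png']
        n = args.batch_size
        batches = []
        for index, start in enumerate(range(0, len(files), n)):
            batches.append({
                'id': index + 1,  # one-based job identifier (required)
                'image_files': files[start:start + n],  # step-specific payload
            })
        return batches
```

Each returned dict is later persisted via store_run_batch and handed back to run_job as the batch parameter.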

create_run_jobs(user_name, job_collection, verbosity, duration, memory, cores)

Creates jobs for the parallel “run” phase of the step.

Parameters:

user_name: str

name of the submitting user

job_collection: tmlib.workflow.jobs.RunPhase

empty collection of run jobs that should be populated

verbosity: int

logging verbosity for jobs

duration: str

computational time that should be allocated for a single job; in HH:MM:SS format

memory: int

amount of memory in Megabyte that should be allocated for a single job

cores: int

number of CPU cores that should be allocated for a single job

Returns:

tmlib.workflow.jobs.RunPhase

collection of jobs

create_run_phase(submission_id, parent_id)

Creates a job collection for the “run” phase of the step.

Parameters:

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

Returns:

tmlib.workflow.jobs.SingleRunPhase

collection of “run” jobs

create_step(submission_id, user_name, description)

Creates the workflow step.

Parameters:

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

description: tmlib.workflow.description.WorkflowStepDescription

Returns:

tmlib.workflow.WorkflowStep

delete_previous_job_output()

Deletes the output of a previous submission.

get_collect_batch()

Gets the description for a CollectJob.

Returns:

Dict[str, Union[int, str, list, dict]]

job description

get_log_output(phase, job_id=None)

Gets log outputs (standard output and error).

Parameters:

phase: str

phase of the workflow step (options: {"init", "run", "collect"})

job_id: int, optional

one-based identifier number of “run” jobs (default: None)

Returns:

Dict[str, str]

“stdout” and “stderr” for the given job

get_run_batch(job_id)

Gets the description for a RunJob.

Parameters:

job_id: int

one-based job identifier

Returns:

Dict[str, Union[int, str, list, dict]]

job description

get_run_job_ids()

Gets IDs of jobs of the run phase from persisted descriptions.

Returns:

List[int]

job IDs

log_location

str: location where log files are stored

print_job_descriptions(batches)

Prints job descriptions to standard output in YAML format.

Parameters:

batches: Dict[List[dict]]

descriptions of inputs and outputs of individual jobs

run_job(batch, assume_clean_state=False)

Runs an individual job.

Parameters:

batch: dict

description of the job

assume_clean_state: bool, optional

assume that output of previous runs has already been cleaned up

step_location

str: location where step-specific data is stored

step_name

str: name of the step

store_collect_batch(batch)

Persists description for a CollectJob.

Parameters:

batch: Dict[str, Union[int, str, list, dict]]

JSON serializable job description

store_run_batch(batch, job_id)

Persists description for a RunJob.

Parameters:

batch: Dict[str, Union[int, str, list, dict]]

JSON serializable job description

job_id: str

job ID

tmlib.workflow.args module

class tmlib.workflow.args.Argument(type, help, default=None, choices=None, flag=None, short_flag=None, required=False, disabled=False, get_choices=None, meta=None, dependency=())

Bases: object

Descriptor class for an argument.

Parameters:

type: type

type of the argument

help: str

help message that describes the argument

default: Union[str, int, float, bool], optional

default value (default: None)

choices: set or list or function, optional

choices for value

flag: str, optional

alternative name that serves as a flag for command line usage; will be prepended with two hyphens -- (default: None; defaults to name of the argument when not provided)

short_flag: str, optional

single letter that serves as an additional flag for command line usage; will be prepended with one hyphen - (default: None)

required: bool, optional

whether the argument is required (default: False)

disabled: bool, optional

whether the argument should be disabled in the UI (default: False)

get_choices: function, optional

function that takes an object of type Experiment and returns the choices in case they need to (and can) be determined dynamically (default: None)

meta: str, optional

alternative name of the argument displayed for command line options

dependency: tuple, optional

name-value pair of an argument the given argument depends on

add_to_argparser(parser)

Adds the argument to an argument parser for use in a command line interface.

Parameters:

parser: argparse.ArgumentParser

argument parser

Returns:

argparse.ArgumentParser

parser with added arguments

name

str: name of the argument
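The descriptor maps onto argparse in a straightforward way. The sketch below is a simplified, self-contained reimplementation covering only a subset of the documented parameters (type, help, default, flag, required); in the real class, name is set when the argument is attached to an ArgumentCollection:

```python
import argparse

class Argument(object):
    # Illustrative subset of the documented Argument parameters.
    def __init__(self, type, help, default=None, flag=None, required=False):
        self.type = type
        self.help = help
        self.default = default
        self.flag = flag
        self.required = required
        self.name = None  # set when attached to a collection

    def add_to_argparser(self, parser):
        # The flag defaults to the argument name and is prepended
        # with two hyphens, as documented above.
        flag = '--%s' % (self.flag or self.name)
        parser.add_argument(
            flag, dest=self.name, type=self.type, default=self.default,
            required=self.required, help=self.help)
        return parser

batch_size = Argument(type=int, help='number of images per batch', default=10)
batch_size.name = 'batch_size'
parser = batch_size.add_to_argparser(argparse.ArgumentParser())
print(parser.parse_args(['--batch_size', '5']).batch_size)  # 5
```

When no value is supplied on the command line, the argument's default (here 10) is used.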

class tmlib.workflow.args.ArgumentCollection(**kwargs)

Bases: object

Abstract base class for an argument collection. The collection serves as a container for arguments that can be passed to methods of WorkflowStepCLI decorated with climethod.

Implementations of the class can be instantiated without having to implement a constructor, i.e. an __init__ method. The constructor accepts keyword arguments, matches them against class attributes of type Argument, and overrides their default values. When no default value is specified, the argument has to be provided as a key-value pair. Derived classes can explicitly implement a constructor if required.

Parameters:

**kwargs: dict, optional

keyword arguments to overwrite

add_to_argparser(parser)

Adds each argument to an argument parser for use in a command line interface.

Parameters:

parser: argparse.ArgumentParser

argument parser

Returns:

argparse.ArgumentParser

parser with added arguments

docstring

str: docstring in NumPy style for the method to which the arguments should be passed; built based on the value of the help attribute and those of the type and help attributes of individual arguments

help

str: brief description of the collection or the method to which the arguments should be passed

iterargitems()

Iterates over the argument items stored as attributes of the instance.

classmethod iterargs()

Iterates over the class attributes of type Argument.

to_list()

Returns class attributes of type Argument as an array of key-value pairs.

Returns:

List[dict]

description of each argument

union(collection)

Adds all arguments contained in another collection.

Parameters:

collection: tmlib.workflow.args.ArgumentCollection

collection of arguments that should be added

class tmlib.workflow.args.BatchArguments(**kwargs)

Bases: tmlib.workflow.args.ArgumentCollection

Base class for arguments that are used to define how the computational task should be divided into individual batch jobs for parallel processing on the cluster.

These arguments can be passed to a step-specific implementation of the init CLI method and will be passed to the create_batches API method.

Parameters:

**kwargs: dict, optional

keyword arguments to overwrite

class tmlib.workflow.args.CliMethodArguments(**kwargs)

Bases: tmlib.workflow.args.ArgumentCollection

Collection of arguments that can be passed to a method of a step-specific implementation of WorkflowStepCLI that is decorated with climethod.

Parameters:

**kwargs: dict, optional

keyword arguments to overwrite

class tmlib.workflow.args.SubmissionArguments(**kwargs)

Bases: tmlib.workflow.args.ArgumentCollection

Base class for arguments that are used to control the submission of jobs to the cluster.

These arguments can be passed to a step-specific implementation of the submit CLI method and are passed to the create_jobs API method.

Parameters:

**kwargs: dict, optional

keyword arguments to overwrite

cores

int: number of cores that should be allocated to each “run” job (may be increased in case memory requirements of a job exceed resources of a single core)

duration

str: walltime that should be allocated to each “run” job in the format “HH:MM:SS” (may need to be adapted depending on the choice of batch size)

memory

int: amount of memory in megabytes that should be allocated to each “run” job
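The “HH:MM:SS” walltime format appears in several places above (duration arguments of create_init_job, create_collect_job and create_run_jobs). A small helper - not part of tmlib, purely illustrative - can validate such strings before submission:

```python
import re

def valid_duration(value):
    # Accepts walltimes like "06:00:00" or "120:00:00": at least two
    # hour digits, then minutes and seconds in the range 00-59.
    return re.match(r'^\d{2,}:[0-5]\d:[0-5]\d$', value) is not None

print(valid_duration('06:00:00'))  # True
print(valid_duration('6:00'))      # False
```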

tmlib.workflow.cli module

class tmlib.workflow.cli.WorkflowStepCLI(api_instance, verbosity)

Bases: tmlib.workflow.submission.WorkflowSubmissionManager

Abstract base class for command line interfaces.

Each workflow step must implement this class. The derived class automatically gets equipped with command line interface functionality in the form of an instance of argparse.ArgumentParser. The docstring of the derived class is also used for the description attribute of the parser for display in the command line. A separate subparser is added for each method of the derived class that is decorated with climethod. The decorator provides descriptions of the method arguments, which are also added to the corresponding subparser.

The init and submit methods require additional step-specific arguments that are passed to the API methods create_run_batches and create_run_jobs, respectively. These arguments are handled separately, since they also need to be accessible outside the scope of the command line interface. They are provided by step-specific implementations of BatchArguments and SubmissionArguments and added to the corresponding init and submit subparsers, respectively.

Parameters:

api_instance: tmlib.api.WorkflowStepAPI

instance of API class to which processing is delegated

verbosity: int

logging verbosity level

cleanup()

Cleans up the output of a previous submission, i.e. removes files and database entries created by previously submitted jobs

collect()

Collects the output of run jobs, i.e. performs a post-processing operation that either cannot be parallelized or needs to be performed afterwards

info(phase, job_id)

Prints the description of a given batch job to the console

Parameters:

phase: str

phase of the workflow step to which the job belongs

job_id: int

ID of the job for which information should be displayed

init()

Creates batches for parallel processing and thereby defines how the computational task should be distributed over the cluster (also cleans up the output of previous submissions)

log(phase, job_id)

Prints the log output of a given batch job to the console

Parameters:

phase: str

phase of the workflow step to which the job belongs

job_id: int

ID of the job for which log output should be shown

name

str: name of the step (command line program)

resubmit(monitoring_depth, monitoring_interval)

Resubmits previously created jobs for “run” and “collect” phases to the cluster and monitors their status upon processing

Parameters:

monitoring_depth: int

number of child tasks that should be monitored

monitoring_interval: int

seconds to wait between monitoring iterations

run(job_id, assume_clean_state)

Runs an individual batch job on the local machine

Parameters:

assume_clean_state: bool

assume that previous outputs have been cleaned up

job_id: int

ID of the job that should be run

submit(monitoring_depth, monitoring_interval)

Creates batch jobs for the “run” and “collect” phases, submits them to the cluster and monitors their status upon processing (requires a prior “init”)

Parameters:

monitoring_depth: int

number of child tasks that should be monitored

monitoring_interval: int

seconds to wait between monitoring iterations

tmlib.workflow.dependencies module

class tmlib.workflow.dependencies.CanonicalWorkflowDependencies

Bases: tmlib.workflow.dependencies.WorkflowDependencies

Declaration of dependencies for the canonical workflow.

INTER_STAGE_DEPENDENCIES = {'pyramid_creation': set(['image_conversion', 'image_preprocessing']), 'image_conversion': {}, 'image_analysis': set(['image_conversion', 'image_preprocessing']), 'image_preprocessing': set(['image_conversion'])}

collections.OrderedDict[str, Set[str]]: dependencies between workflow stages

INTRA_STAGE_DEPENDENCIES = {'metaextract': {}, 'imextract': set(['metaconfig']), 'metaconfig': set(['metaextract'])}

Dict[str, Set[str]]: dependencies between workflow steps within one stage

STAGES = ['image_conversion', 'image_preprocessing', 'pyramid_creation', 'image_analysis']

List[str]: names of workflow stages

STAGE_MODES = {'pyramid_creation': 'sequential', 'image_conversion': 'sequential', 'image_analysis': 'sequential', 'image_preprocessing': 'parallel'}

Dict[str, str]: mode for each workflow stage, i.e. whether steps of a stage should be submitted in parallel or sequentially

STEPS_PER_STAGE = {'pyramid_creation': ['illuminati'], 'image_conversion': ['metaextract', 'metaconfig', 'imextract'], 'image_analysis': ['jterator'], 'image_preprocessing': ['corilla']}

collections.OrderedDict[str, List[str]]: names of steps within each stage

class tmlib.workflow.dependencies.MultiplexingWorkflowDependencies

Bases: tmlib.workflow.dependencies.CanonicalWorkflowDependencies

Declaration of dependencies for a multiplexing workflow, which includes the align step.

STAGE_MODES = {'pyramid_creation': 'sequential', 'image_conversion': 'sequential', 'image_analysis': 'sequential', 'image_preprocessing': 'sequential'}

Dict[str, str]: mode for each workflow stage, i.e. whether steps of a stage should be submitted in parallel or sequentially

STEPS_PER_STAGE = {'pyramid_creation': ['illuminati'], 'image_conversion': ['metaextract', 'metaconfig', 'imextract'], 'image_analysis': ['jterator'], 'image_preprocessing': ['corilla', 'align']}

Dict[str, List[str]]: names of steps within each stage

class tmlib.workflow.dependencies.WorkflowDependencies

Bases: object

Abstract base class for declaration of workflow dependencies.

Derived classes will be used by descriptor classes in tmlib.workflow.description to declare a workflow type.

In addition, derived classes must implement the following attributes:

  • __type__: name of the workflow type
  • STAGES (list): names of stages that the workflow should have
  • STAGE_MODES (dict): mapping of stage name to processing mode (either "parallel" or "sequential")
  • STEPS_PER_STAGE (dict): ordered mapping of stage name to corresponding step names
  • INTER_STAGE_DEPENDENCIES (dict): mapping of stage name to names of other stages the referenced stage depends on
  • INTRA_STAGE_DEPENDENCIES (dict): mapping of step name to names of other steps the referenced step depends on
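A workflow type declaration with the attributes listed above could look as follows. The stage and step names here are made up for illustration (the canonical values are shown under CanonicalWorkflowDependencies above); in real code, the class would derive from WorkflowDependencies and be decorated with register_workflow_type:

```python
class SmallWorkflowDependencies(object):
    # Illustrative declaration; would derive from WorkflowDependencies
    # and be registered via @register_workflow_type('small').

    __type__ = 'small'

    STAGES = ['conversion', 'analysis']

    STAGE_MODES = {
        'conversion': 'sequential',
        'analysis': 'sequential',
    }

    STEPS_PER_STAGE = {
        'conversion': ['metaextract', 'imextract'],
        'analysis': ['jterator'],
    }

    INTER_STAGE_DEPENDENCIES = {
        'conversion': set(),
        'analysis': {'conversion'},     # analysis requires conversion output
    }

    INTRA_STAGE_DEPENDENCIES = {
        'metaextract': set(),
        'imextract': {'metaextract'},   # imextract requires metaextract
    }
```

The six attributes must be mutually consistent: every stage needs a mode and a step list, and every name referenced in a dependency mapping must appear in STAGES or STEPS_PER_STAGE.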
tmlib.workflow.dependencies.get_workflow_dependencies(name)

Gets a workflow type specific implementation of WorkflowDependencies.

Parameters:

name: str

name of the workflow type

Returns:

classobj

tmlib.workflow.dependencies.get_workflow_type_information()

Gets the names of each implemented workflow type.

Returns:

Set[str]

names of workflow types

tmlib.workflow.description module

class tmlib.workflow.description.WorkflowDescription(type, stages=None)

Bases: object

Description of a TissueMAPS workflow.

A workflow consists of a sequence of stages, which are themselves composed of steps. Each step represents a collection of computational jobs, which can be submitted for parallel processing on a cluster.

In principle, workflow steps could be arranged in arbitrary order, since dependencies for each step are checked dynamically while the workflow progresses. If a dependency is not fulfilled upon progression to the next step, i.e. if a required input has not been generated by another upstream step, the workflow will stop. However, in order to be able to check the workflow logic before any jobs get submitted, workflow dependencies are checked against a specific implementation of WorkflowDependencies.

Parameters:

type: str

type of the workflow, i.e. a registered workflow dependency declaration in form of an implementation of tmlib.workflow.dependencies.WorkflowDependencies

stages: List[dict]

description of workflow stages as a mapping of key-value pairs

add_stage(stage_description)

Adds an additional stage to the workflow.

Parameters:

stage_description: tmlib.workflow.description.WorkflowStageDescription

description of the stage that should be added

Raises:

TypeError

when stage_description doesn’t have type WorkflowStageDescription

jsonify()

Returns attributes as key-value pairs encoded as JSON.

Returns:

str

JSON string encoding the description of the workflow as a mapping of key-value pairs

to_dict()

Returns attributes as key-value pairs.

Returns:

dict
class tmlib.workflow.description.WorkflowStageDescription(type, name, mode, active=True, steps=None)

Bases: object

Description of a TissueMAPS workflow stage.

Parameters:

type: str

name of the workflow type

name: str

name of the stage

mode: str

mode of workflow stage submission, i.e. whether steps are submitted simultaneously or one after another (options: {"sequential", "parallel"})

active: bool, optional

whether the stage should be processed

steps: List[dict]

description of steps in form of key-value pairs

Raises:

TypeError

when name or steps have the wrong type

add_step(step_description)

Adds an additional step to the stage.

Parameters:

step_description: tmlib.workflow.description.WorkflowStepDescription

description of the step that should be added

Raises:

TypeError

when step_description doesn’t have type WorkflowStepDescription

jsonify()

Returns the attributes as key-value pairs encoded as JSON.

Returns:

str

JSON string encoding the description of the stage as a mapping of key-value pairs

to_dict()

Returns the attributes as key-value pairs.

Returns:

dict
class tmlib.workflow.description.WorkflowStepDescription(name, active=True, batch_args=None, submission_args=None)

Bases: object

Description of a workflow step.

Parameters:

name: str

name of the step

active: bool, optional

whether the step should be processed

batch_args: tmlib.workflow.args.BatchArguments, optional

batch arguments

submission_args: tmlib.workflow.args.SubmissionArguments, optional

submission arguments

Raises:

WorkflowDescriptionError

when a provided argument is not a valid argument for the given step

batch_args

tmlib.workflow.args.BatchArguments: batch arguments

jsonify()

Returns attributes as key-value pairs encoded as JSON.

Returns:

str

JSON string encoding the description of the step as a mapping of key-value pairs

submission_args

tmlib.workflow.args.SubmissionArguments: submission arguments

to_dict()

Returns attributes as key-value pairs.

Returns:

dict

tmlib.workflow.jobs module

class tmlib.workflow.jobs.CollectJob(step_name, arguments, output_dir, submission_id, user_name, parent_id)

Bases: tmlib.workflow.jobs.WorkflowStepJob

Class for a collect job, which can be processed once all parallel jobs have successfully completed.

Parameters:

step_name: str

name of the corresponding TissueMAPS workflow step

arguments: List[str]

command line arguments

output_dir: str

absolute path to the output directory, where log reports will be stored

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent CollectPhase

name

str: name of the job

class tmlib.workflow.jobs.CollectPhase(step_name, submission_id, parent_id, job=None)

Bases: gc3libs.workflow.ParallelTaskCollection, tmlib.workflow.jobs.JobCollection

Collection of jobs for the “collect” phase of a workflow step.

Parameters:

step_name: str

name of the corresponding WorkflowStep

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

job: tmlib.workflow.jobs.CollectJob, optional

job that should be processed (default: None)

add(job)

Adds a job to the phase.

Parameters:

job: tmlib.workflow.jobs.CollectJob

job that should be added

Raises:

TypeError

when job has wrong type

ValueError

when the phase already contains another job

class tmlib.workflow.jobs.IndependentJobCollection(step_name, submission_id, jobs=None)

Bases: gc3libs.workflow.SequentialTaskCollection, tmlib.workflow.jobs.JobCollection

Collection of jobs for manual submission of the run and collect phases of a workflow step independently of the main workflow.

Parameters:

step_name: str

name of the corresponding TissueMAPS workflow step

submission_id: int

ID of the corresponding submission

jobs: List[tmlib.workflow.jobs.RunPhase or tmlib.workflow.jobs.CollectJob], optional

list of jobs that should be processed (default: None)

class tmlib.workflow.jobs.InitJob(step_name, arguments, output_dir, submission_id, user_name, parent_id)

Bases: tmlib.workflow.jobs.WorkflowStepJob

Class for an init job, which creates the descriptions for the subsequent run and collect phases.

Parameters:

step_name: str

name of the corresponding TissueMAPS workflow step

arguments: List[str]

command line arguments

output_dir: str

absolute path to the output directory, where log reports will be stored

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent InitPhase

name

str: name of the job

class tmlib.workflow.jobs.InitPhase(step_name, submission_id, parent_id, job=None)

Bases: gc3libs.workflow.ParallelTaskCollection, tmlib.workflow.jobs.JobCollection

Collection of jobs for the “init” phase of a workflow step.

Parameters:

step_name: str

name of the parent WorkflowStep

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

job: tmlib.workflow.jobs.InitJob, optional

job that should be processed (default: None)

add(job)

Adds a job to the phase.

Parameters:

job: tmlib.workflow.jobs.InitJob

job that should be added

Raises:

TypeError

when job has wrong type

ValueError

when the phase already contains another job

class tmlib.workflow.jobs.JobCollection

Bases: object

Abstract base class for collections of individual jobs.

class tmlib.workflow.jobs.MultiRunPhase(step_name, submission_id, parent_id, run_job_collections=[])

Bases: gc3libs.workflow.AbortOnError, gc3libs.workflow.SequentialTaskCollection, tmlib.workflow.jobs.RunPhase

Collection of jobs for the “run” phase of a workflow step that consists of multiple nested job collections that should be processed sequentially.

Parameters:

step_name: str

name of the corresponding WorkflowStep

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

run_job_collections: List[tmlib.workflow.jobs.SingleRunPhase], optional

collections of run jobs that should be processed one after another

add(run_job_collection)

Add a collection of run jobs to the phase.

Parameters:

run_job_collection: tmlib.workflow.jobs.SingleRunPhase

collection of run jobs that should be added

Raises:

TypeError

when run_job_collection has wrong type

class tmlib.workflow.jobs.RunJob(step_name, arguments, output_dir, job_id, submission_id, user_name, parent_id, index=None)

Bases: tmlib.workflow.jobs.WorkflowStepJob

Class for TissueMAPS run jobs, which can be processed in parallel.

Parameters:

step_name: str

name of the corresponding TissueMAPS workflow step

arguments: List[str]

command line arguments

output_dir: str

absolute path to the output directory, where log reports will be stored

job_id: int

one-based job identifier number

submission_id: int

ID of the corresponding submission

parent_id: int

ID of the parent RunPhase

index: int, optional

index of the run job collection in case the step has multiple run phases

name

str: name of the job

class tmlib.workflow.jobs.RunPhase

Bases: tmlib.workflow.jobs.JobCollection

Abstract base class for a collection of jobs for the “run” phase of a workflow step.

class tmlib.workflow.jobs.SingleRunPhase(step_name, submission_id, parent_id, jobs=None, index=None)

Bases: gc3libs.workflow.ParallelTaskCollection, tmlib.workflow.jobs.RunPhase

Collection of jobs for the “run” phase of a workflow step that consists of a single job collection.

Parameters:

step_name: str

name of the corresponding WorkflowStep

submission_id: int

ID of the corresponding Submission

parent_id: int

ID of the parent WorkflowStep

jobs: List[tmlib.workflow.jobs.RunJob], optional

list of jobs that should be processed (default: None)

index: int, optional

index of the run job collection in case the step has multiple run phases

add(job)

Adds a job to the phase.

Parameters:

job: tmlib.workflow.jobs.RunJob

job that should be added

Raises:

TypeError

when job has wrong type

class tmlib.workflow.jobs.WorkflowStepJob(step_name, arguments, output_dir, submission_id, user_name, parent_id)

Bases: tmlib.jobs.Job

Abstract base class for an individual job as part of a workflow step phase.

Parameters:

step_name: str

name of the corresponding TissueMAPS workflow step

arguments: List[str]

command line arguments

output_dir: str

absolute path to the output directory, where log reports will be stored

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent JobCollection

name

str: name of the job

tmlib.workflow.manager module

class tmlib.workflow.manager.WorkflowManager(experiment_id, verbosity)

Bases: tmlib.workflow.submission.WorkflowSubmissionManager

Command line interface for submitting and monitoring TissueMAPS workflows.

Parameters:

experiment_id: int

ID of the processed experiment

verbosity: int

logging verbosity level

resubmit(monitoring_depth, stage)

Resubmits a previously created workflow to the cluster and monitors its status.

Parameters:

monitoring_depth: int

number of child tasks that should be monitored

stage: str

stage at which the workflow should be resubmitted

submit(monitoring_depth, monitoring_interval, force=False)

Creates a workflow, submits it to the cluster and monitors its progress.

Parameters:

monitoring_depth: int

number of child tasks that should be monitored

monitoring_interval: int

query status of jobs every monitoring_interval seconds

force: bool, optional

whether inactivated stages and steps should be submitted anyway

tm_workflow

Command line interface for submitting and monitoring
TissueMAPS workflows.

experiment_id

ID of the experiment that should be processed

-h, --help

show this help message and exit

--verbosity, -v

increase logging verbosity

tm_workflow experiment_id resubmit

resubmit a previously created workflow to the
cluster and monitor its status

-h, --help

show this help message and exit

--monitoring_depth <monitoring_depth>, -m <monitoring_depth>

number of child tasks that should be monitored (default: 2)

--monitoring_interval <monitoring_interval>, -i <monitoring_interval>

interval for monitoring iterations (default: 10)

--stage <stage>, -s <stage>

stage at which workflow should be resubmitted

tm_workflow experiment_id submit

create a workflow, submit it to the cluster and
monitor its status

-h, --help

show this help message and exit

--monitoring_depth <monitoring_depth>, -d <monitoring_depth>

number of child tasks that should be monitored (default: 2)

--monitoring_interval <monitoring_interval>, -i <monitoring_interval>

interval for monitoring iterations (default: 10)

--force, -f

also submit inactivated stages and steps

tmlib.workflow.submission module

class tmlib.workflow.submission.WorkflowSubmissionManager(experiment_id, program_name)

Bases: tmlib.submission.SubmissionManager

Mixin class for submission and monitoring of workflows.

Parameters:

experiment_id: int

ID of the processed experiment

program_name: str

name of the submitting program

get_task_id_of_last_submission()

Gets the ID of the last submitted task for the given experiment and program.

Returns:

int

ID of top task that was last submitted

submit_jobs(jobs, engine, start_index=0, monitoring_depth=1, monitoring_interval=10)

Submits jobs to a cluster and continuously monitors their progress.

Parameters:

jobs: tmlib.tmaps.workflow.WorkflowStep

jobs that should be submitted

engine: gc3libs.core.Engine

engine that should submit the jobs

start_index: int, optional

index of the job at which the collection should be (re)submitted

monitoring_depth: int, optional

recursion depth for job monitoring, i.e. in which detail subtasks in the task tree should be monitored (default: 1)

monitoring_interval: int, optional

seconds to wait between monitoring iterations (default: 10)

Returns:

dict

information about each job
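The submit-and-monitor loop that `submit_jobs` describes can be sketched as below. This is a simplified sketch, not the tmlib implementation; it assumes only that the engine object offers `progress()` and a `stats()` dict with per-state counts, loosely modelled on `gc3libs.core.Engine`:

```python
import time


def monitor(engine, monitoring_interval=10, max_iterations=100):
    """Advance the engine and poll task statistics until no task is
    left unfinished, waiting monitoring_interval seconds between
    iterations (sketch of the loop described above)."""
    status = {}
    for _ in range(max_iterations):
        engine.progress()            # move managed tasks through their lifecycle
        stats = engine.stats()
        status = dict(stats)
        unfinished = stats.get('total', 0) - stats.get('TERMINATED', 0)
        if unfinished == 0:
            break
        time.sleep(monitoring_interval)
    return status
```

With a real `gc3libs` engine, the same loop shape applies; tmlib additionally logs status at each iteration via the utilities in `tmlib.workflow.utils`.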

update_submission(jobs)

Updates the submission with the submitted tasks.

Sets the value for “top_task” column in the “submissions” table.

Parameters:

jobs: gc3libs.Task or gc3libs.workflow.TaskCollection

submitted tasks

Raises:

AttributeError

when jobs doesn’t have a “persistent_id” attribute, which indicates that the task has not yet been inserted into the database table

tmlib.workflow.utils module

tmlib.workflow.utils.create_gc3pie_engine(store)

Creates an Engine instance for submitting jobs for parallel processing.

Parameters:

store: gc3libs.persistence.store.Store

GC3Pie store object

Returns:

gc3libs.core.Engine

engine

tmlib.workflow.utils.create_gc3pie_session(location, store)

Creates a Session instance for job persistence in the PostgreSQL table.

Parameters:

location: str

path to a directory on disk for the file system representation of the store

store: gc3libs.persistence.store.Store

store instance

Returns:

gc3libs.persistence.session.Session

GC3Pie session

tmlib.workflow.utils.create_gc3pie_sql_store()

Creates a Store instance for job persistence in the PostgreSQL table Tasks.

Returns:

gc3libs.persistence.sql.SqlStore

GC3Pie store

tmlib.workflow.utils.format_stats_data(stats)

For each task state (and pseudo-state like ok or failed), two values are returned: the count of managed tasks that were in that state when Engine.progress() was last run, and what percentage of the total managed tasks this is.

Parameters:

stats: gc3libs.core.Engine

as returned by gc3libs.core.Engine.stats()

Returns:

dict

global statistics about the jobs in the Engine
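The count-and-percentage computation described above can be sketched as follows; the input is assumed to be a mapping from state name to task count with a `total` entry, loosely modelled on `Engine.stats()`, and the output key names are illustrative, not tmlib's exact ones:

```python
def format_stats_data(stats):
    """For each state, report the task count and its percentage of the
    total number of managed tasks (sketch under the assumptions above)."""
    total = stats.get('total', 0)
    data = {}
    for state, count in stats.items():
        if state == 'total':
            continue
        data['count_%s' % state.lower()] = count
        data['percent_%s' % state.lower()] = (
            100.0 * count / total if total else 0.0
        )
    return data
```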

tmlib.workflow.utils.format_task_data(name, type, created_at, updated_at, state, exitcode, memory, time, cpu_time)

Formats task data in the way expected by clients:

  • name (str): name of task
  • type (str): type of the task object
  • created_at (str): date and time when task was created
  • updated_at (str): date and time when task was last updated
  • state (str): state of the task
  • live (bool): whether the task is currently processed
  • done (bool): whether the task is done
  • failed (bool): whether the task failed, i.e. terminated with non-zero exitcode
  • percent_done (float): percent of subtasks that are done
  • exitcode (int): status code returned by the program
  • time (str): duration as “HH:MM:SS”
  • memory (float): amount of used memory in MB
  • cpu_time (str): used cpu time as “HH:MM:SS”
Parameters:

name: str

type: str

created_at: datetime.datetime

updated_at: datetime.datetime

state: gc3libs.Run.State

exitcode: int

memory: str

time: str

cpu_time: str

Returns:

Dict[str, Union[str, int, bool]]
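The boolean fields `live`, `done` and `failed` listed above are derived from the task's state and exitcode. A minimal sketch, assuming state names follow `gc3libs.Run.State` (the helper name is hypothetical):

```python
def derive_flags(state, exitcode):
    """Derive the live/done/failed booleans from a task's state and
    exitcode, as described in the field list above (sketch)."""
    done = state == 'TERMINATED'
    return {
        # task is still being processed somewhere in the cluster
        'live': state in {'SUBMITTED', 'RUNNING', 'STOPPED'},
        'done': done,
        # failure means the task terminated with a non-zero exitcode
        'failed': done and exitcode not in (0, None),
    }
```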

tmlib.workflow.utils.format_timestamp(elapsed_time)

Formats a timestamp in seconds to “HH:MM:SS” string.

Parameters:

elapsed_time: float

elapsed time in seconds

Returns:

str

formatted timestamp
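The conversion from elapsed seconds to an “HH:MM:SS” string can be sketched with two `divmod` steps:

```python
def format_timestamp(elapsed_time):
    """Format elapsed seconds as an "HH:MM:SS" string, as described
    above (sketch; fractional seconds are truncated)."""
    hours, remainder = divmod(int(elapsed_time), 3600)
    minutes, seconds = divmod(remainder, 60)
    return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)
```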

tmlib.workflow.utils.get_task_status_recursively(task_id, recursion_depth=None, id_encoder=None)

Provides status information for each task and recursively for subtasks.

Parameters:

task_id: int

ID of the highest-level submitted GC3Pie task

recursion_depth: int, optional

recursion depth for subtask querying; by default data of all subtasks will be queried (default: None)

id_encoder: function, optional

function that encodes task IDs

Returns:

dict

information about each task and its subtasks

tmlib.workflow.utils.log_task_failure(task_info, logger)

Logs the failure of a submitted GC3Pie task.

Parameters:

task_info: dict

information about each task and its subtasks

logger: logging.Logger

configured logger instance

tmlib.workflow.utils.log_task_status(task_info, logger, monitoring_depth)

Logs the status of a submitted GC3Pie task.

Parameters:

task_info: dict

information about each task and its subtasks

logger: logging.Logger

configured logger instance

monitoring_depth: int

recursion depth for subtask querying

tmlib.workflow.utils.print_task_status(task_info)

Pretty prints the status of submitted GC3Pie tasks to the console in table format.

Parameters:

task_info: dict

information about each task and its subtasks

tmlib.workflow.workflow module

class tmlib.workflow.workflow.ParallelWorkflowStage(name, experiment_id, verbosity, submission_id, user_name, parent_id, description=None)

Bases: tmlib.workflow.workflow.WorkflowStage, gc3libs.workflow.ParallelTaskCollection, tmlib.workflow.workflow.State

A workflow stage whose workflow steps should be processed at once in parallel. The number of jobs must thus be known for each step in advance.

Parameters:

name: str

name of the stage

experiment_id: int

ID of the processed experiment

verbosity: int

logging verbosity index

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent Workflow

description: tmlib.tmaps.description.WorkflowStageDescription, optional

description of the stage (default: None)

add(step)

Adds a step.

Parameters:

step: tmlib.tmaps.workflow.WorkflowStep

step that should be added

Raises:

TypeError

when step has wrong type

class tmlib.workflow.workflow.SequentialWorkflowStage(name, experiment_id, verbosity, submission_id, user_name, parent_id, description=None, waiting_time=0)

Bases: gc3libs.workflow.SequentialTaskCollection, tmlib.workflow.workflow.WorkflowStage, tmlib.workflow.workflow.State

A workflow stage whose steps should be processed sequentially. The number of jobs is generally only known for the first step of the stage, but unknown for the subsequent steps, since their input depends on the output of the previous step. Subsequent steps are thus built dynamically upon transition from one step to the next.

Parameters:

name: str

name of the stage

experiment_id: int

ID of the processed experiment

verbosity: int

logging verbosity index

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent Workflow

description: tmlib.tmaps.description.WorkflowStageDescription, optional

description of the stage (default: None)

waiting_time: int, optional

time in seconds to wait upon transition from one step to the next to avoid issues related to network file systems (default: 0)

next(done)

Progresses to next step.

Parameters:

done: int

zero-based index of the last processed step

Returns:

gc3libs.Run.State

update_step(index)

Updates the indexed step, i.e. creates new jobs for it.

Parameters:

index: int

index for the list of tasks (steps)

class tmlib.workflow.workflow.State

Bases: object

Mixin class that provides convenience properties to determine whether a task is in a given state.

is_new

bool: whether the step is in state NEW

is_running

bool: whether the step is in state RUNNING

is_stopped

bool: whether the step is in state STOPPED

is_submitted

bool: whether the step is in state SUBMITTED

is_terminated

bool: whether the step is in state TERMINATED

class tmlib.workflow.workflow.Workflow(experiment_id, verbosity, submission_id, user_name, description, waiting_time=0)

Bases: gc3libs.workflow.SequentialTaskCollection, tmlib.workflow.workflow.State

A workflow represents a computational pipeline that gets dynamically assembled from individual stages based on a user provided description.

Parameters:

experiment_id: int

ID of processed experiment

verbosity: int

logging verbosity index

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

description: tmlib.tmaps.description.WorkflowDescription

description of the workflow

waiting_time: int, optional

time in seconds to wait upon transition from one stage to the next; required with certain network file system settings (default: 0)

n_stages

int: total number of active stages

next(done)

Progresses to next stage.

Parameters:

done: int

zero-based index of the last processed stage

Returns:

gc3libs.Run.State

update_description(description)

Updates the workflow description by removing inactive stages/steps from the description that will ultimately be used to dynamically build stages upon processing.

Parameters:

description: tmlib.tmaps.description.WorkflowDescription

description of the workflow

Raises:

TypeError

when description doesn’t have type WorkflowDescription
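The pruning that `update_description` performs can be sketched on plain dictionaries. The `stages`/`steps`/`active` keys below are assumptions modelled on the description classes, not tmlib's exact attributes:

```python
def filter_active(description):
    """Drop stages and steps whose "active" flag is False, so only
    active parts are used to build the workflow (sketch)."""
    active_stages = []
    for stage in description['stages']:
        if not stage['active']:
            continue
        stage = dict(stage)  # shallow copy; leave the input untouched
        stage['steps'] = [s for s in stage['steps'] if s['active']]
        active_stages.append(stage)
    return {'stages': active_stages}
```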

update_stage(index)

Updates the indexed stage, i.e. creates the individual computational jobs for each step of the stage.

Parameters:

index: int

index for the list of tasks (stages)

class tmlib.workflow.workflow.WorkflowStage(name, experiment_id, verbosity, submission_id, user_name, parent_id, description)

Bases: tmlib.workflow.workflow.State

Base class for TissueMAPS workflow stages. A workflow stage is composed of one or more workflow steps, which together comprise an abstract computational task.

Parameters:

name: str

name of the stage

experiment_id: int

ID of the processed experiment

verbosity: int

logging verbosity index

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent Workflow

description: tmlib.tmaps.description.WorkflowStageDescription

description of the stage

Raises:

TypeError

when description doesn’t have the correct type

n_steps

int: number of steps in the stage

class tmlib.workflow.workflow.WorkflowStep(name, experiment_id, verbosity, submission_id, user_name, parent_id, description)

Bases: gc3libs.workflow.AbortOnError, gc3libs.workflow.SequentialTaskCollection, tmlib.workflow.workflow.State

A workflow step represents a collection of computational tasks that should be processed in parallel on a cluster, i.e. one parallelization step within a larger workflow.

Parameters:

name: str

name of the step

experiment_id: int

ID of the processed experiment

verbosity: int

logging verbosity index

submission_id: int

ID of the corresponding submission

user_name: str

name of the submitting user

parent_id: int

ID of the parent WorkflowStage

description: tmlib.tmaps.description.WorkflowStepDescription

description of the step

collect_phase

tmlib.workflow.jobs.CollectPhase: collection of jobs for the “collect” phase

init_phase

tmlib.workflow.jobs.InitPhase: collection of jobs for the “init” phase

initialize()

Initializes the step, i.e. generates the jobs for the different phases.

next(done)

Progresses to the next phase.

Parameters:

done: int

zero-based index of the last processed phase

Returns:

gc3libs.Run.State

run_phase

tmlib.workflow.jobs.RunPhase: collection of jobs for the “run” phase