tmlib.workflow package¶
Module contents¶
TissueMAPS workflow.
A workflow is a sequence of distributed computational tasks (steps). Each step represents a collection of batch jobs that can be processed in parallel and is comprised of the following phases:
- init: Partitioning of the computational task into smaller batches based on user-provided arguments.
- run: Scheduled parallel processing of individual batches according to user-defined resource allocation.
- collect (optional): Postprocessing of results obtained by individual batch jobs.
A step is implemented as a subpackage of tmlib.workflow containing three modules:
- args: Must implement BatchArguments and SubmissionArguments and decorate them with register_step_batch_args and register_step_submission_args, respectively. These classes provide step-specific arguments controlling the partitioning of the given computational task into separate batch jobs and the amount of computational resources that should be allocated to each batch job.
- api: Must implement WorkflowStepAPI and decorate it with register_step_api. This class provides the active programming interface (API) with methods for the creation and management of batch jobs. The methods create_batches, run_job and collect_job_output implemented by derived classes are responsible for the step-specific processing behaviour and are controlled via the batch and submission arguments described above.
- cli: Must implement WorkflowStepCLI. This class provides the command line interface (CLI) and main entry points for the program. It supplies high-level methods for the different phases, which delegate processing to the respective API methods.
This implementation automatically registers a step and makes it available for integration into larger workflows. To this end, steps are further combined into abstract task collections referred to as stages. A stage bundles one or more steps into a logical processing unit, taking potential dependencies between them into account.
A Workflow is dynamically built from steps based on a WorkflowDescription, provided by users as a mapping in YAML format. The workflow structure, i.e. the sequence of stages and steps and their interdependencies, is defined by its type, which is determined based on an implementation of WorkflowDependencies. To make a type available for use, the derived class must be registered via register_workflow_type. The “canonical” type, declared by CanonicalWorkflowDependencies, serves as an example.
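Such a description mapping might take the following shape; this is a hypothetical sketch assembled from the constructor signatures of WorkflowDescription, WorkflowStageDescription and WorkflowStepDescription documented below, and the exact keys accepted by tmlib may differ:

```python
# Hypothetical workflow description as a Python mapping (in practice the
# equivalent structure would be supplied in YAML). Keys mirror the
# constructor parameters of the description classes documented below.
description = {
    'type': 'canonical',
    'stages': [
        {
            'name': 'image_conversion',
            'mode': 'sequential',   # steps are submitted one after another
            'active': True,
            'steps': [
                {'name': 'metaextract', 'active': True,
                 'batch_args': {}, 'submission_args': {}},
                {'name': 'metaconfig', 'active': True,
                 'batch_args': {}, 'submission_args': {}},
                {'name': 'imextract', 'active': True,
                 'batch_args': {}, 'submission_args': {}},
            ],
        },
    ],
}

# Steps of the first stage, in submission order:
step_names = [s['name'] for s in description['stages'][0]['steps']]
```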
-
tmlib.workflow.
climethod
(help, **kwargs)¶ Method decorator that flags a method for use in the command line interface and provides descriptions of the arguments of the method, which are required for parsing arguments via the command line.
The decorator further constructs the docstring for the decorated method.
Parameters: help: str
brief description of the method
**kwargs: Dict[str, tmlib.workflow.args.Argument]
descriptors for each argument of the method
Returns: unbound method
Raises: TypeError
when registered function is not a method
TypeError
when the class of the registered method is not derived from
WorkflowStepCLI
TypeError
when the value specified by a keyword argument doesn’t have type
Argument
ValueError
when the key of a keyword argument doesn’t match a parameter of the method
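The decorator's behaviour can be illustrated with a minimal self-contained re-implementation; the Argument stand-in and the docstring format below are simplifications for illustration, not tmlib's actual code:

```python
import inspect

class Argument(object):
    """Minimal stand-in for tmlib.workflow.args.Argument (sketch only)."""
    def __init__(self, type, help):
        self.type = type
        self.help = help

def climethod(help, **arg_descriptors):
    """Sketch of the climethod pattern: validate each keyword argument
    against the method signature, flag the method for CLI use and build
    its docstring from the provided descriptors."""
    def decorator(func):
        params = set(inspect.signature(func).parameters) - {'self'}
        for name, descriptor in arg_descriptors.items():
            if not isinstance(descriptor, Argument):
                raise TypeError('"%s" must have type Argument' % name)
            if name not in params:
                raise ValueError('"%s" is not a parameter of the method' % name)
        lines = [help, '', 'Parameters', '----------']
        for name, d in arg_descriptors.items():
            lines.append('%s: %s' % (name, d.type.__name__))
            lines.append('    %s' % d.help)
        func.__doc__ = '\n'.join(lines)
        func.is_climethod = True  # flag picked up by the CLI machinery
        return func
    return decorator

class DemoCLI(object):
    @climethod('prints job info', job_id=Argument(int, 'one-based job identifier'))
    def info(self, job_id):
        return 'job %d' % job_id
```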
-
tmlib.workflow.
get_step_api
(name)¶ Gets the step-specific implementation of the
WorkflowStepAPI
class.Parameters: name: str
name of the step
Returns: classobj
API class
-
tmlib.workflow.
get_step_args
(name)¶ Gets step-specific implementations of
ArgumentCollection
classes.Parameters: name: str
name of the step
Returns: Tuple[Union[tmlib.workflow.args.BatchArguments, tmlib.workflow.args.SubmissionArguments]]
class for batch and submission arguments
-
tmlib.workflow.
get_step_information
(name)¶ Gets the full name of the given step and a brief description.
Parameters: name: str
name of the step
Returns: Tuple[str]
full name and brief description
-
tmlib.workflow.
register_step_api
(name)¶ Class decorator to register a derived class of
WorkflowStepAPI
as a step API.Parameters: name: str
name of the corresponding workflow step
Returns: classobj
Raises: TypeError
when decorated class is not derived from
WorkflowStepAPI
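The registration machinery behind register_step_api and get_step_api boils down to a class decorator populating a module-level registry. A minimal self-contained sketch of the pattern follows; the registry name and stand-in classes are invented for illustration, not tmlib's internals:

```python
# Module-level registry mapping step names to registered API classes,
# so that get_step_api() can look them up later.
_step_api_registry = {}

class WorkflowStepAPI(object):
    """Stand-in for tmlib.workflow.api.WorkflowStepAPI (sketch only)."""
    pass

def register_step_api(name):
    def decorator(cls):
        if not issubclass(cls, WorkflowStepAPI):
            raise TypeError(
                'Class "%s" must be derived from WorkflowStepAPI' % cls.__name__)
        _step_api_registry[name] = cls
        return cls
    return decorator

def get_step_api(name):
    return _step_api_registry[name]

@register_step_api('demostep')
class DemoStepAPI(WorkflowStepAPI):
    pass
```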
-
tmlib.workflow.
register_step_batch_args
(name)¶ Class decorator to register a derived class of
BatchArguments
for a workflow step to use it via the command line or within a workflow.Parameters: name: str
name of the corresponding workflow step
Returns: classobj
Raises: TypeError
when decorated class is not derived from
BatchArguments
-
tmlib.workflow.
register_step_submission_args
(name)¶ Class decorator to register a derived class of
SubmissionArguments
for a workflow step to use it via the command line or within a workflow.Parameters: **kwargs: dict, optional
name of the corresponding workflow step
Returns: classobj
Raises: TypeError
when decorated class is not derived from
SubmissionArguments
-
tmlib.workflow.
register_workflow_type
(name)¶ Class decorator to register a derived class of
WorkflowDependencies
.Parameters: name: str
name of the type of workflow
Returns: classobj
Raises: TypeError
when decorated class is not derived from
WorkflowDependencies
Subpackages¶
- tmlib.workflow.align package
- tmlib.workflow.corilla package
- tmlib.workflow.illuminati package
- tmlib.workflow.imextract package
- tmlib.workflow.jterator package
- Module contents
- Submodules
- tmlib.workflow.jterator.api module
- tmlib.workflow.jterator.args module
- tmlib.workflow.jterator.description module
- tmlib.workflow.jterator.cli module
- jterator
- jterator experiment_id check
- jterator experiment_id cleanup
- jterator experiment_id collect
- jterator experiment_id create
- jterator experiment_id debug
- jterator experiment_id info
- jterator experiment_id init
- jterator experiment_id log
- jterator experiment_id remove
- jterator experiment_id resubmit
- jterator experiment_id run
- jterator experiment_id submit
- jterator
- tmlib.workflow.jterator.handles module
- tmlib.workflow.jterator.module module
- tmlib.workflow.jterator.project module
- tmlib.workflow.jterator.utils module
- tmlib.workflow.metaconfig package
- Module contents
- Submodules
- tmlib.workflow.metaconfig.api module
- tmlib.workflow.metaconfig.args module
- tmlib.workflow.metaconfig.base module
- tmlib.workflow.metaconfig.cellvoyager module
- tmlib.workflow.metaconfig.cli module
- tmlib.workflow.metaconfig.metamorph module
- tmlib.workflow.metaconfig.omexml module
- tmlib.workflow.metaconfig.visiview module
- tmlib.workflow.metaextract package
Submodules¶
tmlib.workflow.api module¶
-
class
tmlib.workflow.api.
BasicWorkflowStepAPI
¶ Bases:
object
Abstract base class for cluster routines.
-
datetimestamp
¶ Returns: str
datetime stamp in the form “year-month-day_hour:minute:second”
-
timestamp
¶ Returns: str
time stamp in the form “hour:minute:second”
-
-
class
tmlib.workflow.api.
WorkflowStepAPI
(experiment_id)¶ Bases:
tmlib.workflow.api.BasicWorkflowStepAPI
Abstract base class for API classes, which provide methods for large-scale image processing on a batch cluster.
Each workflow step must implement this class and decorate it with
register_step_api
to register it for use within a Workflow
.Parameters: experiment_id: int
ID of the processed experiment
Attributes
experiment_id: int
ID of the processed experiment
workflow_location: str
absolute path to the location where workflow-related data should be stored
-
batches_location
¶ str: location where job description files are stored
-
collect_job_output
(batch)¶ Collects the outputs of jobs and fuses them if necessary.
Parameters: batches: List[dict]
job descriptions
**kwargs: dict
additional variable input arguments as key-value pairs
-
create_collect_batch
(args)¶ Creates a job description with information required for processing of the batch job of the optional collect phase.
This method returns an empty dictionary. In case the step implements the collect phase and arguments need to be passed to
collect_job_output
, the derived class can override this method.Parameters: args: tmlib.workflow.args.BatchArguments
an instance of a step-specific implementation of
BatchArguments
Returns: dict
job description
-
create_collect_job
(user_name, job_collection, verbosity, duration='06:00:00')¶ Creates job for the “collect” phase of the step.
Parameters: user_name: str
name of the submitting user
job_collection: tmlib.workflow.job.CollectPhase
empty collection of collect jobs that should be populated
verbosity: int
logging verbosity for jobs
duration: str, optional
computational time that should be allocated for a single job; in HH:MM:SS format (default:
"06:00:00"
)Returns: tmlib.workflow.jobs.CollectJob
collect job
-
create_collect_phase
(submission_id, parent_id)¶ Creates a job collection for the “collect” phase of the step.
Parameters: submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
Returns: tmlib.workflow.job.CollectPhase
collection of “collect” jobs
-
create_init_job
(user_name, job_collection, batch_args, verbosity, duration='12:00:00')¶ Creates job for the “init” phase of the step.
Parameters: user_name: str
name of the submitting user
job_collection: tmlib.workflow.job.InitPhase
empty collection of init jobs that should be populated
batch_args: tmlib.workflow.args.BatchArguments
step-specific implementation of
BatchArguments
duration: str, optional
computational time that should be allocated for the job in HH:MM:SS format (default:
"12:00:00"
)verbosity: int
logging verbosity for job
Returns: tmlib.workflow.jobs.InitPhase
init job
-
create_init_phase
(submission_id, parent_id)¶ Creates a job collection for the “init” phase of the step.
Parameters: submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
Returns: tmlib.workflow.job.InitPhase
collection of “init” jobs
-
create_run_batches
(args)¶ Creates job descriptions with information required for processing of individual batch jobs of the parallel run phase.
Each batch is a mapping that must provide at least the following key-value pair:
- “id”: one-based job identifier number (int)
Additional key-value pairs may be provided, depending on requirements of the step.
Parameters: args: tmlib.workflow.args.BatchArguments
an instance of a step-specific implementation of
BatchArguments
Returns: Union[List[dict], generator]
job descriptions
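The batch descriptions this method returns can be sketched as follows, using a hypothetical batch_size argument to partition a list of input files; the payload key besides "id" is step-specific and invented here for illustration:

```python
# Sketch of a create_run_batches-style partitioning: each batch is a
# mapping carrying at least the one-based "id" required by the run phase,
# plus whatever step-specific information ("inputs" here, hypothetical)
# the jobs need.
def create_run_batches(files, batch_size):
    batches = []
    for i in range(0, len(files), batch_size):
        batches.append({
            'id': i // batch_size + 1,         # one-based job identifier
            'inputs': files[i:i + batch_size]  # step-specific payload
        })
    return batches

batches = create_run_batches(['a.png', 'b.png', 'c.png'], batch_size=2)
```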
-
create_run_jobs
(user_name, job_collection, verbosity, duration, memory, cores)¶ Creates jobs for the parallel “run” phase of the step.
Parameters: user_name: str
name of the submitting user
job_collection: tmlib.workflow.job.RunPhase
empty collection of run jobs that should be populated
verbosity: int
logging verbosity for jobs
duration: str
computational time that should be allocated for a single job; in HH:MM:SS format
memory: int
amount of memory in megabytes that should be allocated for a single job
cores: int
number of CPU cores that should be allocated for a single job
Returns: tmlib.workflow.jobs.RunPhase
collection of jobs
-
create_run_phase
(submission_id, parent_id)¶ Creates a job collection for the “run” phase of the step.
Parameters: submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
Returns: tmlib.workflow.job.SingleRunPhase
collection of “run” jobs
-
create_step
(submission_id, user_name, description)¶ Creates the workflow step.
Parameters: submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
description: tmlib.workflow.description.WorkflowStepDescription
Returns: tmlib.workflow.WorkflowStep
-
delete_previous_job_output
()¶ Deletes the output of a previous submission.
-
get_collect_batch
()¶ Get description for a
CollectJob
.Returns: Dict[str, Union[int, str, list, dict]]
job description
-
get_log_output
(phase, job_id=None)¶ Gets log outputs (standard output and error).
Parameters: phase: str
phase of the workflow step (options:
{"init", "run", "collect"}
)job_id: int, optional
one-based identifier number of “run” jobs (default:
None
)Returns: Dict[str, str]
“stdout” and “stderr” for the given job
-
get_run_batch
(job_id)¶ Get description for a
RunJob
.Parameters: job_id: int
one-based job identifier
Returns: Dict[str, Union[int, str, list, dict]]
job description
-
get_run_job_ids
()¶ Gets IDs of jobs of the run phase from persisted descriptions.
Returns: List[int]
job IDs
-
log_location
¶ str: location where log files are stored
-
print_job_descriptions
(batches)¶ Prints job descriptions to standard output in YAML format.
Parameters: batches: Dict[List[dict]]
description of inputs and outputs of individual jobs
-
run_job
(batch, assume_clean_state=False)¶ Runs an individual job.
Parameters: batch: dict
description of the job
assume_clean_state: bool, optional
assume that output of previous runs has already been cleaned up
-
step_location
¶ str: location where step-specific data is stored
-
step_name
¶ str: name of the step
-
store_collect_batch
(batch)¶ Persists description for a
CollectJob
.Parameters: batch: Dict[str, Union[int, str, list, dict]]
JSON serializable job description
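How a step typically fills in the three phase hooks can be sketched with a self-contained toy step; the base class here is a bare stand-in for tmlib.workflow.api.WorkflowStepAPI and the processing logic (doubling numbers, then summing) is invented purely for illustration:

```python
class WorkflowStepAPI(object):
    """Stand-in base class (sketch only)."""
    def __init__(self, experiment_id):
        self.experiment_id = experiment_id

class DemoStepAPI(WorkflowStepAPI):
    """Toy step: doubles a list of numbers in parallel batches, then sums."""

    def create_run_batches(self, args):
        # "init" phase: partition the task into one-based batches
        values = list(range(1, 7))
        n = args['batch_size']
        return [
            {'id': i // n + 1, 'values': values[i:i + n]}
            for i in range(0, len(values), n)
        ]

    def run_job(self, batch, assume_clean_state=False):
        # "run" phase: process one batch independently of the others
        return [v * 2 for v in batch['values']]

    def collect_job_output(self, batch):
        # optional "collect" phase: fuse the per-job results
        return sum(batch['results'])

api = DemoStepAPI(experiment_id=1)
batches = api.create_run_batches({'batch_size': 3})
results = [api.run_job(b) for b in batches]
total = api.collect_job_output({'results': [x for r in results for x in r]})
```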
-
tmlib.workflow.args module¶
-
class
tmlib.workflow.args.
Argument
(type, help, default=None, choices=None, flag=None, short_flag=None, required=False, disabled=False, get_choices=None, meta=None, dependency=())¶ Bases:
object
Descriptor class for an argument.
Parameters: type: type
type of the argument
help: str
help message that describes the argument
default: Union[str, int, float, bool], optional
default value (default:
None
)choices: set or list or function, optional
choices for value
flag: str, optional
alternative name that serves as a flag for command line usage; will be prepended with two hyphens
--
(default:None
; defaults to name of the argument when not provided)short_flag: str, optional
single letter that serves as an additional flag for command line usage; will be prepended with one hyphen
-
(default:None
)required: bool, optional
whether the argument is required (default:
False
)disabled: bool, optional
whether the argument should be disabled in the UI (default:
False
)get_choices: function, optional
function that takes an object of type
Experiment
and returns the choices in case they need to (and can) be determined dynamically (default:None
)meta: str, optional
alternative name of the argument displayed for command line options
dependency: tuple, optional
name-value pair of an argument the given argument depends on
-
add_to_argparser
(parser)¶ Adds the argument to an argument parser for use in a command line interface.
Parameters: parser: argparse.ArgumentParser
argument parser
Returns: argparse.ArgumentParser
parser with added arguments
-
name
¶ str: name of the argument
-
-
class
tmlib.workflow.args.
ArgumentCollection
(**kwargs)¶ Bases:
object
Abstract base class for an argument collection. The collection serves as a container for arguments that can be passed to methods of
WorkflowStepCLI
decorated with climethod
.Implementations of the class can be instantiated without having to implement a constructor, i.e. an __init__ method. The constructor accepts keyword arguments and strips the values from those arguments that are implemented as class attributes with type
Argument
and replaces any default values. When there is no default value specified, the argument has to be provided as a key-value pair. Derived classes can explicitly implement a constructor if required.Parameters: **kwargs: dict, optional
keyword arguments to overwrite
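The constructor behaviour described above can be sketched with a self-contained re-implementation; in tmlib, Argument is a full descriptor class, so the simplified attribute handling and class names below are illustrative only:

```python
class Argument(object):
    """Simplified stand-in for tmlib.workflow.args.Argument."""
    _MISSING = object()

    def __init__(self, type, help, default=_MISSING):
        self.type = type
        self.help = help
        self.default = default

class ArgumentCollection(object):
    def __init__(self, **kwargs):
        # Consume keyword arguments matching class attributes of type
        # Argument; fall back to declared defaults, otherwise require
        # the value to be provided.
        for name in dir(type(self)):
            arg = getattr(type(self), name)
            if not isinstance(arg, Argument):
                continue
            if name in kwargs:
                setattr(self, name, arg.type(kwargs.pop(name)))
            elif arg.default is not Argument._MISSING:
                setattr(self, name, arg.default)
            else:
                raise ValueError('Argument "%s" is required' % name)

class DemoBatchArguments(ArgumentCollection):
    # No __init__ needed: arguments are declared as class attributes.
    batch_size = Argument(int, 'number of images per batch', default=100)
    plate = Argument(str, 'name of the plate to process')

args = DemoBatchArguments(plate='plate01', batch_size='10')
```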
-
add_to_argparser
(parser)¶ Adds each argument to an argument parser for use in a command line interface.
Parameters: parser: argparse.ArgumentParser
argument parser
Returns: argparse.ArgumentParser
parser with added arguments
-
docstring
¶ str: docstring in NumPy style for the method to which the arguments should be passed; built based on the value of the help attribute and those of the type and help attributes of individual arguments
-
help
¶ str: brief description of the collection or the method to which the arguments should be passed
-
iterargitems
()¶ Iterates over the argument items stored as attributes of the instance.
-
classmethod
iterargs
()¶ Iterates over the class attributes of type
Argument
.
-
to_list
()¶ Returns class attributes of type
Argument
as an array of key-value pairs.Returns: List[dict]
description of each argument
-
union
(collection)¶ Adds all arguments contained in another collection.
Parameters: collection: tmlib.workflow.args.ArgumentCollection
collection of arguments that should be added
-
-
class
tmlib.workflow.args.
BatchArguments
(**kwargs)¶ Bases:
tmlib.workflow.args.ArgumentCollection
Base class for arguments that are used to define how the computational task should be divided into individual batch jobs for parallel processing on the cluster.
These arguments can be passed to a step-specific implementation of the
init
CLI method and will be passed to the create_batches
API method.Parameters: **kwargs: dict, optional
keyword arguments to overwrite
-
class
tmlib.workflow.args.
CliMethodArguments
(**kwargs)¶ Bases:
tmlib.workflow.args.ArgumentCollection
Collection of arguments that can be passed to a method of a step-specific implementation of
WorkflowStepCLI
, which is decorated with climethod
.Parameters: **kwargs: dict, optional
keyword arguments to overwrite
-
class
tmlib.workflow.args.
SubmissionArguments
(**kwargs)¶ Bases:
tmlib.workflow.args.ArgumentCollection
Base class for arguments that are used to control the submission of jobs to the cluster.
These arguments can be passed to a step-specific implementation of the
submit
CLI method and are passed to the create_jobs
API method.Parameters: **kwargs: dict, optional
keyword arguments to overwrite
-
cores
¶ int: number of cores that should be allocated to each “run” job (may be increased in case memory requirements of a job exceed resources of a single core)
-
duration
¶ str: walltime that should be allocated to each “run” job in the format “HH:MM:SS” (may need to be adapted depending on the choice of batch size)
-
memory
¶ int: amount of memory in megabytes that should be allocated to each “run” job
-
tmlib.workflow.cli module¶
-
class
tmlib.workflow.cli.
WorkflowStepCLI
(api_instance, verbosity)¶ Bases:
tmlib.workflow.submission.WorkflowSubmissionManager
Abstract base class for command line interfaces.
Each workflow step must implement this class. The derived class will get automatically equipped with command line interface functionality in form of an instance of argparse.ArgumentParser. The docstring of the derived class is also used for the description attribute of the parser for display in the command line. A separate subparser is added for each method of the derived class that is decorated with
climethod
. The decorator provides descriptions of the method arguments, which are also added to the corresponding subparser.The
init
and submit
methods require additional step-specific arguments that are passed to the API methods create_run_batches
and create_run_jobs
, respectively. These arguments are handled separately, since they also need to be accessible outside the scope of the command line interface. They are provided by step-specific implementations of BatchArguments
and SubmissionArguments
and added to the corresponding init and submit subparsers, respectively.Parameters: api_instance: tmlib.api.WorkflowStepAPI
instance of API class to which processing is delegated
verbosity: int
logging verbosity level
-
cleanup
()¶ Cleans up the output of a previous submission, i.e. removes files and database entries created by previously submitted jobs
-
collect
()¶ Collects the output of run jobs, i.e. performs a post-processing operation that either cannot be parallelized or needs to be performed afterwards
-
info
(phase, job_id)¶ Prints the description of a given batch job to the console
Parameters: phase: str
phase of the workflow step to which the job belongs
job_id: int
ID of the job for which information should be displayed
-
init
()¶ Creates batches for parallel processing and thereby defines how the computational task should be distributed over the cluster (also cleans up the output of previous submissions)
-
log
(phase, job_id)¶ Prints the log output of a given batch job to the console
Parameters: phase: str
phase of the workflow step to which the job belongs
job_id: int
ID of the job for which log output should be shown
-
name
¶ str: name of the step (command line program)
-
resubmit
(monitoring_depth, monitoring_interval)¶ Resubmits previously created jobs for “run” and “collect” phases to the cluster and monitors their status upon processing
Parameters: monitoring_depth: int
number of child tasks that should be monitored
monitoring_interval: int
seconds to wait between monitoring iterations
-
run
(job_id, assume_clean_state)¶ Runs an individual batch job on the local machine
Parameters: assume_clean_state: bool
assume that previous outputs have been cleaned up
job_id: int
ID of the job that should be run
-
submit
(monitoring_depth, monitoring_interval)¶ Creates batch jobs for the “run” and “collect” phases, submits them to the cluster and monitors their status upon processing (requires a prior “init”)
Parameters: monitoring_depth: int
number of child tasks that should be monitored
monitoring_interval: int
seconds to wait between monitoring iterations
-
tmlib.workflow.dependencies module¶
-
class
tmlib.workflow.dependencies.
CanonicalWorkflowDependencies
¶ Bases:
tmlib.workflow.dependencies.WorkflowDependencies
Declaration of dependencies for the canonical workflow.
-
INTER_STAGE_DEPENDENCIES
= {'pyramid_creation': set(['image_conversion', 'image_preprocessing']), 'image_conversion': {}, 'image_analysis': set(['image_conversion', 'image_preprocessing']), 'image_preprocessing': set(['image_conversion'])}¶ collections.OrderedDict[str, Set[str]]: dependencies between workflow stages
-
INTRA_STAGE_DEPENDENCIES
= {'metaextract': {}, 'imextract': set(['metaconfig']), 'metaconfig': set(['metaextract'])}¶ Dict[str, Set[str]]: dependencies between workflow steps within one stage
-
STAGES
= ['image_conversion', 'image_preprocessing', 'pyramid_creation', 'image_analysis']¶ List[str]: names of workflow stages
-
STAGE_MODES
= {'pyramid_creation': 'sequential', 'image_conversion': 'sequential', 'image_analysis': 'sequential', 'image_preprocessing': 'parallel'}¶ Dict[str, str]: mode for each workflow stage, i.e. whether steps of a stage should be submitted in parallel or sequentially
-
STEPS_PER_STAGE
= {'pyramid_creation': ['illuminati'], 'image_conversion': ['metaextract', 'metaconfig', 'imextract'], 'image_analysis': ['jterator'], 'image_preprocessing': ['corilla']}¶ collections.OrderedDict[str, List[str]]: names of steps within each stage
-
-
class
tmlib.workflow.dependencies.
MultiplexingWorkflowDependencies
¶ Bases:
tmlib.workflow.dependencies.CanonicalWorkflowDependencies
Declaration of dependencies for a multiplexing workflow, which includes the
align
step.-
STAGE_MODES
= {'pyramid_creation': 'sequential', 'image_conversion': 'sequential', 'image_analysis': 'sequential', 'image_preprocessing': 'sequential'}¶ Dict[str, str]: mode for each workflow stage, i.e. whether steps of a stage should be submitted in parallel or sequentially
-
STEPS_PER_STAGE
= {'pyramid_creation': ['illuminati'], 'image_conversion': ['metaextract', 'metaconfig', 'imextract'], 'image_analysis': ['jterator'], 'image_preprocessing': ['corilla', 'align']}¶ Dict[str, List[str]]: names of steps within each stage
-
-
class
tmlib.workflow.dependencies.
WorkflowDependencies
¶ Bases:
object
Abstract base class for declaration of workflow dependencies.
Derived classes will be used by descriptor classes in
tmlib.workflow.description
to declare a workflow type.In addition, derived classes must implement the following attributes:
- __type__: name of the workflow type
- STAGES (list): names of stages that the workflow should have
- STAGE_MODES (dict): mapping of stage name to processing mode (either "parallel" or "sequential")
- STEPS_PER_STAGE (dict): ordered mapping of stage name to corresponding step names
- INTER_STAGE_DEPENDENCIES (dict): mapping of stage name to names of other stages the referenced stage depends on
- INTRA_STAGE_DEPENDENCIES (dict): mapping of step name to names of other steps the referenced step depends on
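A hypothetical declaration following this attribute contract might look as follows (the stage and step names reuse those of the canonical workflow for familiarity; the "tiny" type itself is invented). A small consistency check illustrates what the declared mappings must agree on:

```python
class TinyWorkflowDependencies(object):
    """Illustrative dependency declaration (not registered with tmlib)."""
    __type__ = 'tiny'
    STAGES = ['image_conversion', 'image_analysis']
    STAGE_MODES = {
        'image_conversion': 'sequential',
        'image_analysis': 'sequential',
    }
    STEPS_PER_STAGE = {
        'image_conversion': ['metaextract', 'metaconfig', 'imextract'],
        'image_analysis': ['jterator'],
    }
    INTER_STAGE_DEPENDENCIES = {
        'image_conversion': set(),
        'image_analysis': {'image_conversion'},
    }
    INTRA_STAGE_DEPENDENCIES = {
        'metaextract': set(),
        'metaconfig': {'metaextract'},
        'imextract': {'metaconfig'},
    }

d = TinyWorkflowDependencies
# every declared stage must have a mode, a step list and a dependency entry
consistent = all(
    s in d.STAGE_MODES and s in d.STEPS_PER_STAGE and s in d.INTER_STAGE_DEPENDENCIES
    for s in d.STAGES
)
```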
-
tmlib.workflow.dependencies.
get_workflow_dependencies
(name)¶ Gets a workflow type specific implementation of
WorkflowDependencies
.Parameters: name: str
name of the workflow type
Returns: classobj
-
tmlib.workflow.dependencies.
get_workflow_type_information
()¶ Gets the names of each implemented workflow type.
Returns: Set[str]
names of workflow types
tmlib.workflow.description module¶
-
class
tmlib.workflow.description.
WorkflowDescription
(type, stages=None)¶ Bases:
object
Description of a TissueMAPS workflow.
A workflow consists of a sequence of stages, which are themselves composed of steps. Each step represents a collection of computational jobs, which can be submitted for parallel processing on a cluster.
In principle, workflow steps could be arranged in arbitrary order, since dependencies for each step are checked dynamically while the workflow progresses. If a dependency is not fulfilled upon progression to the next step, i.e. if a required input has not been generated by another upstream step, the workflow will stop. However, in order to be able to check the workflow logic before any jobs get submitted, workflow dependencies are checked according to a specific implementation of
WorkflowDependencies
.Parameters: type: str
type of the workflow, i.e. a registered workflow dependency declaration in form of an implementation of
tmlib.workflow.dependencies.WorkflowDependencies
stages: List[dict]
description of workflow stages as a mapping of key-value pairs
-
add_stage
(stage_description)¶ Adds an additional stage to the workflow.
Parameters: stage_description: tmlib.workflow.description.WorkflowStageDescription
description of the stage that should be added
Raises: TypeError
when stage_description doesn’t have type
WorkflowStageDescription
-
jsonify
()¶ Returns attributes as key-value pairs encoded as JSON.
Returns: str
JSON string encoding the description of the workflow as a mapping of key-value pairs
-
to_dict
()¶ Returns attributes as key-value pairs.
Returns: dict
-
-
class
tmlib.workflow.description.
WorkflowStageDescription
(type, name, mode, active=True, steps=None)¶ Bases:
object
Description of a TissueMAPS workflow stage.
Parameters: type: str
name of the workflow type
name: str
name of the stage
mode: str
mode of workflow stage submission, i.e. whether steps are submitted simultaneously or one after another (options:
{"sequential", "parallel"}
)active: bool, optional
whether the stage should be processed
steps: List[dict]
description of steps in form of key-value pairs
Raises: TypeError
when name or steps have the wrong type
-
add_step
(step_description)¶ Adds an additional step to the stage.
Parameters: step_description: tmlib.workflow.description.WorkflowStepDescription
description of the step that should be added
Raises: TypeError
when step_description doesn’t have type
WorkflowStepDescription
-
jsonify
()¶ Returns the attributes as key-value pairs encoded as JSON.
Returns: str
JSON string encoding the description of the stage as a mapping of key-value pairs
-
to_dict
()¶ Returns the attributes as key-value pairs.
Returns: dict
-
-
class
tmlib.workflow.description.
WorkflowStepDescription
(name, active=True, batch_args=None, submission_args=None)¶ Bases:
object
Description of a workflow step.
Parameters: name: str
name of the step
active: bool, optional
whether the step should be processed
batch_args: tmlib.workflow.args.BatchArguments, optional
batch arguments
submission_args: tmlib.workflow.args.SubmissionArguments, optional
submission arguments
Raises: WorkflowDescriptionError
when a provided argument is not a valid argument for the given step
-
batch_args
¶ tmlib.workflow.args.BatchArguments: batch arguments
-
jsonify
()¶ Returns attributes as key-value pairs encoded as JSON.
Returns: str
JSON string encoding the description of the step as a mapping of key-value pairs
-
submission_args
¶ tmlib.workflow.args.SubmissionArguments: submission arguments
-
to_dict
()¶ Returns attributes as key-value pairs.
Returns: dict
-
tmlib.workflow.jobs module¶
-
class
tmlib.workflow.jobs.
CollectJob
(step_name, arguments, output_dir, submission_id, user_name, parent_id)¶ Bases:
tmlib.workflow.jobs.WorkflowStepJob
Class for a collect job, which can be processed once all parallel jobs have successfully completed.
Parameters: step_name: str
name of the corresponding TissueMAPS workflow step
arguments: List[str]
command line arguments
output_dir: str
absolute path to the output directory, where log reports will be stored
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
CollectPhase
-
name
¶ str: name of the job
-
-
class
tmlib.workflow.jobs.
CollectPhase
(step_name, submission_id, parent_id, job=None)¶ Bases:
gc3libs.workflow.ParallelTaskCollection
,tmlib.workflow.jobs.JobCollection
Collection of jobs for the “collect” phase of a workflow step.
Parameters: step_name: str
name of the corresponding
WorkflowStep
submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
job: tmlibs.workflow.jobs.CollectJob, optional
job that should be processed (default:
None
)-
add
(job)¶ Adds a job to the phase.
Parameters: job: tmlibs.workflow.jobs.CollectJob
job that should be added
Raises: TypeError
when job has wrong type
ValueError
when the phase already contains another job
-
-
class
tmlib.workflow.jobs.
IndependentJobCollection
(step_name, submission_id, jobs=None)¶ Bases:
gc3libs.workflow.SequentialTaskCollection
,tmlib.workflow.jobs.JobCollection
Collection of jobs for manual submission of the run and collect phases of a workflow step, independent of the main workflow.
Parameters: step_name: str
name of the corresponding TissueMAPS workflow step
submission_id: int
ID of the corresponding submission
jobs: List[tmlibs.workflow.jobs.RunPhase or tmlibs.workflow.jobs.CollectJob], optional
list of jobs that should be processed (default:
None
)
-
class
tmlib.workflow.jobs.
InitJob
(step_name, arguments, output_dir, submission_id, user_name, parent_id)¶ Bases:
tmlib.workflow.jobs.WorkflowStepJob
Class for an init job, which creates the descriptions for the subsequent run and collect phases.
Parameters: step_name: str
name of the corresponding TissueMAPS workflow step
arguments: List[str]
command line arguments
output_dir: str
absolute path to the output directory, where log reports will be stored
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
InitPhase
-
name
¶ str: name of the job
-
-
class
tmlib.workflow.jobs.
InitPhase
(step_name, submission_id, parent_id, job=None)¶ Bases:
gc3libs.workflow.ParallelTaskCollection
,tmlib.workflow.jobs.JobCollection
Collection of jobs for the “init” phase of a workflow step.
Parameters: step_name: str
name of the parent
WorkflowStep
submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
job: tmlibs.workflow.jobs.InitJob, optional
job that should be processed (default:
None
)-
add
(job)¶ Adds a job to the phase.
Parameters: job: tmlibs.workflow.jobs.InitJob
job that should be added
Raises: TypeError
when job has wrong type
ValueError
when the phase already contains another job
-
-
class
tmlib.workflow.jobs.
JobCollection
¶ Bases:
object
Abstract base class for collections of individual jobs.
-
class
tmlib.workflow.jobs.
MultiRunPhase
(step_name, submission_id, parent_id, run_job_collections=[])¶ Bases:
gc3libs.workflow.AbortOnError
,gc3libs.workflow.SequentialTaskCollection
,tmlib.workflow.jobs.RunPhase
Collection of jobs for the “run” phase of a workflow step that consists of multiple nested job collections that should be processed sequentially.
Parameters: step_name: str
name of the corresponding
WorkflowStep
submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
run_job_collections: List[tmlib.workflow.jobs.SingleRunPhase], optional
collections of run jobs that should be processed one after another
-
add
(run_job_collection)¶ Add a collection of run jobs to the phase.
Parameters: run_job_collection: tmlib.workflow.jobs.SingleRunPhase
collection of run jobs that should be added
Raises: TypeError
when run_job_collection has wrong type
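The type check in add can be sketched as follows; this is a minimal standalone illustration in which both class bodies are placeholders, not the tmlib implementation:

```python
class SingleRunPhase(object):
    """Placeholder for tmlib.workflow.jobs.SingleRunPhase."""

class MultiRunPhase(object):
    """Sketch of the sequential collection of run job collections."""

    def __init__(self):
        self.tasks = []

    def add(self, run_job_collection):
        # Only collections of run jobs may enter the phase.
        if not isinstance(run_job_collection, SingleRunPhase):
            raise TypeError(
                'Argument "run_job_collection" must have type SingleRunPhase.'
            )
        self.tasks.append(run_job_collection)

phase = MultiRunPhase()
phase.add(SingleRunPhase())
rejected = False
try:
    phase.add(object())   # wrong type: raises TypeError
except TypeError:
    rejected = True
```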
-
-
class
tmlib.workflow.jobs.
RunJob
(step_name, arguments, output_dir, job_id, submission_id, user_name, parent_id, index=None)¶ Bases:
tmlib.workflow.jobs.WorkflowStepJob
Class for TissueMAPS run jobs, which can be processed in parallel.
Parameters: step_name: str
name of the corresponding TissueMAPS workflow step
arguments: List[str]
command line arguments
output_dir: str
absolute path to the output directory, where log reports will be stored
job_id: int
one-based job identifier number
submission_id: int
ID of the corresponding submission
parent_id: int
ID of the parent
RunPhase
index: int, optional
index of the run job collection in case the step has multiple run phases
-
name
¶ str: name of the job
-
-
class
tmlib.workflow.jobs.
RunPhase
¶ Bases:
tmlib.workflow.jobs.JobCollection
Abstract base class for a collection of jobs for the “run” phase of a workflow step.
-
class
tmlib.workflow.jobs.
SingleRunPhase
(step_name, submission_id, parent_id, jobs=None, index=None)¶ Bases:
gc3libs.workflow.ParallelTaskCollection
,tmlib.workflow.jobs.RunPhase
Collection of jobs for the “run” phase of a workflow step that consists of a single job collection.
Parameters: step_name: str
name of the corresponding
WorkflowStep
submission_id: int
ID of the corresponding
Submission
parent_id: int
ID of the parent
WorkflowStep
jobs: List[tmlib.workflow.jobs.RunJob], optional
list of jobs that should be processed (default:
None
)index: int, optional
index of the run job collection in case the step has multiple run phases
-
add
(job)¶ Adds a job to the phase.
Parameters: job: tmlib.workflow.jobs.RunJob
job that should be added
Raises: TypeError
when job has wrong type
-
-
class
tmlib.workflow.jobs.
WorkflowStepJob
(step_name, arguments, output_dir, submission_id, user_name, parent_id)¶ Bases:
tmlib.jobs.Job
Abstract base class for an individual job as part of a workflow step phase.
Parameters: step_name: str
name of the corresponding TissueMAPS workflow step
arguments: List[str]
command line arguments
output_dir: str
absolute path to the output directory, where log reports will be stored
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
JobCollection
-
name
¶ str: name of the job
-
tmlib.workflow.manager module¶
-
class
tmlib.workflow.manager.
WorkflowManager
(experiment_id, verbosity)¶ Bases:
tmlib.workflow.submission.WorkflowSubmissionManager
Command line interface for submitting and monitoring TissueMAPS workflows.
Parameters: experiment_id: int
ID of the processed experiment
verbosity: int
logging verbosity level
-
resubmit
(monitoring_depth, stage)¶ Resubmits a previously created workflow to the cluster and monitors its status.
Parameters: monitoring_depth: int
number of child tasks that should be monitored
stage: str
stage at which workflow should be submitted
-
submit
(monitoring_depth, monitoring_interval, force=False)¶ Creates a workflow, submits it to the cluster and monitors its progress.
Parameters: monitoring_depth: int
number of child tasks that should be monitored
monitoring_interval: int
query status of jobs every monitoring_interval seconds
force: bool, optional
whether inactivated stages and steps should be submitted anyway
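The monitoring behaviour controlled by monitoring_interval can be sketched as a simple polling loop. StubEngine is a hypothetical stand-in for gc3libs.core.Engine, and its is_done method is an invented helper for the sketch (the real engine reports task counts via stats()):

```python
import time

class StubEngine(object):
    """Hypothetical stand-in for gc3libs.core.Engine; pretends all
    jobs finish after three progress() calls."""

    def __init__(self):
        self.polls = 0

    def progress(self):
        self.polls += 1

    def is_done(self):  # invented helper, not a gc3libs method
        return self.polls >= 3

def monitor(engine, monitoring_interval=0.01):
    """Poll job status every monitoring_interval seconds until done."""
    while not engine.is_done():
        engine.progress()
        time.sleep(monitoring_interval)
    return engine.polls

iterations = monitor(StubEngine())
```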
-
tm_workflow¶
- Command line interface for submitting and monitoring
- TissueMAPS workflows.
-
experiment_id
¶
ID of the experiment that should be processed
-
-h
,
--help
¶
show this help message and exit
-
--verbosity
,
-v
¶
increase logging verbosity
tm_workflow experiment_id resubmit¶
- resubmit a previously created workflow to the
- cluster and monitor its status
-
-h
,
--help
¶
show this help message and exit
-
--monitoring_depth
<monitoring_depth>
,
-m
<monitoring_depth>
¶ number of child tasks that should be monitored (default: 2)
-
--monitoring_interval
<monitoring_interval>
,
-i
<monitoring_interval>
¶ interval in seconds between monitoring iterations (default: 10)
-
--stage
<stage>
,
-s
<stage>
¶ stage at which workflow should be resubmitted
tm_workflow experiment_id submit¶
- create a workflow, submit it to the cluster and
- monitor its status
-
-h
,
--help
¶
show this help message and exit
-
--monitoring_depth
<monitoring_depth>
,
-d
<monitoring_depth>
¶ number of child tasks that should be monitored (default: 2)
-
--monitoring_interval
<monitoring_interval>
,
-i
<monitoring_interval>
¶ interval in seconds between monitoring iterations (default: 10)
-
--force
,
-f
¶
also submit inactivated stages and steps
tmlib.workflow.submission module¶
-
class
tmlib.workflow.submission.
WorkflowSubmissionManager
(experiment_id, program_name)¶ Bases:
tmlib.submission.SubmissionManager
Mixin class for submission and monitoring of workflows.
Parameters: experiment_id: int
ID of the processed experiment
program_name: str
name of the submitting program
-
get_task_id_of_last_submission
()¶ Gets the ID of the last submitted task for the given experiment and program.
Returns: int
ID of top task that was last submitted
-
submit_jobs
(jobs, engine, start_index=0, monitoring_depth=1, monitoring_interval=10)¶ Submits jobs to a cluster and continuously monitors their progress.
Parameters: jobs: tmlib.tmaps.workflow.WorkflowStep
jobs that should be submitted
engine: gc3libs.core.Engine
engine that should submit the jobs
start_index: int, optional
index of the job at which the collection should be (re)submitted
monitoring_depth: int, optional
recursion depth for job monitoring, i.e. the level of detail at which subtasks in the task tree should be monitored (default:
1
)monitoring_interval: int, optional
seconds to wait between monitoring iterations (default:
10
)Returns: dict
information about each job
-
update_submission
(jobs)¶ Updates the submission with the submitted tasks.
Sets the value of the “top_task” column in the “submissions” table.
Parameters: jobs: gc3libs.Task or gc3libs.workflow.TaskCollection
submitted tasks
Raises: AttributeError
when jobs doesn’t have a “persistent_id” attribute, which indicates that the task has not yet been inserted into the database table
-
tmlib.workflow.utils module¶
-
tmlib.workflow.utils.
create_gc3pie_engine
(store)¶ Creates an Engine instance for submitting jobs for parallel processing.
Parameters: store: gc3libs.persistence.store.Store
GC3Pie store object
Returns: gc3libs.core.Engine
engine
-
tmlib.workflow.utils.
create_gc3pie_session
(location, store)¶ Creates a Session instance for job persistence in the PostgreSQL table.
Parameters: location: str
path to a directory on disk for the file system representation of the store
store: gc3libs.persistence.store.Store
store instance
Returns: gc3libs.persistence.session.Session
GC3Pie session
-
tmlib.workflow.utils.
create_gc3pie_sql_store
()¶ Creates a Store instance for job persistence in the PostgreSQL table
Tasks
.Returns: gc3libs.persistence.sql.SqlStore
GC3Pie store
-
tmlib.workflow.utils.
format_stats_data
(stats)¶ For each task state (and pseudo-state like ok or failed), two values are returned: the count of managed tasks that were in that state when Engine.progress() was last run, and what percentage of the total managed tasks this represents. Parameters: stats: dict
as returned by
gc3libs.core.Engine.stats()
Returns: dict
global statistics about the jobs in the
Engine
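A sketch of how counts and percentages per state can be derived from such a stats mapping; the count_*/percent_* key names here are an assumption for illustration, not the actual tmlib output format:

```python
def format_stats_data(stats):
    """Derive per-state counts and percentages from a mapping of
    state name to task count, as produced by Engine.stats()."""
    total = sum(stats.values())
    data = {}
    for state, count in stats.items():
        data['count_' + state.lower()] = count
        data['percent_' + state.lower()] = (
            100.0 * count / total if total else 0.0
        )
    return data

data = format_stats_data({'RUNNING': 3, 'TERMINATED': 1})
```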
-
tmlib.workflow.utils.
format_task_data
(name, type, created_at, updated_at, state, exitcode, memory, time, cpu_time)¶ Formats task data in the way expected by clients:
- name (str): name of the task
- type (str): type of the task object
- created_at (str): date and time when the task was created
- updated_at (str): date and time when the task was last updated
- state (str): state of the task
- live (bool): whether the task is currently processed
- done (bool): whether the task is done
- failed (bool): whether the task failed, i.e. terminated with a non-zero exitcode
- percent_done (float): percent of subtasks that are done
- exitcode (int): status code returned by the program
- time (str): duration as “HH:MM:SS”
- memory (float): amount of used memory in MB
- cpu_time (str): used CPU time as “HH:MM:SS”
Parameters: name: str
type: str
created_at: datetime.datetime
updated_at: datetime.datetime
state: gc3libs.Run.State
exitcode: int
memory: str
time: str
cpu_time: str
Returns: Dict[str, Union[str, int, bool]]
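A minimal sketch of how such a flat dictionary might be assembled; the derivation of the live/done/failed flags is an assumption inferred from the field descriptions above, and percent_done is omitted since it depends on subtask bookkeeping not shown here. This is an illustration, not the tmlib implementation:

```python
def format_task_data(name, type, created_at, updated_at, state, exitcode,
                     memory, time, cpu_time):
    """Pack task attributes into the flat dict expected by clients.

    The live/done/failed logic below is an assumption inferred from
    the documented field descriptions.
    """
    return {
        'name': name,
        'type': type,
        'created_at': str(created_at),
        'updated_at': str(updated_at),
        'state': state,
        'live': state in ('SUBMITTED', 'RUNNING'),
        'done': state == 'TERMINATED' and exitcode == 0,
        'failed': state == 'TERMINATED' and exitcode not in (0, None),
        'exitcode': exitcode,
        'memory': memory,
        'time': time,
        'cpu_time': cpu_time,
    }

data = format_task_data(
    'metaconfig', 'RunJob', '2017-01-01 10:00:00', '2017-01-01 10:05:00',
    'TERMINATED', 0, 256.0, '00:05:00', '00:04:30'
)
```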
-
tmlib.workflow.utils.
format_timestamp
(elapsed_time)¶ Formats a timestamp in seconds to “HH:MM:SS” string.
Parameters: elapsed_time: float
elapsed time in seconds
Returns: str
formatted timestamp
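A minimal sketch of this conversion, assuming simple rounding to whole seconds:

```python
def format_timestamp(elapsed_time):
    """Format elapsed seconds as an "HH:MM:SS" string."""
    seconds = int(round(elapsed_time))
    hours, remainder = divmod(seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)

print(format_timestamp(3725.4))  # → 01:02:05
```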
-
tmlib.workflow.utils.
get_task_status_recursively
(task_id, recursion_depth=None, id_encoder=None)¶ Provides status information for each task and recursively for subtasks.
Parameters: task_id: int
ID of the submitted highest-level GC3Pie task
recursion_depth: int, optional
recursion depth for subtask querying; by default data of all subtasks will be queried (default:
None
)id_encoder: function, optional
function that encodes task IDs
Returns: dict
information about each task and its subtasks
-
tmlib.workflow.utils.
log_task_failure
(task_info, logger)¶ Logs the failure of a submitted GC3Pie task.
Parameters: task_info: dict
information about each task and its subtasks
logger: logging.Logger
configured logger instance
-
tmlib.workflow.utils.
log_task_status
(task_info, logger, monitoring_depth)¶ Logs the status of a submitted GC3Pie task.
Parameters: task_info: dict
information about each task and its subtasks
logger: logging.Logger
configured logger instance
monitoring_depth: int
recursion depth for subtask querying
-
tmlib.workflow.utils.
print_task_status
(task_info)¶ Pretty-prints the status of a submitted GC3Pie task to the console in table format.
Parameters: task_info: dict
information about each task and its subtasks
tmlib.workflow.workflow module¶
-
class
tmlib.workflow.workflow.
ParallelWorkflowStage
(name, experiment_id, verbosity, submission_id, user_name, parent_id, description=None)¶ Bases:
tmlib.workflow.workflow.WorkflowStage
,gc3libs.workflow.ParallelTaskCollection
,tmlib.workflow.workflow.State
A workflow stage whose workflow steps should be processed at once in parallel. The number of jobs must thus be known for each step in advance.
Parameters: name: str
name of the stage
experiment_id: int
ID of the processed experiment
verbosity: int
logging verbosity index
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
Workflow
description: tmlib.tmaps.description.WorkflowStageDescription, optional
description of the stage (default:
None
)-
add
(step)¶ Adds a step.
Parameters: step: tmlib.tmaps.workflow.WorkflowStep
step that should be added
Raises: TypeError
when step has wrong type
-
-
class
tmlib.workflow.workflow.
SequentialWorkflowStage
(name, experiment_id, verbosity, submission_id, user_name, parent_id, description=None, waiting_time=0)¶ Bases:
gc3libs.workflow.SequentialTaskCollection
,tmlib.workflow.workflow.WorkflowStage
,tmlib.workflow.workflow.State
A workflow stage whose steps should be processed sequentially. The number of jobs is generally only known for the first step of the stage, but unknown for the subsequent steps, since their input depends on the output of the previous step. Subsequent steps are thus built dynamically upon transition from one step to the next.
Parameters: name: str
name of the stage
experiment_id: int
ID of the processed experiment
verbosity: int
logging verbosity index
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
Workflow
description: tmlib.tmaps.description.WorkflowStageDescription, optional
description of the stage (default:
None
)waiting_time: int, optional
time in seconds to wait upon transition from one step to the next, to avoid issues related to network file systems (default:
0
)-
next
(done)¶ Progresses to next step.
Parameters: done: int
zero-based index of the last processed step
Returns: gc3libs.Run.State
-
update_step
(index)¶ Updates the indexed step, i.e. creates new jobs for it.
Parameters: index: int
index for the list of tasks (steps)
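The build-on-transition idea behind next and update_step can be sketched in miniature. This standalone toy example (serial, with plain functions standing in for steps and their jobs) illustrates the principle, not the tmlib implementation:

```python
class SequentialSteps(object):
    """Toy model of a sequential stage: each step is only built once
    the previous step's output is known."""

    def __init__(self, step_factories):
        # Each factory receives the previous output and returns the
        # next step, mirroring how update_step() creates jobs lazily.
        self.step_factories = step_factories

    def run(self, initial_input):
        data = initial_input
        for build_step in self.step_factories:
            step = build_step(data)  # build step from prior output
            data = step(data)        # process it (the "run" phase)
        return data

stage = SequentialSteps([
    lambda prev: (lambda x: x * 2),  # step 1: double the input
    lambda prev: (lambda x: x + 1),  # step 2: built only after step 1
])
result = stage.run(5)
```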
-
-
class
tmlib.workflow.workflow.
State
¶ Bases:
object
Mixin class that provides convenience properties to determine whether a task is in a given state.
-
is_new
¶ bool: whether the job is in state NEW
-
is_running
¶ bool: whether the step is in state RUNNING
-
is_stopped
¶ bool: whether the step is in state STOPPED
-
is_submitted
¶ bool: whether the step is in state SUBMITTED
-
is_terminated
¶ bool: whether the step is in state TERMINATED
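The mixin pattern can be sketched as follows; a minimal standalone illustration assuming, as for GC3Pie tasks, that the host class exposes its current state via an execution.state attribute:

```python
class State(object):
    """Sketch of the mixin: properties compare the task's current
    state (assumed at self.execution.state) to GC3Pie state names."""

    @property
    def is_running(self):
        return self.execution.state == 'RUNNING'

    @property
    def is_terminated(self):
        return self.execution.state == 'TERMINATED'

class _Execution(object):
    def __init__(self, state):
        self.state = state

class Task(State):
    """Hypothetical task class mixing in State."""
    def __init__(self, state):
        self.execution = _Execution(state)

task = Task('RUNNING')
```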
-
-
class
tmlib.workflow.workflow.
Workflow
(experiment_id, verbosity, submission_id, user_name, description, waiting_time=0)¶ Bases:
gc3libs.workflow.SequentialTaskCollection
,tmlib.workflow.workflow.State
A workflow represents a computational pipeline that gets dynamically assembled from individual stages based on a user provided description.
Parameters: experiment_id: int
ID of processed experiment
verbosity: int
logging verbosity index
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
description: tmlib.tmaps.description.WorkflowDescription
description of the workflow
waiting_time: int, optional
time in seconds to wait upon transition from one stage to the next; required with certain network file system settings (default:
0
)-
n_stages
¶ int: total number of active stages
-
next
(done)¶ Progresses to next stage.
Parameters: done: int
zero-based index of the last processed stage
Returns: gc3libs.Run.State
-
update_description
(description)¶ Updates the workflow description by removing inactive stages/steps from the description that will ultimately be used to dynamically build stages upon processing.
Parameters: description: tmlib.tmaps.description.WorkflowDescription
description of the workflow
Raises: TypeError
when description doesn’t have type
WorkflowDescription
-
update_stage
(index)¶ Updates the indexed stage, i.e. creates the individual computational jobs for each step of the stage.
Parameters: index: int
index for the list of tasks (stages)
-
-
class
tmlib.workflow.workflow.
WorkflowStage
(name, experiment_id, verbosity, submission_id, user_name, parent_id, description)¶ Bases:
tmlib.workflow.workflow.State
Base class for TissueMAPS workflow stages. A workflow stage is composed of one or more workflow steps, which together comprise an abstract computational task.
Parameters: name: str
name of the stage
experiment_id: int
ID of the processed experiment
verbosity: int
logging verbosity index
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
Workflow
description: tmlib.tmaps.description.WorkflowStageDescription
description of the stage
Raises: TypeError
when description doesn’t have the correct type
-
n_steps
¶ int: number of steps in the stage
-
-
class
tmlib.workflow.workflow.
WorkflowStep
(name, experiment_id, verbosity, submission_id, user_name, parent_id, description)¶ Bases:
gc3libs.workflow.AbortOnError
,gc3libs.workflow.SequentialTaskCollection
,tmlib.workflow.workflow.State
A workflow step represents a collection of computational tasks that should be processed in parallel on a cluster, i.e. one parallelization step within a larger workflow.
Parameters: name: str
name of the step
experiment_id: int
ID of the processed experiment
verbosity: int
logging verbosity index
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int
ID of the parent
WorkflowStage
description: tmlib.tmaps.description.WorkflowStepDescription
description of the step
-
collect_phase
¶ tmlib.workflow.jobs.CollectPhase: collection of jobs for “collect” phase
-
init_phase
¶ tmlib.workflow.jobs.InitPhase: collection of jobs for the “init” phase
-
initialize
()¶ Initializes the step, i.e. generates the jobs for the different phases.
-
next
(done)¶ Progresses to the next phase.
Parameters: done: int
zero-based index of the last processed phase
Returns: gc3libs.Run.State
-
run_phase
¶ tmlib.workflow.jobs.RunPhase: collection of jobs for the “run” phase
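The three phases of a step can be sketched end to end. In this standalone toy example, plain functions stand in for the create_batches, run_job and collect_job_output API methods, and the run phase executes serially rather than in parallel:

```python
def run_step(create_batches, run_job, collect_job_output, inputs):
    """Toy model of a workflow step: init -> run -> (optional) collect."""
    batches = create_batches(inputs)           # "init": partition the work
    outputs = [run_job(b) for b in batches]    # "run": process each batch
    return collect_job_output(outputs)         # "collect": aggregate results

# Partition a list into pairs, sum each batch, then sum partial results.
result = run_step(
    create_batches=lambda xs: [xs[i:i + 2] for i in range(0, len(xs), 2)],
    run_job=sum,
    collect_job_output=sum,
    inputs=[1, 2, 3, 4, 5],
)
```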
-