Source code for cerise.back_end.job_planner

import logging
from typing import Dict, cast

import cerulean

from cerise.back_end.cwl import (get_required_num_cores, get_time_limit,
                                 get_workflow_step_names)
from cerise.job_store.sqlite_job_store import SQLiteJobStore


class InvalidJobError(RuntimeError):
    """Raised when a workflow refers to a step that is not available in the API."""

class JobPlanner:
    """Handles workflow execution requirements.

    This class keeps track of which hardware is needed for each
    available step, then analyses a workflow and decides which
    resources it needs based on this.
    """
    def __init__(self, job_store: SQLiteJobStore,
                 local_api_dir: cerulean.Path):
        """Create a JobPlanner.

        Args:
            job_store: The job store to act on.
            local_api_dir: Path of the local api directory.
        """
        self._logger = logging.getLogger(__name__)
        """A logger for this object."""
        self._job_store = job_store
        """The job store to act on."""
        self._steps_requirements = dict()  # type: Dict[str, Dict[str, int]]
        """Requirements per step, keyed by step name and requirement name."""

        self._get_steps_resource_requirements(local_api_dir)
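
    # Sketch of the shape of self._steps_requirements after scanning, with
    # hypothetical step names and values; keys are step paths relative to a
    # project's 'steps' directory, as built in
    # _get_steps_resource_requirements() below:
    #
    #     {
    #         'translate.cwl': {'num_cores': 4, 'time_limit': 600},
    #         'tools/wordcount.cwl': {'num_cores': 0, 'time_limit': 0},
    #     }
    #
    # plan_job() treats a num_cores value of 0 as "no requirement declared".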

    def plan_job(self, job_id: str) -> None:
        """Figures out which resources a job needs.

        Resources are identified by strings. Currently, there is
        ``num_cores``, the number of cores to run on, and
        ``time_limit``, the amount of time to reserve in seconds.

        Args:
            job_id: Id of the job to plan.
        """
        with self._job_store:
            job = self._job_store.get_job(job_id)
            steps = get_workflow_step_names(cast(bytes, job.workflow_content))
            for step in steps:
                if step not in self._steps_requirements:
                    job.error('Found invalid step {} in workflow'.format(step))
                    raise InvalidJobError('Invalid step in workflow')

            job.required_num_cores = get_required_num_cores(
                    cast(bytes, job.workflow_content))
            num_cores_steps = [
                    self._steps_requirements[step]['num_cores']
                    for step in steps]
            if max(num_cores_steps) > 0:
                job.required_num_cores = max(num_cores_steps)

            job.time_limit = get_time_limit(cast(bytes, job.workflow_content))
            time_limit_steps = [
                    self._steps_requirements[step]['time_limit']
                    for step in steps]
            job.time_limit = max(job.time_limit, sum(time_limit_steps))
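
    # Illustration of the combination rule in plan_job() above, with
    # hypothetical numbers: for a workflow whose three steps declare 1, 4
    # and 0 cores and 600, 300 and 300 seconds respectively, and whose
    # workflow-level requirements are 2 cores and 900 seconds, the job is
    # planned with required_num_cores = max(1, 4, 0) == 4 (the step maximum
    # overrides the workflow value because it is non-zero) and
    # time_limit = max(900, 600 + 300 + 300) == 1200 seconds.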

    def _get_steps_resource_requirements(
            self, local_api_dir: cerulean.Path) -> None:
        """Scan CWL steps and extract resource requirements.

        Args:
            local_api_dir: The local directory with the API.
        """
        for project_dir in local_api_dir.iterdir():
            local_steps_dir = project_dir / 'steps'
            for this_dir, _, files in local_steps_dir.walk():
                for filename in files:
                    if filename.endswith('.cwl'):
                        self._logger.debug(
                                'Scanning file for requirements: {}'.format(
                                    this_dir / filename))
                        rel_this_dir = this_dir.relative_to(
                                str(local_steps_dir))
                        step_name = str(rel_this_dir / filename)
                        step_contents = (this_dir / filename).read_bytes()
                        step_num_cores = get_required_num_cores(step_contents)
                        step_time_limit = get_time_limit(step_contents)

                        if step_name not in self._steps_requirements:
                            self._steps_requirements[step_name] = dict()
                        self._steps_requirements[step_name][
                                'num_cores'] = step_num_cores
                        self._steps_requirements[step_name][
                                'time_limit'] = step_time_limit
                        self._logger.debug(
                                'Step {} requires {} cores'.format(
                                    step_name, step_num_cores))
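

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of cerise itself. The API directory
# path, database file and job id below are hypothetical, and the
# SQLiteJobStore constructor is assumed to take the path of its database
# file; the planner is wired up as JobPlanner.__init__ above expects.
# ---------------------------------------------------------------------------
def _example_plan_a_job() -> None:
    """Sketch: create a JobPlanner and plan a previously submitted job."""
    local_fs = cerulean.LocalFileSystem()
    api_dir = local_fs / 'home/webservice/api'          # hypothetical path
    job_store = SQLiteJobStore('/tmp/cerise_jobs.db')   # assumed signature
    planner = JobPlanner(job_store, api_dir)
    # After this call, the job's required_num_cores and time_limit are set
    # in the store; a workflow with an unknown step raises InvalidJobError.
    planner.plan_job('some-job-id')                     # hypothetical job id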