# Source code for cerise.back_end.execution_manager

import logging
import time
import traceback
from typing import cast

import cerulean
from paramiko.ssh_exception import SSHException  # type: ignore

from cerise.back_end.cwl import get_cwltool_result, is_workflow
from cerise.back_end.job_planner import InvalidJobError, JobPlanner
from cerise.back_end.job_runner import JobRunner
from cerise.back_end.local_files import ConnectionError, LocalFiles
from cerise.back_end.remote_api import RemoteApi
from cerise.back_end.remote_job_files import RemoteJobFiles
from cerise.config import Config
from cerise.job_store.job_state import JobState
from cerise.job_store.sqlite_job import SQLiteJob
from cerise.job_store.sqlite_job_store import SQLiteJobStore

# NOTE(review): this class was restored from a damaged listing in which the
# ``job.id`` arguments and the ``.info(`` logging calls had been stripped;
# diff against the upstream cerise sources before relying on exact wording.
class ExecutionManager:
    """Handles the execution of jobs on the remote resource.

    The execution manager monitors the job store for files that are
    ready to be staged in, started, cancelled, staged out, or deleted,
    and performs the required activity. It also monitors the remote
    resource, ensuring that any remote state changes are propagated to
    the job store correctly.
    """

    # Substrings of an OSError message that mark it as a (probably)
    # transient connection problem rather than a fatal IO error.
    _TRANSIENT_MARKERS = ('Socket', 'Network', 'Temporary',
                          'Timeout opening channel')

    def __init__(self, config: Config, local_api_dir: cerulean.Path) -> None:
        """Set up the execution manager.

        Besides creating the helper objects, this resets jobs that were
        caught in a transitional state by a previous crash, and checks
        whether a newer API is available locally than is installed
        remotely.

        Args:
            config: The configuration.
            local_api_dir: The path to the local API directory.
        """
        self._logger = logging.getLogger(__name__)

        self._update_available = False
        """Whether the installed API is older than the local one."""

        self._shutting_down = False
        """True iff we're shutting down."""

        self._job_store = SQLiteJobStore(config.get_database_location())
        """The job store to use."""

        self._local_files = LocalFiles(self._job_store, config)
        """The local files manager."""

        self._remote_api = RemoteApi(config, local_api_dir)
        """The remote API manager."""

        self._remote_refresh = config.get_remote_refresh()

        self._job_planner = JobPlanner(self._job_store, local_api_dir)
        """Determines required hardware resources."""

        self._remote_job_files = RemoteJobFiles(self._job_store, config)
        """The remote job files manager."""

        remote_cwlrunner = self._remote_api.translate_runner_location(
            config.get_remote_cwl_runner())

        self._job_runner = JobRunner(self._job_store, config,
                                     remote_cwlrunner)
        """The job runner submits jobs and checks on them."""

        # recover database from crash
        with self._job_store:
            for job in self._job_store.list_jobs():
                # Half-staged input: remove it and start over.
                if job.state == JobState.STAGING_IN:
                    self._remote_job_files.delete_job(job.id)
                    job.state = JobState.SUBMITTED

                # Half-retrieved output: remove it and destage again.
                if job.state == JobState.STAGING_OUT:
                    self._local_files.delete_output_dir(job.id)
                    job.state = JobState.FINISHED

                # A cancellation was pending; issue it (again).
                if job.state == JobState.WAITING_CR:
                    self._job_runner.cancel_job(job.id)

                if job.state == JobState.RUNNING_CR:
                    self._job_runner.cancel_job(job.id)

        # Check for updates
        self._update_available = self._remote_api.update_available()
        if self._update_available:
            self._logger.info('Specialisation update available')

        self._logger.info('Started back-end')

    @staticmethod
    def _first_arg(error: BaseException) -> object:
        """Return the first argument of an exception, for use in messages.

        Falls back to the exception itself if it was raised without
        arguments, so that formatting a message never raises
        IndexError.

        Args:
            error: The exception to describe.

        Returns:
            ``error.args[0]`` if there is one, otherwise ``error``.
        """
        return error.args[0] if error.args else error

    def shutdown(self) -> None:
        """Requests the execution manager to execute a clean shutdown."""
        self._logger.debug('Shutdown requested')
        self._shutting_down = True

    def _delete_job(self, job_id: str, job: SQLiteJob) -> None:
        """Delete a job.

        Deletes the job from the compute resource, and if it was
        destaged, also from the local file store.

        Prerequisite: the job is in a final state.

        Args:
            job_id: The id of the job
            job: The job object
        """
        self._logger.debug('Deleting job ' + job_id)
        self._remote_job_files.delete_job(job_id)
        self._local_files.delete_output_dir(job_id)
        self._job_store.delete_job(job_id)

    def _cancel_job(self, job_id: str, job: SQLiteJob) -> None:
        """Cancel a job.

        If the job is running, the cancellation request may take some
        time to process by the compute resource. In this case, the job
        will remain in RUNNING_CR. Otherwise, it will be cancelled
        immediately, and be put in CANCELLED.

        Precondition: Job is in a _CR state.
        Postcondition: Job is in CANCELLED or RUNNING_CR.

        Args:
            job_id: The id of the job
            job: The job object
        """
        job.info('Cancelling job')
        if self._job_runner.cancel_job(job_id):
            # The job runner reports the job as still running.
            job.state = JobState.RUNNING_CR
        else:
            job.state = JobState.CANCELLED
            # NOTE(review): assumed to belong to this branch - confirm.
            job.info('Job cancelled')

    def _stage_and_start_job(self, job_id: str, job: SQLiteJob) -> None:
        """Stages, plans and starts a job.

        Precondition: Job is in STAGING_IN state
        Postcondition: Job is in WAITING, PERMANENT_FAILURE, CANCELLED,
        or WAITING_CR

        Args:
            job_id: The id of the job
            job: The job object
        """
        try:
            job.info('Resolving inputs')
            input_files = self._local_files.resolve_input(job_id)
        except FileNotFoundError:
            job.error('Input not found, failing with PermanentFailure')
            job.state = JobState.PERMANENT_FAILURE
            return
        except ValueError as e:
            job.error('Invalid input: {}'.format(self._first_arg(e)))
            job.state = JobState.PERMANENT_FAILURE
            return

        if not is_workflow(cast(bytes, job.workflow_content)):
            job.error('Input is not a CWL workflow')
            job.state = JobState.PERMANENT_FAILURE
            return

        # A cancellation request may have come in while we were busy.
        if job.try_transition(JobState.STAGING_IN_CR, JobState.CANCELLED):
            job.info('Job was cancelled while resolving input')
            return

        job.info('Resolved input, now planning')
        try:
            self._job_planner.plan_job(job_id)
        except InvalidJobError:
            job.error('Job is invalid')
            job.state = JobState.PERMANENT_FAILURE
            return

        if job.state == JobState.PERMANENT_FAILURE:
            return

        job.info('Planned job, now staging in inputs')
        workflow_content = self._remote_api.translate_workflow(
            cast(bytes, job.workflow_content))
        try:
            self._remote_job_files.stage_job(job_id, input_files,
                                             workflow_content)
        except FileNotFoundError:
            job.error('Input not found, failing with PermanentFailure')
            job.state = JobState.PERMANENT_FAILURE
            return

        job.info('Staged job, now starting')
        job.info('API versions:')
        for project_version in self._remote_api.get_projects():
            job.info(' {}'.format(project_version))

        self._job_runner.start_job(job_id)
        job.info('Started job')

        # Move on to the waiting state, keeping a pending cancellation
        # request if one arrived in the mean time.
        if not (job.try_transition(JobState.STAGING_IN, JobState.WAITING)
                or job.try_transition(JobState.STAGING_IN_CR,
                                      JobState.WAITING_CR)):
            self._logger.critical(
                'Something odd happened while staging and starting')
            self._logger.critical('State is now {}'.format(job.state))
            job.state = JobState.SYSTEM_ERROR

    def _destage_job(self, job_id: str, job: SQLiteJob) -> None:
        """Get job results back from the compute resource.

        Precondition: Job is in FINISHED
        Postcondition: Job is in SUCCESS, TEMPORARY_FAILURE,
        PERMANENT_FAILURE or CANCELLED

        Args:
            job_id: The job's id
            job: The job object
        """
        result = get_cwltool_result(job.remote_error)

        if job.try_transition(JobState.FINISHED, JobState.STAGING_OUT):
            job.info('Starting destaging of results')
            output_files = self._remote_job_files.destage_job_output(job_id)
            self._local_files.publish_job_output(job_id, output_files)
            job.info('Results downloaded and available')

            # NOTE(review): assumed to be nested in the branch above, so
            # that it only runs once destaging has been done - confirm.
            if not (job.try_transition(JobState.STAGING_OUT, result)
                    or job.try_transition(JobState.STAGING_OUT_CR,
                                          JobState.CANCELLED)):
                job.state = JobState.SYSTEM_ERROR

    def _process_jobs(self, check_remote: bool) -> bool:
        """Go through the jobs and do what needs to be done.

        Args:
            check_remote: Whether to access the remote compute resource
                to check on jobs.

        Returns:
            True iff there are currently running jobs.
        """
        # If we don't check remote, assume that we have running jobs,
        # so that we don't install updates while jobs are running.
        have_running_jobs = not check_remote

        jobs = self._job_store.list_jobs()
        for job_id in [job.id for job in jobs]:
            if self._shutting_down:
                break

            try:
                job = self._job_store.get_job(job_id)
                previous_state = job.state
                self._logger.debug('Processing job ' + job_id
                                   + ' with current state '
                                   + job.state.value)

                if check_remote and JobState.is_remote(job.state):
                    self._logger.debug('Checking remote state')
                    self._job_runner.update_job(job_id)
                    self._remote_job_files.update_job(job_id)
                    # Reload to pick up any changes made by the updates.
                    job = self._job_store.get_job(job_id)

                have_running_jobs = (
                    have_running_jobs or JobState.is_remote(job.state))

                if job.state == JobState.FINISHED:
                    self._destage_job(job_id, job)

                # Don't start new jobs while an update is pending.
                if not self._update_available:
                    if job.try_transition(JobState.SUBMITTED,
                                          JobState.STAGING_IN):
                        self._stage_and_start_job(job_id, job)
                        self._logger.debug('Staged and started job')

                if JobState.cancellation_active(job.state):
                    self._cancel_job(job_id, job)

                self._logger.debug('State is now ' + job.state.value)

                if job.please_delete and JobState.is_final(job.state):
                    self._delete_job(job_id, job)

            except (ConnectionError, IOError, EOFError, OSError,
                    SSHException) as e:
                self._logger.debug('System exception while processing job:'
                                   ' {}'.format(e))
                # IOError is an alias of OSError, so one check suffices.
                if isinstance(e, OSError):
                    message = str(e)
                    if not any(marker in message
                               for marker in self._TRANSIENT_MARKERS):
                        job.error('An IO error occurred while processing the'
                                  ' job: {}. Please check that your network'
                                  ' connection works, and that you have enough'
                                  ' disk space or quota on the remote machine.'
                                  ''.format(e))
                        job.state = JobState.SYSTEM_ERROR
                        self._logger.critical('An internal error occurred when'
                                              ' processing job ' + job_id)
                        self._logger.critical(traceback.format_exc())
                        # NOTE(review): this abandons the remaining jobs
                        # for this pass and reports that nothing is
                        # running, even if earlier jobs were; kept as
                        # found, but `continue` may have been intended.
                        return False

                # Anything else is treated as a temporary connection
                # problem: restore the state and retry on a later pass.
                job = self._job_store.get_job(job_id)
                job.debug('Connection problem with remote resource: {}, will'
                          ' try again later'.format(self._first_arg(e)))
                job.state = previous_state
                have_running_jobs = True

            except Exception:
                # Not a bare except: KeyboardInterrupt must reach
                # execute_jobs(), which uses it to shut down cleanly.
                job.state = JobState.SYSTEM_ERROR
                self._logger.critical(
                    'An internal error occurred when processing job '
                    + job_id)
                self._logger.critical(traceback.format_exc())

        return have_running_jobs

    def execute_jobs(self) -> None:
        """Run the main backend execution loop.

        This repeatedly processes jobs, but does not check the remote
        compute resource more often than specified in the
        remote_refresh configuration parameter.
        """
        with self._job_store:
            # Start far enough in the past for the first pass to check
            # the remote resource.
            last_active = time.perf_counter() - self._remote_refresh - 1

            # Handler in run_back_end throws KeyboardInterrupt in order to
            # break the sleep call; catch it to exit gracefully
            try:
                while not self._shutting_down:
                    now = time.perf_counter()
                    check_remote = now - last_active > self._remote_refresh

                    have_running_jobs = self._process_jobs(check_remote)

                    # Only install an update when nothing is running.
                    if not have_running_jobs and self._update_available:
                        self._remote_api.install()
                        self._update_available = False

                    if check_remote:
                        last_active = time.perf_counter()

                    time.sleep(0.1)
            except KeyboardInterrupt:
                pass

        self._logger.debug('Shutting down')