import logging
from time import time
from cerise.job_store.job_state import JobState
[docs]class MockJob:
"""This class provides an in-memory implementation of a job.
It's used for testing, so that we don't need to bother with a
database there.
"""
# Attributes:
def __init__(self, job_id, name, workflow, job_input):
"""Creates a new Job object.
The state of a newly created job is JobState.SUBMITTED.
Args:
id (str): The id of the job, a string containing a GUID
name (str): The name of the job, as given by the user
workflow (str): The URI of the workflow file
job_input (str): An input definition for the job
"""
# General description
self.id = job_id
"""str: Job id, a string containing a UUID."""
self.name = name
"""str: Name, as specified by the submitter."""
self.workflow = workflow
"""str: Workflow file URI, as specified by the submitter."""
self.local_input = job_input
"""str: Input JSON string, as specified by the submitter."""
# Current status
self.state = JobState.SUBMITTED
"""JobState: Current state of the job."""
self.resolve_retry_count = 0
"""int: Number of times we've tried to resolve inputs."""
self.please_delete = False
"""bool: Whether deletion of the job has been requested."""
self.log = ''
"""str: Log output as of last update."""
self.remote_output = ''
"""str: cwl-runner output as of last update."""
self.remote_error = ''
"""str: cwl-runner stderr output as of last update."""
# Post-resolving data
self.workflow_content = None
"""Union[bytes, NoneType]: The content of the workflow
description file, or None if it has not been resolved yet.
"""
self.required_num_cores = 0
"""The number of cores to reserve for this workflow.
If 0, use cluster default."""
self.time_limit = 0
"""The time to reserve, in seconds.
If 0, use cluster default."""
# Post-staging data
self.remote_workdir_path = ''
"""str: The absolute remote path of the working directory."""
self.remote_workflow_path = ''
"""str: The absolute remote path of the CWL workflow file."""
self.remote_input_path = ''
"""str: The absolute remote path of the input description file."""
self.remote_stdout_path = ''
"""str: The absolute remote path of the standard output dump."""
self.remote_stderr_path = ''
"""str: The absolute remote path of the standard error dump."""
self.remote_system_out_path = ''
"""str: The absolute remote path of the system output dump."""
self.remote_system_err_path = ''
"""str: The absolute remote path of the standard error dump."""
# Post-destaging data
self.local_output = ''
"""str: The serialised JSON output object describing the
destaged outputs.
"""
# Internal data
self.remote_job_id = None
"""str: The id the remote scheduler gave to this job."""
# Logging
self.__log = list()
"""list: The job's execution log: [(level, time, msg)]."""
[docs] def try_transition(self, from_state, to_state):
"""Attempts to transition the job's state to a new one.
If the current state equals from_state, it is set to to_state,
and True is returned, otherwise False is returned and the
current state remains what it was.
Args:
from_state (JobState): The expected current state
to_state (JobState): The desired next state
Returns:
True iff the transition was successful.
"""
if self.state == from_state:
self.state = to_state
return True
return False
[docs] def add_log(self, level, message):
"""Add a message to the job's log.
Args:
level (logging.LogLevel): Level of importance
message (str): The log message.
"""
self.__log.append((level, time(), message))
[docs] def debug(self, message):
"""Add a message to the job's log at level DEBUG.
Args:
message (str): The log message.
"""
self.add_log(logging.DEBUG, message)
[docs] def info(self, message):
"""Add a message to the job's log at level INFO.
Args:
message (str): The log message.
"""
self.add_log(logging.INFO, message)
[docs] def warning(self, message):
"""Add a message to the job's log at level WARNING.
Args:
message (str): The log message.
"""
self.add_log(logging.WARNING, message)
[docs] def error(self, message):
"""Add a message to the job's log at level ERROR.
Args:
message (str): The log message.
"""
self.add_log(logging.ERROR, message)
[docs] def critical(self, message):
"""Add a message to the job's log at level CRITICAL.
Args:
message (str): The log message.
"""
self.add_log(logging.CRITICAL, message)