"""Job submission setup."""
import logging
import os
import shutil
import stat
import subprocess
from abc import ABC, abstractmethod
class EcflowSubmitTask:
    """Submit class for ecflow.

    Parses the submission settings of a task into a ``TaskSettings`` object, selects the
    matching submission object with ``get_submission_object`` and drives job-file
    generation and submission.
    """

    def __init__(self, task, env_submit, server, joboutdir,
                 stream=None, dbfile=None, interpreter="#!/usr/bin/env python3",
                 ensmbr=None, submit_exceptions=None, coldstart=False, env_file=None):
        """Construct an ecflow submission task.

        Args:
            task (scheduler.EcflowTask): Task
            env_submit (dict): Submission definitions, passed on to ``TaskSettings``.
            server (scheduler.Server): Server
            joboutdir (dict): Job output directories, passed on to ``TaskSettings``
                which looks them up by host number ("0", "1").
            stream (int, optional): Stream. Defaults to None.
            dbfile (str, optional): Data base for monitoring. Defaults to None.
            interpreter (str, optional): Python interpreter.
                Defaults to "#!/usr/bin/env python3".
            ensmbr (str, optional): Ensemble member. A negative value is stored as None.
                Defaults to None.
            submit_exceptions (dict, optional): Task submission exceptions. Defaults to None.
            coldstart (bool, optional): Cold start. Defaults to False.
            env_file (str, optional): Environment file. Defaults to None.

        """
        self.task = task
        self.env_file = env_file
        self.ecflow_server = server
        self.coldstart = coldstart
        self.ensmbr = ensmbr
        # int() makes the comparison work for numeric strings as well as integers.
        if ensmbr is not None and int(ensmbr) < 0:
            self.ensmbr = None
        self.db_file = dbfile
        self.stream = stream
        # NOTE(review): task_settings.complete (set by the submit exceptions) is never
        # copied here, so the force_complete branch in submit() only runs when a caller
        # sets this attribute explicitly - confirm whether that is intended.
        self.complete = False
        self.debug = True
        # Parse env_submit. The caller's coldstart flag is forwarded; it used to be
        # hard-coded to False, which disabled the "is_coldstart" submit exceptions.
        self.task_settings = TaskSettings(self.task, env_submit, joboutdir,
                                          interpreter=interpreter,
                                          submit_exceptions=submit_exceptions,
                                          coldstart=coldstart)
        self.sub = get_submission_object(self.task, self.task_settings, self.ecflow_server,
                                         db_file=self.db_file)

    def write_trailer(self, file_handler):
        """Write trailer in job file.

        Args:
            file_handler (io.TextIOBase): Open, writable handle of the job file.

        """
        if self.task_settings.trailer is not None:
            for value in self.task_settings.trailer.values():
                file_handler.write(str(value) + "\n")

    def write_job(self):
        """Write job file.

        The existing job file is moved to ``<ecf_job>.tmp`` and re-written as header,
        original content with the wrapper/host placeholders substituted, and trailer.
        The result is made executable for the owner.
        """
        logging.info("Job file: %s", self.task_settings.ecf_job)
        ecf_job = self.task_settings.ecf_job
        fname = ecf_job + ".tmp"
        shutil.move(ecf_job, fname)
        with open(ecf_job, mode="w", encoding="utf-8") as file_handler:
            # NOTE(review): write_header is not defined in the visible part of this
            # class; it is expected to return the pair (wrapper, host).
            wrapper, host = self.write_header(file_handler)
            with open(fname, mode="r", encoding="utf-8") as job_fh:
                for line in job_fh:
                    line = line.replace("@WRAPPER_TO_BE_SUBSTITUTED@", wrapper)
                    line = line.replace("@HOST_TO_BE_SUBSTITUTED@", host)
                    file_handler.write(line)
            self.write_trailer(file_handler)
        # Same effect as "chmod u+x" without building a shell command from a path.
        os.chmod(ecf_job, os.stat(ecf_job).st_mode | stat.S_IXUSR)

    def submit(self):
        """Submit task.

        A RuntimeError (e.g. a failing submit command) is logged and not propagated.
        Any other exception is converted to a SubmitException.

        Raises:
            SubmitException: If the submission fails.

        """
        try:
            if "OUTPUT" not in self.task_settings.header:
                self.task_settings.header.update({"OUTPUT": self.sub.set_output()})
            if "NAME" not in self.task_settings.header:
                self.task_settings.header.update({"NAME": self.sub.set_job_name()})
            logging.debug("write")
            self.write_job()
            if self.ecflow_server is None:
                raise Exception("You must set server to submit!")
            if self.complete:
                logging.debug("force_complete")
                self.ecflow_server.force_complete(self.task)
            else:
                logging.debug("sub.set_submit_cmd")
                self.sub.set_submit_cmd()
                logging.debug("submit_job")
                self.sub.submit_job()
                logging.debug("set_jobid")
                self.sub.set_jobid()
                logging.debug("job_id")
                self.task.submission_id = self.sub.job_id
                logging.debug("update_submission_id")
                self.ecflow_server.update_submission_id(self.task)
        except RuntimeError as error:
            # Supposed to handle abort self unless killed. Still best effort, but the
            # reason is logged instead of being dropped silently.
            logging.warning("Submission ended with a runtime error: %s", error)
        except Exception as error:
            msg = f"Submission failed {str(error)}"
            raise SubmitException(msg, self.task, self.task_settings) from error
class TaskSettings:
    """Set the task specific settings.

    The settings are resolved from the submission definitions: the submit type that
    lists the task (or the default submit type) provides the base settings and an
    optional "task_exceptions" entry for the task overrides them.
    """

    def __init__(self, task, submission_defs, joboutdirs, submit_exceptions=None,
                 interpreter="#!/usr/bin/env python3",
                 complete=False, coldstart=False):
        """Construct the task specific settings.

        Args:
            task (scheduler.EcflowTask): Task
            submission_defs (dict): Submission definitions
            joboutdirs (dict): Job output directory for each host number ("0", "1").
            submit_exceptions (dict, optional): Submission exceptions, see
                ``check_exceptions``. Defaults to None.
            interpreter (str, optional): Python interpreter.
                Defaults to "#!/usr/bin/env python3".
            complete (bool, optional): Set to complete. Defaults to False.
            coldstart (bool, optional): Coldstart. Defaults to False.

        Raises:
            Exception: If no HOST is found in the settings of the task.
            Exception: If no job output directory is defined for host "0".
            Exception: If no job output directory is defined for the host of the task.

        """
        self.task = task
        self.submission_defs = submission_defs
        self.header = {}
        self.trailer = {}
        self.wrapper = None
        self.submit_type = "background"
        self.interpreter = interpreter
        self.complete = complete
        self.remote_submit_cmd = None
        self.remote_status_cmd = None
        self.remote_kill_cmd = None
        self.coldstart = coldstart
        self.host = None
        self.submit_variables = None
        if submit_exceptions is not None:
            self.check_exceptions(submit_exceptions)
        self.task_settings = self.parse_submission_defs()
        self.process_settings()

        if self.host is None:
            raise Exception("Host number is mandatory!")
        if "0" not in joboutdirs:
            raise Exception("No joboutdir defined for HOST 0")
        joboutdir = joboutdirs["0"]
        logging.debug("Task HOST is %s", self.host)
        if self.host not in joboutdirs:
            raise Exception("No joboutdir found for host " + self.host)
        joboutdir_at_host = joboutdirs[self.host]

        self.joboutdir = joboutdir
        self.joboutdir_at_host = joboutdir_at_host
        # Job and job output files as seen from host 0 and from the host of the task.
        self.ecf_job = self.task.create_ecf_job(joboutdir)
        self.ecf_job_at_host = self.task.create_ecf_job(joboutdir_at_host)
        self.ecf_jobout = self.task.create_ecf_jobout(joboutdir)
        self.ecf_jobout_at_host = self.task.create_ecf_jobout(joboutdir_at_host)

    def check_exceptions(self, submit_exceptions):
        """Possibility to create submission exceptions.

        Only the "complete" state is handled: when the task name or one of its families
        is listed with the value "is_coldstart" and this is a cold start,
        ``self.complete`` is set to a message describing why the task is complete.

        Args:
            submit_exceptions (dict): Exceptions of the form
                ``{"complete": {"task": {name: rule}, "family": {name: rule}}}``.

        """
        if submit_exceptions is None or "complete" not in submit_exceptions:
            return
        rules = submit_exceptions["complete"]
        if "task" in rules:
            for task, rule in rules["task"].items():
                if task == self.task.ecf_task and rule == "is_coldstart" and self.coldstart:
                    self.complete = f"Task {task} complete due to cold start"
        if "family" in rules:
            for family, rule in rules["family"].items():
                for ecf_family in self.task.ecf_families:
                    if family == ecf_family and rule == "is_coldstart" and self.coldstart:
                        self.complete = f"Family {family} complete due to cold start"

    def process_settings(self):
        """Process the settings.

        Known keys are stored in dedicated attributes, all other keys end up in the
        job header.

        Raises:
            Exception: If HOST is neither "0" nor "1".

        """
        for key, value in self.task_settings.items():
            if key == "TRAILER":
                self.trailer[key] = value
            elif key == "SUBMIT_TYPE":
                # An empty submit type keeps the default.
                if value != "":
                    self.submit_type = value
            elif key == "SSH":
                self.remote_submit_cmd = value
                self.remote_status_cmd = value
                self.remote_kill_cmd = value
            elif key == "INTERPRETER":
                self.interpreter = value
            elif key == "SUBMIT_VARIABLES":
                self.submit_variables = value
            elif key == "WRAPPER":
                self.wrapper = value
            elif key == "HOST":
                self.host = str(value)
                if self.host not in ("0", "1"):
                    # One message string; two arguments used to render as a tuple.
                    raise Exception(
                        "Expected a single or dual-host system. HOST=" + self.host)
            else:
                self.header[key] = value

    def parse_submission_defs(self):
        """Parse the submission definitions.

        Returns:
            dict: Settings of the task.

        """
        ecf_task = self.task.ecf_task
        all_defs = self.submission_defs
        logging.debug("Submission definitions: %s", all_defs)
        submit_types = all_defs["submit_types"]
        default_submit_type = all_defs["default_submit_type"]

        # The last submit type listing the task wins.
        task_submit_type = None
        for s_t in submit_types:
            if s_t in all_defs and "tasks" in all_defs[s_t]:
                for tname in all_defs[s_t]["tasks"]:
                    if tname == ecf_task:
                        task_submit_type = s_t
        if task_submit_type is None:
            task_submit_type = default_submit_type

        task_settings = {}
        if task_submit_type in all_defs:
            for setting, value in all_defs[task_submit_type].items():
                if setting != "tasks":
                    task_settings[setting] = value

        # Task specific exceptions override the settings of the submit type.
        if "task_exceptions" in all_defs:
            for tname, overrides in all_defs["task_exceptions"].items():
                if tname == ecf_task:
                    task_settings.update(overrides)
        return task_settings
class SubmitException(Exception):
    """Submit exception.

    Constructing the exception appends the message to the submission log of the task,
    prints it and then terminates the interpreter with exit status 0. A ``raise`` of
    this class therefore never reaches an ``except`` clause for it.
    """

    def __init__(self, msg, task, task_settings):
        """Construct SubmitException.

        Args:
            msg (str): Exception message.
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.

        Raises:
            SystemExit: Always, with exit code 0.

        """
        # Initialise this instance (the old code initialised a throwaway Exception).
        super().__init__(msg)
        logfile = task.create_submission_log(task_settings.joboutdir)
        with open(logfile, mode="a", encoding="utf-8") as file_handler:
            file_handler.write(msg)
            file_handler.flush()
        print(msg)
        # The exit() builtin is only injected by the site module; raise directly.
        raise SystemExit(0)
class KillException(Exception):
    """Kill exception.

    Constructing the exception appends the message to the kill log of the task, prints
    it and then terminates the interpreter with exit status 0. A ``raise`` of this
    class therefore never reaches an ``except`` clause for it.
    """

    def __init__(self, msg, task, task_settings):
        """Construct KillException.

        Args:
            msg (str): Exception message.
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.

        Raises:
            SystemExit: Always, with exit code 0.

        """
        # Initialise this instance (the old code initialised a throwaway Exception).
        super().__init__(msg)
        logfile = task.create_kill_log(task_settings.joboutdir)
        with open(logfile, mode="a", encoding="utf-8") as file_handler:
            file_handler.write(msg)
            file_handler.flush()
        print(msg)
        # The exit() builtin is only injected by the site module; raise directly.
        raise SystemExit(0)
class StatusException(Exception):
    """Status exception.

    Constructing the exception appends the message to the status log of the task,
    prints it and then terminates the interpreter with exit status 0. A ``raise`` of
    this class therefore never reaches an ``except`` clause for it.
    """

    def __init__(self, msg, task, task_settings):
        """Construct status exception.

        Args:
            msg (str): Exception message.
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.

        Raises:
            SystemExit: Always, with exit code 0.

        """
        # Initialise this instance (the old code initialised a throwaway Exception).
        super().__init__(msg)
        logfile = task.create_status_log(task_settings.joboutdir)
        with open(logfile, mode="a", encoding="utf-8") as file_handler:
            file_handler.write(msg)
            file_handler.flush()
        print(msg)
        # The exit() builtin is only injected by the site module; raise directly.
        raise SystemExit(0)
def get_submission_object(task, task_settings, server, db_file=None):
    """Get the submission object constructed from a submit type.

    The submit type is matched case-insensitively against "pbs", "slurm",
    "grid_engine" and "background".

    Args:
        task (scheduler.EcflowTask): Task.
        task_settings (TaskSettings): Task settings
        server (scheduler.Server): Server.
        db_file (str, optional): Data base for monitoring. Defaults to None.

    Raises:
        NotImplementedError: If the submit type is not one of the known types.

    Returns:
        SubmissionBaseClass: Return a submission object.

    """
    submit_type = task_settings.submit_type
    logging.info("Submit type: %s", submit_type)
    # Lower-case once so that "background" is matched like the other types.
    kind = submit_type.lower()
    if kind == "pbs":
        sub = PBSSubmission(task, task_settings, server, db_file=db_file)
    elif kind == "slurm":
        sub = SlurmSubmission(task, task_settings, server, db_file=db_file)
    elif kind == "grid_engine":
        sub = GridEngineSubmission(task, task_settings, server, db_file=db_file)
    elif kind == "background":
        sub = BackgroundSubmission(task, task_settings, server, db_file=db_file)
    else:
        raise NotImplementedError(f"Submit type {submit_type} is not implemented")
    return sub
class SubmissionBaseClass(ABC):
    """An abstract class for submission to be implemented by all children.

    Sub-classes provide the commands (submit, kill, status) and the job id handling,
    this class runs the commands and writes the logs.
    """

    def __init__(self, task, task_settings, server, db_file=None, remote_submit_cmd=None,
                 remote_kill_cmd=None,
                 remote_status_cmd=None):
        """Construct the base class.

        Args:
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.
            server (scheduler.Server): Server.
            db_file (str, optional): Data base for monitoring. Defaults to None.
            remote_submit_cmd (str, optional): Remote submit command. Defaults to None.
            remote_kill_cmd (str, optional): Remote kill command. Defaults to None.
            remote_status_cmd (str, optional): Remote status command. Defaults to None.

        """
        self.task = task
        self.task_settings = task_settings
        self.server = server
        self.db_file = db_file
        self.process = None
        # Re-use the id of an earlier submission when the task carries one.
        self.job_id = None
        if task.submission_id is not None:
            self.job_id = task.submission_id
        self.submit_cmd = None
        self.kill_job_cmd = None
        self.job_status_cmd = None
        self.remote_submit_cmd = remote_submit_cmd
        self.remote_kill_cmd = remote_kill_cmd
        self.remote_status_cmd = remote_status_cmd

    def update_db(self, job_id):
        """Update the data base.

        Args:
            job_id (str): Job identifier.

        """
        if self.db_file is not None:
            with open(self.db_file, mode="a", encoding="utf-8") as file_handler:
                file_handler.write(str(job_id) + "\n")

    def clear_db(self):
        """Remove database."""
        if self.db_file is not None:
            if os.path.exists(self.db_file):
                os.unlink(self.db_file)

    @abstractmethod
    def set_submit_cmd(self):
        """Set submit command.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    @abstractmethod
    def set_jobid(self):
        """Set job id.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    @abstractmethod
    def get_logfile(self):
        """Get logfile.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    def submit_job(self):
        """Submit job.

        Without a logfile from ``get_logfile`` the command is run to completion with its
        output in the submission log. With a logfile the command is started without
        waiting and the process is kept in ``self.process``.

        Raises:
            RuntimeError: If the awaited submit command returns a non-zero exit code.

        """
        logging.info("SUBMIT COMMAND: %s", self.submit_cmd)
        if self.submit_cmd is None:
            return
        cmd = self.set_remote_cmd(self.submit_cmd, self.remote_submit_cmd)
        logfile = self.get_logfile()
        logging.info(cmd)
        if logfile is None:
            subfile = self.task.create_submission_log(self.task_settings.joboutdir)
            with open(subfile, mode="w", encoding="utf-8") as sub_handler:
                self.server.update_log("ECF_JOB_CMD: " + cmd)
                process = subprocess.Popen(cmd, stdout=sub_handler, stderr=sub_handler,
                                           shell=True)
                ret = process.wait()
            if ret != 0:
                raise RuntimeError("Submit command failed with error code " + str(ret))
        else:
            self.server.update_log("ECF_JOB_CMD: " + cmd)
            with open(logfile, mode="w", encoding="utf-8") as log_handler:
                self.process = subprocess.Popen(cmd, stdout=log_handler,
                                                stderr=log_handler, shell=True)
        logging.debug(self.submit_cmd)
        # Some implementations return the id, others only store it in self.job_id and
        # return None. Do not wipe a stored id with None.
        job_id = self.set_jobid()
        if job_id is not None:
            self.job_id = job_id
        logging.debug(self.job_id)
        if self.db_file is not None and self.job_id is not None:
            SubmissionBaseClass.update_db(self, self.job_id)

    def kill_job(self):
        """Kill task.

        The kill command and its output are written to the kill log and a marker is
        appended to the job output file.

        Raises:
            RuntimeError: If the kill command returns a non-zero exit code.

        """
        if self.kill_job_cmd is None:
            return
        killfile = self.task.create_kill_log(self.task_settings.joboutdir)
        cmd = self.set_remote_cmd(self.kill_job_cmd, self.remote_kill_cmd)
        with open(killfile, mode="w", encoding="utf-8") as kill_file:
            kill_file.write(f"Kill job {self.task_settings.ecf_job_at_host} with command:\n")
            kill_file.write(f"{cmd}\n")
            # Flush before the child writes to the same file so the order is kept.
            kill_file.flush()
            process = subprocess.Popen(cmd, stdout=kill_file, stderr=kill_file, shell=True)
            ret = process.wait()
        if ret != 0:
            raise RuntimeError("Kill command failed with error code " + str(ret))
        # Append mode creates the job output file when it does not exist yet.
        with open(self.task_settings.ecf_jobout, mode="a", encoding="utf-8") as log_handler:
            log_handler.write("\n\n*** KILLED BY ECF_kill ****")

    @abstractmethod
    def set_job_status(self):
        """Set job status.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    def status(self):
        """General status method.

        Raises:
            StatusException: If no job id is known, if the status command can not be
                set, if no status command is set or if the status command fails.

        """
        if self.job_id is None:
            raise StatusException("No job ID was provided!", self.task, self.task_settings)
        try:
            self.set_job_status()
        except Exception as error:
            raise StatusException("Setting of status command failed " + repr(error),
                                  self.task, self.task_settings) from error
        if self.job_status_cmd is None:
            raise StatusException("No status command set for " + self.task_settings.submit_type,
                                  self.task, self.task_settings)
        try:
            self.job_status()
        except Exception as error:
            raise StatusException("Status command failed " + repr(error), self.task,
                                  self.task_settings) from error

    def job_status(self):
        """General job status method.

        Runs the status command with its output in the status log.

        Raises:
            RuntimeError: If the status command returns a non-zero exit code.

        """
        logging.info(self.job_status_cmd)
        if self.job_status_cmd is None:
            return
        statusfile = self.task.create_status_log(self.task_settings.joboutdir)
        cmd = self.set_remote_cmd(self.job_status_cmd, self.remote_status_cmd)
        with open(statusfile, mode="w", encoding="utf-8") as stdout:
            process = subprocess.Popen(cmd, stdout=stdout, stderr=stdout, shell=True)
            ret = process.wait()
        if ret != 0:
            raise RuntimeError("Status command failed with error code " + str(ret))

    def kill(self):
        """General kill method.

        Raises:
            KillException: If no job id is known, if the kill command can not be set,
                if no kill command is set or if the kill fails.

        """
        if self.job_id is None:
            raise KillException("No job ID was provided!", self.task, self.task_settings)
        try:
            self.set_kill_cmd()
        except Exception as error:
            raise KillException("Setting of kill command failed " + repr(error), self.task,
                                self.task_settings) from error
        if self.kill_job_cmd is None:
            raise KillException("No kill command set for " + self.task_settings.submit_type,
                                self.task, self.task_settings)
        try:
            self.kill_job()
        except Exception as error:
            raise KillException("Kill failed " + repr(error), self.task,
                                self.task_settings) from error

    @abstractmethod
    def set_kill_cmd(self):
        """Set kill command.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    @staticmethod
    def set_remote_cmd(cmd, remote_cmd):
        """Set remote command.

        Pre-pending a remote command statement (e.g. ssh).

        Args:
            cmd (str): Command to run.
            remote_cmd (str): Remote command statement or None.

        Returns:
            str: ``remote_cmd "cmd"`` or the unchanged command without a remote command.

        """
        if remote_cmd is not None:
            cmd = remote_cmd + " \"" + str(cmd) + "\""
        return cmd

    @abstractmethod
    def set_output(self):
        """Set output.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError

    @abstractmethod
    def set_job_name(self):
        """Set job name.

        Must be implemented.

        Raises:
            NotImplementedError: If not implemented.

        """
        raise NotImplementedError
class BackgroundSubmission(SubmissionBaseClass):
    """Background submission.

    A subprocess on the host system.
    """

    def __init__(self, task, task_settings, server, db_file=None):
        """Construct BackgroundSubmission.

        Args:
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.
            server (scheduler.Server): Server.
            db_file (str, optional): Data base for monitoring. Defaults to None.

        """
        SubmissionBaseClass.__init__(self, task, task_settings, server, db_file=db_file)

    def set_submit_cmd(self):
        """Set submit command.

        The command is the job file, preceded by an ``export`` statement for each
        submit variable.
        """
        ecf_job = self.task_settings.ecf_job
        submit_vars = ""
        if self.task_settings.submit_variables is not None:
            for key, val in self.task_settings.submit_variables.items():
                submit_vars = f"export {key}={val}; {submit_vars}"
        ecf_job = f"{submit_vars} {ecf_job}"
        logging.debug(ecf_job)
        # submit_job() pre-pends the remote command. Doing it here as well wrapped the
        # command twice whenever a remote submit command was set.
        self.submit_cmd = ecf_job

    def set_jobid(self):
        """Set job id.

        Returns:
            str: Process id of the process started by ``submit_job``.

        """
        return str(self.process.pid)

    def get_logfile(self):
        """Get the logfile.

        Returns:
            str: The job output file.

        """
        return self.task_settings.ecf_jobout

    def set_kill_cmd(self):
        """Set the kill command."""
        if self.job_id is not None:
            # kill_job() pre-pends the remote command, see set_submit_cmd.
            self.kill_job_cmd = "kill -9 " + str(self.job_id)

    def set_job_status(self):
        """Set the job status."""
        logging.debug("set_job_status %s cmd: %s", self.job_id, self.job_status_cmd)
        if self.job_id is not None:
            self.job_status_cmd = "ps -auxq " + str(self.job_id)
            logging.debug(self.job_status_cmd)

    def set_output(self):
        """Set output.

        Returns:
            str: A comment line for the job header.

        """
        return "# Background jobs use standard output/error\n"

    def set_job_name(self):
        """Set job name.

        Returns:
            str: A comment line for the job header.

        """
        return "# Background jobs get job name from process name\n"
class BatchSubmission(SubmissionBaseClass):
    """A general batch system class for a task using a job scheduler system.

    Sub-classes provide the batch commands and must implement ``set_jobid``.
    """

    def __init__(self, task, task_settings, server, db_file=None, sub=None, stat=None,
                 kill=None, prefix="#"):
        """Construct the BatchSubmission object.

        Args:
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.
            server (scheduler.Server): Server.
            db_file (str, optional): Data base for monitoring. Defaults to None.
            sub (str, optional): Submission command. Defaults to None.
            stat (str, optional): Status command. Defaults to None.
            kill (str, optional): Kill command. Defaults to None.
            prefix (str, optional): Batch prefix. Defaults to "#".

        """
        # NOTE(review): the remote kill/status commands of task_settings are not passed
        # on, so kill and status commands are run without a remote command even when
        # the submit command gets one - confirm that this is intended.
        SubmissionBaseClass.__init__(self, task, task_settings, server, db_file=db_file)
        self.batch_sub = sub
        self.batch_stat = stat
        self.batch_kill = kill
        self.batch_prefix = prefix
        # The job name is the last component of the ecflow name.
        self.name = self.task.ecf_name.split("/")[-1]

    def set_submit_cmd(self):
        """Set submit command.

        Each submit variable is passed with ``-v key=value`` and the remote submit
        command of the task settings is pre-pended when it is set.
        """
        submit_vars = ""
        if self.task_settings.submit_variables is not None:
            for key, val in self.task_settings.submit_variables.items():
                submit_vars = f"-v {key}={val} {submit_vars}"
        cmd = f"{self.batch_sub} {submit_vars} {self.task_settings.ecf_job_at_host}"
        self.submit_cmd = self.set_remote_cmd(cmd, self.task_settings.remote_submit_cmd)

    def set_jobid(self):
        """Set job id.

        Raises:
            NotImplementedError: Must be implemented by the batch system class.

        """
        raise NotImplementedError

    def get_logfile(self):
        """Get the logfile.

        Returns:
            None: Batch systems write the job output themselves, so ``submit_job`` logs
                the submit command output in the submission log.

        """
        return None

    def set_kill_cmd(self):
        """Set kill command."""
        if self.job_id is not None:
            self.kill_job_cmd = self.batch_kill + " " + str(self.job_id)

    def set_job_status(self):
        """Set job status."""
        if self.job_id is not None:
            self.job_status_cmd = self.batch_stat + " " + str(self.job_id)

    def set_output(self):
        """Set output.

        Returns:
            str: Batch header lines for standard output and error.

        """
        logfile = self.task_settings.ecf_jobout_at_host
        prefix = self.batch_prefix
        return f"{prefix} -o {logfile}\n{prefix} -e {logfile}\n{prefix} -j oe\n"

    def set_job_name(self):
        """Set job name.

        Returns:
            str: Batch header line with the job name.

        """
        return self.batch_prefix + " -N " + self.name + "\n"
class PBSSubmission(BatchSubmission):
    """Job submission on a PBS job scheduler."""

    def __init__(self, task, task_settings, server, sub="qsub", stat="qstat -j", kill="qdel",
                 prefix="#PBS", db_file=None):
        """Construct the PBS job submission object.

        Args:
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.
            server (scheduler.Server): Server.
            sub (str, optional): Submission command. Defaults to "qsub".
            stat (str, optional): Status command. Defaults to "qstat -j".
            kill (str, optional): Kill command. Defaults to "qdel".
            prefix (str, optional): PBS prefix. Defaults to "#PBS".
            db_file (str, optional): Data base for monitoring. Defaults to None.

        """
        BatchSubmission.__init__(self, task, task_settings, server, db_file=db_file, sub=sub,
                                 stat=stat, kill=kill, prefix=prefix)
        # Only the first 15 characters are used as job name.
        self.name = self.name[0:15]

    def set_jobid(self):
        """Set job id.

        The id is read from the last line of the submission log, which is expected to
        consist of a single word.

        Raises:
            Exception: If the submission log is empty.
            Exception: If the last line does not consist of exactly one word.

        Returns:
            str: The job id.

        """
        logfile = self.task.create_submission_log(self.task_settings.joboutdir)
        with open(logfile, mode="r", encoding="utf-8") as file_handler:
            lines = file_handler.readlines()
        if not lines:
            # An empty log used to fail with an AttributeError on None.
            raise Exception("No answer found " + str(lines) + " " + logfile)
        expected_len = 1
        words = lines[-1].replace("\n", "").split(" ")
        if len(words) != expected_len:
            raise Exception("Expected " + str(expected_len) + " in output. Got "
                            + str(len(words)))
        self.job_id = str(words[0])
        # Returned as well because submit_job() assigns the return value.
        return self.job_id

    def set_job_name(self):
        """Set job name.

        Returns:
            str: Batch header line with the job name.

        """
        return self.batch_prefix + " -N " + self.name + "\n"
class SlurmSubmission(BatchSubmission):
    """General slurm submission class."""

    def __init__(self, task, task_settings, server, sub="sbatch", stat="squeue -j",
                 kill="scancel", prefix="#SBATCH", db_file=None):
        """Construct SlurmSubmission.

        Args:
            task (scheduler.EcflowTask): Task.
            task_settings (TaskSettings): Task settings.
            server (scheduler.Server): Server.
            sub (str, optional): Submission command. Defaults to "sbatch".
            stat (str, optional): Status command. Defaults to "squeue -j".
            kill (str, optional): Kill command. Defaults to "scancel".
            prefix (str, optional): Slurm prefix. Defaults to "#SBATCH".
            db_file (str, optional): Data base for monitoring. Defaults to None.

        """
        BatchSubmission.__init__(self, task, task_settings, server, db_file=db_file, sub=sub,
                                 stat=stat, kill=kill, prefix=prefix)
        # The job name is the last component of the ecflow name.
        self.name = self.task.ecf_name.split("/")[-1]

    def set_output(self):
        """Set output.

        Returns:
            str: Batch header lines for standard output and error. The last line has no
                trailing newline.

        """
        logfile = self.task_settings.ecf_jobout_at_host
        string = self.batch_prefix + " -o " + logfile + "\n"
        string += self.batch_prefix + " -e " + logfile
        return string

    def set_jobid(self):
        """Set job id.

        The id is the last of the four words on the last line of the submission log.

        Raises:
            Exception: If the submission log is empty.
            Exception: If the last line does not consist of exactly four words.

        Returns:
            str: The job id.

        """
        logfile = self.task.create_submission_log(self.task_settings.joboutdir)
        with open(logfile, mode="r", encoding="utf-8") as file_handler:
            lines = file_handler.readlines()
        if not lines:
            raise Exception("No answer found " + str(lines) + " " + logfile)
        expected_len = 4
        words = lines[-1].replace("\n", "").split(" ")
        if len(words) != expected_len:
            raise Exception("Expected " + str(expected_len) + " in output. Got "
                            + str(len(words)))
        self.job_id = str(words[-1])
        # Returned as well because submit_job() assigns the return value.
        return self.job_id

    def set_submit_cmd(self):
        """Set submit command.

        Each submit variable is passed with its own ``--export key=value`` option.
        """
        # NOTE(review): sbatch documents --export as one comma separated list; verify
        # that repeating the option exports every variable and not only the last one.
        submit_vars = ""
        if self.task_settings.submit_variables is not None:
            for key, val in self.task_settings.submit_variables.items():
                submit_vars = f"--export {key}={val} {submit_vars}"
        cmd = f"{self.batch_sub} {submit_vars} {self.task_settings.ecf_job_at_host}"
        self.submit_cmd = self.set_remote_cmd(cmd, self.task_settings.remote_submit_cmd)

    def set_job_name(self):
        """Set job name.

        Returns:
            str: Batch header line with the job name.

        """
        return self.batch_prefix + " -J " + self.name + "\n"
class GridEngineSubmission(BatchSubmission):
    """Sun Grid Engine (SGE) job submission."""

    def __init__(self, task, task_settings, server, db_file=None, sub="qsub", stat="qstat -j",
                 kill="qdel", prefix="#$"):
        """Construct the GridEngineSubmission object.

        Args:
            task (scheduler.EcflowTask): Task
            task_settings (TaskSettings): Task settings
            server (scheduler.Server): Server.
            db_file (str, optional): Data base for monitoring. Defaults to None.
            sub (str, optional): Submission command. Defaults to "qsub".
            stat (str, optional): Status command. Defaults to "qstat -j".
            kill (str, optional): Kill command. Defaults to "qdel".
            prefix (str, optional): SGE prefix. Defaults to "#$".

        """
        BatchSubmission.__init__(self, task, task_settings, server, db_file=db_file, sub=sub,
                                 stat=stat, kill=kill, prefix=prefix)
        # The job name is the last component of the ecflow name.
        self.name = self.task.ecf_name.split("/")[-1]

    def set_output(self):
        """Set output.

        Returns:
            str: Batch header lines for standard output and error. The last line has no
                trailing newline.

        """
        logfile = self.task_settings.ecf_jobout_at_host
        string = self.batch_prefix + " -o " + logfile + "\n"
        string += self.batch_prefix + " -e " + logfile
        return string

    def set_jobid(self):
        """Set job id.

        The last line of the submission log is expected to look like
        ``Your job XXXXXX ("name") has been submitted`` and the id is its third word.

        Raises:
            Exception: If the submission log is empty.
            Exception: If the last line does not consist of exactly seven words.

        Returns:
            str: The job id.

        """
        logfile = self.task.create_submission_log(self.task_settings.joboutdir)
        with open(logfile, mode="r", encoding="utf-8") as file_handler:
            lines = file_handler.readlines()
        if not lines:
            # An empty log used to fail with an AttributeError on None.
            raise Exception("No answer found " + str(lines) + " " + logfile)
        expected_len = 7
        words = lines[-1].replace("\n", "").split(" ")
        if len(words) != expected_len:
            raise Exception("Expected " + str(expected_len) + " in output. Got "
                            + str(len(words)))
        self.job_id = str(words[2])
        # Returned as well because submit_job() assigns the return value.
        return self.job_id

    def set_job_name(self):
        """Set job name.

        Returns:
            str: Batch header line with the job name.

        """
        return self.batch_prefix + " -N " + self.name + "\n"