"""Ecflow suites."""
import os
import logging
try:
import ecflow
except ImportError:
ecflow = None
[docs]class SuiteDefinition(object):
"""The definition of the suite.
Args:
object (_type_): _description_
"""
[docs] def __init__(self, suite_name, joboutdir, ecf_files, env_submit,
ecf_home=None, ecf_include=None, ecf_out=None, ecf_jobout=None,
ecf_job_cmd=None, ecf_status_cmd=None, ecf_kill_cmd=None, pythonpath="", path=""):
"""Construct the definition.
Args:
suite_name (_type_): _description_
joboutdir (_type_): _description_
ecf_files (_type_): _description_
env_submit (_type_): _description_
ecf_home (_type_, optional): _description_. Defaults to None.
ecf_include (_type_, optional): _description_. Defaults to None.
ecf_out (_type_, optional): _description_. Defaults to None.
ecf_jobout (_type_, optional): _description_. Defaults to None.
ecf_job_cmd (_type_, optional): _description_. Defaults to None.
ecf_status_cmd (_type_, optional): _description_. Defaults to None.
ecf_kill_cmd (_type_, optional): _description_. Defaults to None.
pythonpath (str, optional): _description_. Defaults to "".
path (str, optional): _description_. Defaults to "".
Raises:
Exception: _description_
"""
if ecflow is None:
raise Exception("Ecflow not loaded properly")
name = suite_name
self.joboutdir = joboutdir
if ecf_include is None:
ecf_include = ecf_files
self.ecf_include = ecf_include
self.ecf_files = ecf_files
if ecf_home is None:
ecf_home = joboutdir
self.ecf_home = ecf_home
if ecf_out is None:
ecf_out = joboutdir
self.ecf_out = ecf_out
if ecf_jobout is None:
ecf_jobout = joboutdir + "/%ECF_NAME%.%ECF_TRYNO%"
self.ecf_jobout = ecf_jobout
self.env_submit = env_submit
# self.server_config = server_config
# self.server_log = server_log
if pythonpath != "":
pythonpath = pythonpath + "; "
if path != "":
path = path + "/"
if ecf_job_cmd is None:
ecf_job_cmd = pythonpath + path + "ECF_submit " \
"-sub %ENV_SUBMIT% " \
"-dir %ECF_OUT% " \
"-server %SERVER_CONFIG% " \
"--log %LOGFILE% " \
"-ecf_name %ECF_NAME% " \
"-ecf_tryno %ECF_TRYNO% " \
"-ecf_pass %ECF_PASS% " \
"-ecf_rid %ECF_RID% "
self.ecf_job_cmd = ecf_job_cmd
if ecf_status_cmd is None:
ecf_status_cmd = pythonpath + path + "ECF_status " \
"-dir %ECF_OUT% " \
"-ecf_name %ECF_NAME% " \
"-ecf_tryno %ECF_TRYNO% " \
"-ecf_pass %ECF_PASS% " \
"-ecf_rid %ECF_RID% " \
"-submission_id %SUBMISSION_ID%"
self.ecf_status_cmd = ecf_status_cmd
if ecf_kill_cmd is None:
ecf_kill_cmd = pythonpath + path + "ECF_kill " \
"-sub %ENV_SUBMIT% " \
"-dir %ECF_OUT% " \
"-server %SERVER_CONFIG% " \
"--log %LOGFILE% " \
"-ecf_name %ECF_NAME% " \
"-ecf_tryno %ECF_TRYNO% " \
"-ecf_pass %ECF_PASS% " \
"-ecf_rid %ECF_RID% " \
"-submission_id %SUBMISSION_ID%"
self.ecf_kill_cmd = ecf_kill_cmd
variables = [
EcflowSuiteVariable("ECF_EXTN", ".py"),
EcflowSuiteVariable("STREAM", ""),
EcflowSuiteVariable("ENSMBR", ""),
EcflowSuiteVariable("ECF_FILES", self.ecf_files),
EcflowSuiteVariable("ECF_INCLUDE", self.ecf_include),
EcflowSuiteVariable("ECF_TRIES", 1),
EcflowSuiteVariable("SUBMISSION_ID", ""),
EcflowSuiteVariable("ECF_HOME", self.ecf_home),
EcflowSuiteVariable("ECF_KILL_CMD", self.ecf_kill_cmd),
EcflowSuiteVariable("ECF_JOB_CMD", self.ecf_job_cmd),
EcflowSuiteVariable("ECF_STATUS_CMD", self.ecf_status_cmd),
EcflowSuiteVariable("ECF_OUT", self.ecf_out),
EcflowSuiteVariable("ECF_JOBOUT", self.ecf_jobout),
EcflowSuiteVariable("ENV_SUBMIT", self.env_submit),
# EcflowSuiteVariable("SERVER_CONFIG", self.server_config),
# EcflowSuiteVariable("LOGFILE", self.server_log)
]
self.suite = EcflowSuite(name, variables=variables)
[docs] def save_as_defs(self, def_file):
"""Save definition file.
Args:
def_file (str): Name of definition file
"""
self.suite.save_as_defs(def_file)
class EcflowNode():
"""A Node class is the abstract base class for Suite, Family and Task.
Every Node instance has a name, and a path relative to a suite
"""
def __init__(self, name, node_type, parent, **kwargs):
"""Construct the EcflowNode.
Args:
name (_type_): _description_
node_type (_type_): _description_
parent (_type_): _description_
Raises:
NotImplementedError: _description_
Exception: _description_
Exception: _description_
"""
self.name = name
self.node_type = node_type
if self.node_type == "family":
self.ecf_node = parent.ecf_node.add_family(self.name)
elif self.node_type == "task":
self.ecf_node = parent.ecf_node.add_task(self.name)
elif self.node_type == "suite":
self.ecf_node = parent.add_suite(self.name)
else:
raise NotImplementedError
self.path = self.ecf_node.get_abs_node_path()
triggers = None
if "triggers" in kwargs:
triggers = kwargs["triggers"]
if "variables" in kwargs:
variables = kwargs["variables"]
if not isinstance(variables, list):
variables = [variables]
if variables is None:
variables = []
else:
variables = []
for var in variables:
self.ecf_node.add_variable(var.name, var.value)
if triggers is not None:
if isinstance(triggers, EcflowSuiteTriggers):
if triggers.trigger_string is not None:
self.ecf_node.add_trigger(triggers.trigger_string)
else:
print("WARNING: Empty trigger")
else:
raise Exception("Triggers must be a Triggers object")
self.triggers = triggers
if "def_status" in kwargs:
def_status = kwargs["def_status"]
if isinstance(def_status, str):
self.ecf_node.add_defstatus(ecflow.Defstatus(def_status))
elif isinstance(def_status, ecflow.Defstatus):
self.ecf_node.add_defstatus(def_status)
else:
raise Exception("Unknown defstatus")
def add_part_trigger(self, triggers, mode=True):
"""Add a part trigger.
Args:
triggers (_type_): _description_
mode (bool, optional): _description_. Defaults to True.
Raises:
Exception: _description_
"""
if isinstance(triggers, EcflowSuiteTriggers):
if triggers.trigger_string is not None:
self.ecf_node.add_part_trigger(triggers.trigger_string, mode)
else:
print("WARNING: Empty trigger")
else:
raise Exception("Triggers must be a Triggers object")
class EcflowNodeContainer(EcflowNode):
"""Ecflow node container.
Args:
EcflowNode (EcflowNode): Parent class.
"""
def __init__(self, name, node_type, parent, **kwargs):
"""Construct EcflowNodeContainer.
Args:
name (str): Name of the node container.
node_type (str): What kind of node.
parent (EcflowNode): Parent to this node.
"""
EcflowNode.__init__(self, name, node_type, parent, **kwargs)
[docs]class EcflowSuite(EcflowNodeContainer):
"""EcflowSuite.
Args:
EcflowNodeContainer (EcflowNodeContainer): A child of the EcflowNodeContainer class.
"""
[docs] def __init__(self, name, **kwargs):
"""Construct the Ecflow suite.
Args:
name (_type_): _description_
"""
self.defs = ecflow.Defs({})
EcflowNodeContainer.__init__(self, name, "suite", self.defs, **kwargs)
[docs] def save_as_defs(self, def_file):
"""Save defintion file.
Args:
def_file (str): Name of the definition file.
"""
self.defs.save_as_defs(def_file)
logging.info("def file saved to %s", def_file)
[docs]class EcflowSuiteTriggers():
"""Triggers to an ecflow suite."""
[docs] def __init__(self, triggers, **kwargs):
"""Construct EcflowSuiteTriggers.
Args:
triggers (list): List of EcflowSuiteTrigger objects.
"""
mode = kwargs.get("mode")
if mode is None:
mode = "AND"
trigger_string = self.create_string(triggers, mode)
self.trigger_string = trigger_string
[docs] @staticmethod
def create_string(triggers, mode):
"""Create the trigger string.
Args:
triggers (list): List of trigger objects
mode (str): Concatenation type.
Raises:
Exception: _description_
Exception: _description_
Returns:
str: The trigger string based on trigger objects.
"""
if not isinstance(triggers, list):
triggers = [triggers]
if len(triggers) == 0:
raise Exception
trigger_string = "("
first = True
for trigger in triggers:
if trigger is not None:
cat = ""
if not first:
cat = " " + mode + " "
if isinstance(trigger, EcflowSuiteTriggers):
trigger_string = trigger_string + cat + trigger.trigger_string
else:
if isinstance(trigger, EcflowSuiteTrigger):
trigger_string = trigger_string + cat + trigger.node.path + " == " +\
trigger.mode
else:
raise Exception("Trigger must be a Trigger object")
first = False
trigger_string = trigger_string + ")"
# If no triggers were found/set
if first:
trigger_string = None
return trigger_string
[docs] def add_triggers(self, triggers, mode="AND"):
"""Add triggers.
Args:
triggers (EcflowSuiteTriggers): The triggers
mode (str, optional): Cat mode. Defaults to "AND".
"""
cat_string = " " + mode + " "
trigger_string = self.create_string(triggers, mode)
if trigger_string is not None:
self.trigger_string = self.trigger_string + cat_string + trigger_string
[docs]class EcflowSuiteTrigger():
"""EcFlow Trigger in a suite."""
[docs] def __init__(self, node, mode="complete"):
"""Create a EcFlow trigger object.
Args:
node (scheduler.EcflowNode): The node to trigger on
mode (str):
"""
self.node = node
self.mode = mode
[docs]class EcflowSuiteVariable():
"""A variable in an ecflow suite."""
[docs] def __init__(self, name, value):
"""Constuct the EcflowSuiteVariable.
Args:
name (str): Name
value (str): Value
"""
self.name = name
self.value = value
[docs]class EcflowSuiteFamily(EcflowNodeContainer):
"""A family in ecflow.
Args:
EcflowNodeContainer (_type_): _description_
"""
[docs] def __init__(self, name, parent, **kwargs):
"""Construct the family.
Args:
name (str): Name of the family.
parent (EcflowNodeContainer): Parent node.
"""
EcflowNodeContainer.__init__(self, name, "family", parent, **kwargs)
[docs]class EcflowSuiteTask(EcflowNode):
"""A task in an ecflow suite/family.
Args:
EcflowNode (EcflowNodeContainer): The node container.
"""
[docs] def __init__(self, name, parent, **kwargs):
"""Constuct the EcflowSuiteTask.
Args:
name (str): Name of task
parent (EcflowNode): Parent node.
Raises:
Exception: _description_
"""
EcflowNode.__init__(self, name, "task", parent, **kwargs)
ecf_files = kwargs.get("ecf_files")
if ecf_files is not None:
if name == "default":
raise Exception("Job should not be called default")
else:
default_job = ecf_files + "/default.py"
task_job = ecf_files + "/" + name + ".py"
if not os.path.exists(task_job) and not os.path.islink(task_job):
print(default_job + " - > " + task_job)
os.symlink(default_job, task_job)