PYSURFEX scheduler documentation¶
Python abstraction layer for a scheduling system like Ecflow¶
See online documentation in https://metno.github.io/pysurfex-scheduler/
Installation of pregenerated packages from pypi (pip)¶
pip3 install pysurfex-scheduler
User installation:
pip3 install pysurfex-scheduler --user
Installation on debian based Linux system¶
Install the required pacakges (some might be obsolete if the pip packages contain the needed depedencies):
sudo apt-get update
# Python tools
sudo apt-get install python3-setuptools
# Ecflow
sudo apt-get install ecflow-server ecflow-client python3-ecflow
The following depencies are needed. Install the non-standard ones e.g. with pip or your system installation system.
General dependencies (from pypi)¶
toml
json; python_version < '3'
For testing:
unittest
nose
Download the source code, then install pysurfex-scheduler
by executing the following inside the extracted
folder:
sudo pip3 install -e .
or
pip3 install -e . --user
Create documentation¶
cd docs
# Create html documentation
make html
# Create latex documentation
make latex
# Create a pdf documentation
make latexpdf
Usage¶
import scheduler
# EcFlow variables parsed in EcFlow job
ecf_name = "%ECF_NAME%"
ecf_pass = "%ECF_PASS%"
ecf_tryno = "%ECF_TRYNO%"
ecf_rid = "%ECF_RID%"
submission_id = "%SUBMISSION_ID%"
task_name = "%TASK%"
task = scheduler.EcflowTask(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id)
env_submit = {}
env_server = {
"ECF_HOST": "localhost",
"ECF_PORT": 3141,
"ECF_PORT_OFFSET": 0
}
joboutdir = "/tmp/job"
sub = scheduler.EcflowSubmitTask(task, env_submit, env_server, joboutdir)
sub.submit()
Running an experiment in EcFlow¶
See examples in unit tests (test directory)
Classes¶
- class scheduler.SuiteDefinition(suite_name, joboutdir, ecf_files, env_submit, ecf_home=None, ecf_include=None, ecf_out=None, ecf_jobout=None, ecf_job_cmd=None, ecf_status_cmd=None, ecf_kill_cmd=None, pythonpath='', path='')[source]¶
The definition of the suite.
- Parameters:
object (_type_) – _description_
- class scheduler.EcflowSuite(name, **kwargs)[source]¶
EcflowSuite.
- Parameters:
EcflowNodeContainer (EcflowNodeContainer) – A child of the EcflowNodeContainer class.
- class scheduler.EcflowSuiteFamily(name, parent, **kwargs)[source]¶
A family in ecflow.
- Parameters:
EcflowNodeContainer (_type_) – _description_
- class scheduler.EcflowSuiteTask(name, parent, **kwargs)[source]¶
A task in an ecflow suite/family.
- Parameters:
EcflowNode (EcflowNodeContainer) – The node container.
- class scheduler.EcflowSubmitTask(task, env_submit, server, joboutdir, stream=None, dbfile=None, interpreter='#!/usr/bin/env python3', ensmbr=None, submit_exceptions=None, coldstart=False, env_file=None)[source]¶
Submit class for ecflow.
- class scheduler.TaskSettings(task, submission_defs, joboutdirs, submit_exceptions=None, interpreter='#!/usr/bin/env python3', complete=False, coldstart=False)[source]¶
Set the task specific setttings.
- class scheduler.BackgroundSubmission(task, task_settings, server, db_file=None)[source]¶
Backgrpund submission.
A subprocess on the host system.
- Parameters:
SubmissionBaseClass (_type_) – _description_
- class scheduler.BatchSubmission(task, task_settings, server, db_file=None, sub=None, stat=None, kill=None, prefix='#')[source]¶
A general batch system class for a task using a job scheduler system.
- Parameters:
SubmissionBaseClass (_type_) – _description_
- class scheduler.PBSSubmission(task, task_settings, server, sub='qsub', stat='qstat -j', kill='qdel', prefix='#PBS', db_file=None)[source]¶
Job submission on a PBS job scheduler.
- Parameters:
BatchSubmission (_type_) – _description_
- class scheduler.SlurmSubmission(task, task_settings, server, sub='sbatch', stat='squeue -j', kill='scancel', prefix='#SBATCH', db_file=None)[source]¶
General slurm submssion class.
- Parameters:
BatchSubmission (_type_) – _description_
- class scheduler.GridEngineSubmission(task, task_settings, server, db_file=None, sub='qsub', stat='qstat -j', kill='qdel', prefix='#$')[source]¶
Sun Grid Engine (SGE) job submission.
- Parameters:
BatchSubmission (_type_) – _description_
- class scheduler.EcflowServer(ecf_host, ecf_port, logfile)[source]¶
Ecflow server.
- Parameters:
Server (Server) – Is a child of the base server.
- class scheduler.EcflowServerFromFile(ecflow_server_file, logfile)[source]¶
Construct an ecflow server from a config file.
- class scheduler.EcflowTask(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id=None, ecf_timeout=20)[source]¶
Ecflow scheduler task.
- class scheduler.EcflowClient(server, task)[source]¶
An ecflow client.
Encapsulate communication with the ecflow server. This will automatically call the child command init()/complete(), for job start/finish. It will also handle exceptions and signals, by calling the abort child command. ONLY one instance of this class, should be used. Otherwise zombies will be created.
Class methods¶
- SuiteDefinition.__init__(suite_name, joboutdir, ecf_files, env_submit, ecf_home=None, ecf_include=None, ecf_out=None, ecf_jobout=None, ecf_job_cmd=None, ecf_status_cmd=None, ecf_kill_cmd=None, pythonpath='', path='')[source]¶
Construct the definition.
- Parameters:
suite_name (_type_) – _description_
joboutdir (_type_) – _description_
ecf_files (_type_) – _description_
env_submit (_type_) – _description_
ecf_home (_type_, optional) – _description_. Defaults to None.
ecf_include (_type_, optional) – _description_. Defaults to None.
ecf_out (_type_, optional) – _description_. Defaults to None.
ecf_jobout (_type_, optional) – _description_. Defaults to None.
ecf_job_cmd (_type_, optional) – _description_. Defaults to None.
ecf_status_cmd (_type_, optional) – _description_. Defaults to None.
ecf_kill_cmd (_type_, optional) – _description_. Defaults to None.
pythonpath (str, optional) – _description_. Defaults to “”.
path (str, optional) – _description_. Defaults to “”.
- Raises:
Exception – _description_
- SuiteDefinition.save_as_defs(def_file)[source]¶
Save definition file.
- Parameters:
def_file (str) – Name of definition file
- EcflowSuite.__init__(name, **kwargs)[source]¶
Construct the Ecflow suite.
- Parameters:
name (_type_) – _description_
- EcflowSuite.save_as_defs(def_file)[source]¶
Save defintion file.
- Parameters:
def_file (str) – Name of the definition file.
- EcflowSuiteTriggers.__init__(triggers, **kwargs)[source]¶
Construct EcflowSuiteTriggers.
- Parameters:
triggers (list) – List of EcflowSuiteTrigger objects.
- static EcflowSuiteTriggers.create_string(triggers, mode)[source]¶
Create the trigger string.
- Parameters:
triggers (list) – List of trigger objects
mode (str) – Concatenation type.
- Raises:
Exception – _description_
Exception – _description_
- Returns:
The trigger string based on trigger objects.
- Return type:
str
- EcflowSuiteTriggers.add_triggers(triggers, mode='AND')[source]¶
Add triggers.
- Parameters:
triggers (EcflowSuiteTriggers) – The triggers
mode (str, optional) – Cat mode. Defaults to “AND”.
- EcflowSuiteTrigger.__init__(node, mode='complete')[source]¶
Create a EcFlow trigger object.
- Parameters:
node (scheduler.EcflowNode) – The node to trigger on
mode (str) –
- EcflowSuiteVariable.__init__(name, value)[source]¶
Constuct the EcflowSuiteVariable.
- Parameters:
name (str) – Name
value (str) – Value
- EcflowSuiteFamily.__init__(name, parent, **kwargs)[source]¶
Construct the family.
- Parameters:
name (str) – Name of the family.
parent (EcflowNodeContainer) – Parent node.
- EcflowSuiteTask.__init__(name, parent, **kwargs)[source]¶
Constuct the EcflowSuiteTask.
- Parameters:
name (str) – Name of task
parent (EcflowNode) – Parent node.
- Raises:
Exception – _description_
- EcflowSubmitTask.write_header(file_handler)[source]¶
Write header to file handler.
- Parameters:
file_handler (_type_) – _description_
- Returns:
_description_
- Return type:
_type_
- EcflowSubmitTask.write_trailer(file_handler)[source]¶
Write trailer in job file.
- Parameters:
file_handler (_type_) – _description_
- TaskSettings.check_exceptions(submit_exceptions)[source]¶
Possibility to create submission exceptions.
- Parameters:
submit_exceptions (_type_) – _description_
- SubmitException.__init__(msg, task, task_settings)[source]¶
Construct SubmitException.
- Parameters:
msg (str) – Exception message.
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
- KillException.__init__(msg, task, task_settings)[source]¶
Construct KillException.
- Parameters:
msg (str) – Exception message.
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
- StatusException.__init__(msg, task, task_settings)[source]¶
Construct status exception.
- Parameters:
msg (str) – Exception message.
task (scheduler.EcflowTask) – task.
task_settings (TaskSettings) – Task settings
- BackgroundSubmission.__init__(task, task_settings, server, db_file=None)[source]¶
Construct BackgroundSubmission.
- Parameters:
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
server (scheduler.Server) – Server.
db_file (str, optional) – Data base for monitoring. Defaults to None.
- BatchSubmission.__init__(task, task_settings, server, db_file=None, sub=None, stat=None, kill=None, prefix='#')[source]¶
Construct the BatchSubmission object.
- Parameters:
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
server (scheduler.Server) – Server.
db_file (str, optional) – Data base for monitoring. Defaults to None.
sub (str, optional) – Submission command. Defaults to None.
stat (str, optional) – Status command. Defaults to None.
kill (str, optional) – Kill command. Defaults to None.
prefix (str, optional) – Batch prefix. Defaults to “#”.
- BatchSubmission.set_submit_cmd()[source]¶
Set submit command.
- Parameters:
remote_cmd (_type_, optional) – _description_. Defaults to None.
- PBSSubmission.__init__(task, task_settings, server, sub='qsub', stat='qstat -j', kill='qdel', prefix='#PBS', db_file=None)[source]¶
Construct the PBS job submission object.
- Parameters:
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
server (scheduler.Server) – Server.
sub (str, optional) – Submission command. Defaults to “qsub”.
stat (str, optional) – Status command. Defaults to “qstat -j”.
kill (str, optional) – Kill command. Defaults to “qdel”.
prefix (str, optional) – PBS prefix. Defaults to “#PBS”.
db_file (str, optional) – Data base for monitoring. Defaults to None.
- SlurmSubmission.__init__(task, task_settings, server, sub='sbatch', stat='squeue -j', kill='scancel', prefix='#SBATCH', db_file=None)[source]¶
Construct SlurmSubmission.
- Parameters:
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings.
server (scheduler.Server) – Server.
sub (str, optional) – Submission command. Defaults to “sbatch”.
stat (str, optional) – Status command. Defaults to “squeue -j”.
kill (str, optional) – Kill command. Defaults to “scancel”.
prefix (str, optional) – Slurm prefix. Defaults to “#SBATCH”.
db_file (_type_, optional) – Data base for monitoring. Defaults to None.
- GridEngineSubmission.__init__(task, task_settings, server, db_file=None, sub='qsub', stat='qstat -j', kill='qdel', prefix='#$')[source]¶
Construct the GridEngineSubmission object.
- Parameters:
task (scheduler.EcflowTask) – Task
task_settings (TaskSettings) – Task settings
server (scheduler.Server) – Server.
db_file (_type_, optional) – Data base for monitoring. Defaults to None.
sub (str, optional) – Sumission command. Defaults to “qsub”.
stat (str, optional) – Status command. Defaults to “qstat -j”.
kill (str, optional) – Kill command. Defaults to “qdel”.
prefix (str, optional) – SGE prefix. Defaults to “#$”.
- abstract Server.start_server()[source]¶
Start the server.
- Raises:
NotImplementedError – Must be implemented by the child server object.
- abstract Server.replace(suite_name, def_file)[source]¶
Create or change the suite definition.
- Parameters:
suite_name (str) – Name of the suite.
def_file (str) – Name of the definition file.
- Raises:
NotImplementedError – Must be implemented by the child server object.
- Server.start_suite(suite_name, def_file, begin=True)[source]¶
Start the suite.
All the servers have these methods implemented and can start the server in a server specific way.
- Parameters:
suite_name (str) – Name of the suite
def_file (str) – Name of the definition file.
begin (bool, optional) – If the suite should begin. Defaults to True.
- EcflowServer.__init__(ecf_host, ecf_port, logfile)[source]¶
Construct the EcflowServer.
- Parameters:
ecf_host (str) – Host name of the ecflow server.
ecf_port (int) – Port to listen to.
logfile (str) – Logfile for the scheduler.
- Raises:
Exception – _description_
- EcflowServer.force_complete(task)[source]¶
Force the task complete.
- Parameters:
task (scheduler.EcflowTask) – Task to force complete.
- EcflowServer.force_aborted(task)[source]¶
Force the task aborted.
- Parameters:
task (scheduler.EcflowTask) – Task to force aborted.
- EcflowServer.update_submission_id(task)[source]¶
Update the submission id.
- Parameters:
task (scheduler.EcflowTask) – Ecflow task.
- EcflowServer.replace(suite_name, def_file)[source]¶
Replace the suite name from def_file.
- Parameters:
suite_name (str) – Suite name.
def_file (str) – Definition file.
- Raises:
Exception – _description_
- EcflowServerFromFile.__init__(ecflow_server_file, logfile)[source]¶
Construct EcflowServer from a file.
- Parameters:
ecflow_server_file (str) – File with server definition
logfile (str) – Logfile
- Raises:
FileNotFoundError – if server file was not found.
- EcflowServerFromFile.get_var(var, default=None)[source]¶
Get variable setting.
- Parameters:
var (str) – Key in settings.
default (_type_, optional) – _description_. Defaults to None.
- Raises:
Exception – _description_
- Returns:
_description_
- Return type:
_type_
- EcflowServerFromFile.save_as_file(fname)[source]¶
Save the server settings to a file.
- Parameters:
fname (str) – File name
- EcflowLogServer.__init__(ecf_loghost, ecf_logport)[source]¶
Constuct the ecflow log server.
- Parameters:
ecf_loghost (str) – Name of the loghost.
ecf_logport (int) – Port to listen to.
- EcflowTask.__init__(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id=None, ecf_timeout=20)[source]¶
Construct a task running and communicating with ecflow server.
- Parameters:
ecf_name (str) – Full name of ecflow task.
ecf_tryno (int) – Ecflow task try number
ecf_pass (str) – Ecflow task password
ecf_rid (int) – Ecflow runtime ID
submission_id (str, optional) – Submssion ID from submission. Defaults to None.
ecf_timeout (int, optional) – _description_. Defaults to 20.
- EcflowTask.create_submission_log(joboutdir)[source]¶
Create the submssion log file name.
- Parameters:
joboutdir (str) – Location of ecflow created job files.
- Returns:
Name of the submission output file.
- Return type:
str
- EcflowTask.create_kill_log(joboutdir)[source]¶
Create the kill log file name.
- Parameters:
joboutdir (str) – Location of ecflow created job files.
- Returns:
Name of the kill output file.
- Return type:
str
- EcflowTask.create_status_log(joboutdir)[source]¶
Create the status log file name.
- Parameters:
joboutdir (str) – Location of ecflow created job files.
- Returns:
Name of the status output file.
- Return type:
str
- EcflowTask.create_ecf_job(joboutdir)[source]¶
Create the ecflow job file name.
- Parameters:
joboutdir (str) – Location of ecflow created job files.
- Returns:
Name of the ecflow job file.
- Return type:
str
- EcflowTask.create_ecf_jobout(joboutdir)[source]¶
Create the ecflow output file name.
- Parameters:
joboutdir (str) – Location of ecflow created job files.
- Returns:
Name of the ecflow output file.
- Return type:
str
- EcflowClient.__init__(server, task)[source]¶
Construct the ecflow client.
- Parameters:
server (EcflowServer) – Ecflow server object.
task (EcflowTask) – Ecflow task object.
Methods¶
- scheduler.get_submission_object(task, task_settings, server, db_file=None)[source]¶
Get the submission object constructed from a submit type.
- Parameters:
task (scheduler.EcflowTask) – Task.
task_settings (TaskSettings) – Task settings
server (scheduler.Server) – Server.
db_file (str, optional) – Data base for monitoring.. Defaults to None.
- Raises:
NotImplementedError – _description_
- Returns:
Return a submission object.
- Return type:
SubmissionBaseClass
- ref:
README