PYSURFEX scheduler documentation


Python abstraction layer for a scheduling system like Ecflow

See the online documentation at https://metno.github.io/pysurfex-scheduler/

Installation of pregenerated packages from PyPI (pip)

pip3 install pysurfex-scheduler

User installation:

pip3 install pysurfex-scheduler --user

Installation on a Debian-based Linux system

Install the required packages (some might be obsolete if the pip packages contain the needed dependencies):

sudo apt-get update
# Python tools
sudo apt-get install python3-setuptools
# Ecflow
sudo apt-get install ecflow-server ecflow-client python3-ecflow

The following dependencies are needed. Install the non-standard ones with pip or your system package manager, for example.

General dependencies (from pypi)

toml
json; python_version < '3'

For testing:

unittest
nose

Download the source code, then install pysurfex-scheduler by executing the following inside the extracted folder:

sudo pip3 install -e .

or

pip3 install -e . --user

Create documentation

cd docs
# Create html documentation
make html
# Create latex documentation
make latex
# Create a pdf documentation
make latexpdf

Usage

import scheduler

# EcFlow variables, substituted by EcFlow when it generates the job file
ecf_name = "%ECF_NAME%"
ecf_pass = "%ECF_PASS%"
ecf_tryno = "%ECF_TRYNO%"
ecf_rid = "%ECF_RID%"
submission_id = "%SUBMISSION_ID%"
task_name = "%TASK%"

task = scheduler.EcflowTask(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id)
env_submit = {}
env_server = {
  "ECF_HOST": "localhost",
  "ECF_PORT": 3141,
  "ECF_PORT_OFFSET": 0
}
joboutdir = "/tmp/job"

sub = scheduler.EcflowSubmitTask(task, env_submit, env_server, joboutdir)
sub.submit()

Running an experiment in EcFlow

See examples in unit tests (test directory)

Classes

class scheduler.SuiteDefinition(suite_name, joboutdir, ecf_files, env_submit, ecf_home=None, ecf_include=None, ecf_out=None, ecf_jobout=None, ecf_job_cmd=None, ecf_status_cmd=None, ecf_kill_cmd=None, pythonpath='', path='')[source]

The definition of the suite.

Parameters:

object (_type_) – _description_

class scheduler.EcflowSuite(name, **kwargs)[source]

EcflowSuite.

Parameters:

EcflowNodeContainer (EcflowNodeContainer) – A child of the EcflowNodeContainer class.

class scheduler.EcflowSuiteTriggers(triggers, **kwargs)[source]

Triggers to an ecflow suite.

class scheduler.EcflowSuiteTrigger(node, mode='complete')[source]

EcFlow Trigger in a suite.

class scheduler.EcflowSuiteVariable(name, value)[source]

A variable in an ecflow suite.

class scheduler.EcflowSuiteFamily(name, parent, **kwargs)[source]

A family in ecflow.

Parameters:

EcflowNodeContainer (_type_) – _description_

class scheduler.EcflowSuiteTask(name, parent, **kwargs)[source]

A task in an ecflow suite/family.

Parameters:

EcflowNode (EcflowNodeContainer) – The node container.

class scheduler.EcflowSubmitTask(task, env_submit, server, joboutdir, stream=None, dbfile=None, interpreter='#!/usr/bin/env python3', ensmbr=None, submit_exceptions=None, coldstart=False, env_file=None)[source]

Submit class for ecflow.

class scheduler.TaskSettings(task, submission_defs, joboutdirs, submit_exceptions=None, interpreter='#!/usr/bin/env python3', complete=False, coldstart=False)[source]

Set the task-specific settings.

class scheduler.SubmitException(msg, task, task_settings)[source]

Submit exception.

class scheduler.KillException(msg, task, task_settings)[source]

Kill exception.

class scheduler.StatusException(msg, task, task_settings)[source]

Status exception.

class scheduler.BackgroundSubmission(task, task_settings, server, db_file=None)[source]

Background submission.

A subprocess on the host system.

Parameters:

SubmissionBaseClass (_type_) – _description_

class scheduler.BatchSubmission(task, task_settings, server, db_file=None, sub=None, stat=None, kill=None, prefix='#')[source]

A general batch system class for a task using a job scheduler system.

Parameters:

SubmissionBaseClass (_type_) – _description_

class scheduler.PBSSubmission(task, task_settings, server, sub='qsub', stat='qstat -j', kill='qdel', prefix='#PBS', db_file=None)[source]

Job submission on a PBS job scheduler.

Parameters:

BatchSubmission (_type_) – _description_

class scheduler.SlurmSubmission(task, task_settings, server, sub='sbatch', stat='squeue -j', kill='scancel', prefix='#SBATCH', db_file=None)[source]

General Slurm submission class.

Parameters:

BatchSubmission (_type_) – _description_

class scheduler.GridEngineSubmission(task, task_settings, server, db_file=None, sub='qsub', stat='qstat -j', kill='qdel', prefix='#$')[source]

Sun Grid Engine (SGE) job submission.

Parameters:

BatchSubmission (_type_) – _description_
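The default commands and directive prefixes in the three batch class signatures above can be collected in one table. The following is a minimal sketch, not the package's implementation: the dictionary keys and the helpers `render_header` and `build_command` are hypothetical names chosen for illustration, and only the default strings are taken from the signatures.

```python
# Defaults copied from the PBSSubmission, SlurmSubmission and
# GridEngineSubmission signatures. The keys "pbs", "slurm" and
# "grid_engine" are illustrative names, not identifiers from the package.
SCHEDULER_DEFAULTS = {
    "pbs": {"sub": "qsub", "stat": "qstat -j", "kill": "qdel", "prefix": "#PBS"},
    "slurm": {"sub": "sbatch", "stat": "squeue -j", "kill": "scancel", "prefix": "#SBATCH"},
    "grid_engine": {"sub": "qsub", "stat": "qstat -j", "kill": "qdel", "prefix": "#$"},
}


def render_header(scheduler_name, options):
    """Prefix each option with the scheduler's directive prefix (hypothetical helper)."""
    prefix = SCHEDULER_DEFAULTS[scheduler_name]["prefix"]
    return [f"{prefix} {option}" for option in options]


def build_command(scheduler_name, action, argument):
    """Build a command line from a default command and one argument (hypothetical helper)."""
    return f"{SCHEDULER_DEFAULTS[scheduler_name][action]} {argument}"


header = render_header("slurm", ["--job-name=demo"])
kill_line = build_command("pbs", "kill", "12345")
```

Keeping the defaults in data rather than in code makes it easy to see that PBS and Grid Engine share commands and differ only in the directive prefix.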

class scheduler.Server[source]

Base server/scheduler class.

class scheduler.EcflowServer(ecf_host, ecf_port, logfile)[source]

Ecflow server.

Parameters:

Server (Server) – Is a child of the base server.

class scheduler.EcflowServerFromFile(ecflow_server_file, logfile)[source]

Construct an ecflow server from a config file.

class scheduler.EcflowLogServer(ecf_loghost, ecf_logport)[source]

Ecflow log server.

class scheduler.EcflowTask(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id=None, ecf_timeout=20)[source]

Ecflow scheduler task.

class scheduler.EcflowClient(server, task)[source]

An ecflow client.

Encapsulates communication with the ecflow server. It automatically calls the child commands init() and complete() at job start and finish. It also handles exceptions and signals by calling the abort child command. Use only ONE instance of this class; otherwise zombies will be created.
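The init/complete/abort behaviour described for EcflowClient follows the usual Python context-manager pattern. The sketch below uses a hypothetical stand-in class, `ClientSketch`, that only records which child command it would send; it does not talk to an ecflow server, and whether the real client re-raises the exception after aborting is an assumption here.

```python
class ClientSketch:
    """Hypothetical stand-in that records the child commands it would send."""

    def __init__(self):
        self.calls = []

    def __enter__(self):
        # Job start: the real client would send the init child command here.
        self.calls.append("init")
        return self

    def __exit__(self, ex_type, value, tback):
        if ex_type is not None:
            # The body raised: report abort instead of complete.
            self.calls.append("abort")
        else:
            self.calls.append("complete")
        # Returning False lets any exception propagate (an assumption).
        return False


# Normal run: init followed by complete.
with ClientSketch() as ok:
    pass

# Failing run: init followed by abort, and the error still propagates.
failed = ClientSketch()
try:
    with failed:
        raise RuntimeError("task failed")
except RuntimeError:
    pass
```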

Class methods

SuiteDefinition.__init__(suite_name, joboutdir, ecf_files, env_submit, ecf_home=None, ecf_include=None, ecf_out=None, ecf_jobout=None, ecf_job_cmd=None, ecf_status_cmd=None, ecf_kill_cmd=None, pythonpath='', path='')[source]

Construct the definition.

Parameters:
  • suite_name (_type_) – _description_

  • joboutdir (_type_) – _description_

  • ecf_files (_type_) – _description_

  • env_submit (_type_) – _description_

  • ecf_home (_type_, optional) – _description_. Defaults to None.

  • ecf_include (_type_, optional) – _description_. Defaults to None.

  • ecf_out (_type_, optional) – _description_. Defaults to None.

  • ecf_jobout (_type_, optional) – _description_. Defaults to None.

  • ecf_job_cmd (_type_, optional) – _description_. Defaults to None.

  • ecf_status_cmd (_type_, optional) – _description_. Defaults to None.

  • ecf_kill_cmd (_type_, optional) – _description_. Defaults to None.

  • pythonpath (str, optional) – _description_. Defaults to “”.

  • path (str, optional) – _description_. Defaults to “”.

Raises:

Exception – _description_

SuiteDefinition.save_as_defs(def_file)[source]

Save definition file.

Parameters:

def_file (str) – Name of definition file

EcflowSuite.__init__(name, **kwargs)[source]

Construct the Ecflow suite.

Parameters:

name (_type_) – _description_

EcflowSuite.save_as_defs(def_file)[source]

Save definition file.

Parameters:

def_file (str) – Name of the definition file.

EcflowSuiteTriggers.__init__(triggers, **kwargs)[source]

Construct EcflowSuiteTriggers.

Parameters:

triggers (list) – List of EcflowSuiteTrigger objects.

static EcflowSuiteTriggers.create_string(triggers, mode)[source]

Create the trigger string.

Parameters:
  • triggers (list) – List of trigger objects

  • mode (str) – Concatenation type.

Raises:
  • Exception – _description_

  • Exception – _description_

Returns:

The trigger string based on trigger objects.

Return type:

str
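To illustrate what building a trigger string from trigger objects can look like, here is a minimal sketch. It assumes each trigger contributes a `path == state` term and that terms are joined with the concatenation mode; `TriggerSketch` and this standalone `create_string` are hypothetical and may differ from the package's own output format and error handling.

```python
from dataclasses import dataclass


@dataclass
class TriggerSketch:
    """Hypothetical trigger: a node path and the state to wait for."""

    path: str
    mode: str = "complete"


def create_string(triggers, mode):
    """Join one term per trigger with the concatenation mode ("AND" or "OR")."""
    if mode not in ("AND", "OR"):
        raise ValueError(f"Unknown concatenation mode: {mode}")
    for trigger in triggers:
        if not isinstance(trigger, TriggerSketch):
            raise TypeError("Every trigger must be a TriggerSketch")
    terms = [f"({trigger.path} == {trigger.mode})" for trigger in triggers]
    return f" {mode} ".join(terms)


expression = create_string(
    [TriggerSketch("/suite/prep"), TriggerSketch("/suite/fetch", "aborted")],
    "AND",
)
```

The two checks mirror the two exceptions listed for the documented method, but which conditions the package actually tests is not stated in the source.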

EcflowSuiteTriggers.add_triggers(triggers, mode='AND')[source]

Add triggers.

Parameters:
  • triggers (EcflowSuiteTriggers) – The triggers

  • mode (str, optional) – Concatenation mode. Defaults to “AND”.

EcflowSuiteTrigger.__init__(node, mode='complete')[source]

Create an EcFlow trigger object.

Parameters:
  • node (scheduler.EcflowNode) – The node to trigger on

  • mode (str) –

EcflowSuiteVariable.__init__(name, value)[source]

Construct the EcflowSuiteVariable.

Parameters:
  • name (str) – Name

  • value (str) – Value

EcflowSuiteFamily.__init__(name, parent, **kwargs)[source]

Construct the family.

Parameters:
  • name (str) – Name of the family.

  • parent (EcflowNodeContainer) – Parent node.

EcflowSuiteTask.__init__(name, parent, **kwargs)[source]

Construct the EcflowSuiteTask.

Parameters:
  • name (str) – Name of task

  • parent (EcflowNode) – Parent node.

Raises:

Exception – _description_

EcflowSubmitTask.write_header(file_handler)[source]

Write header to file handler.

Parameters:

file_handler (_type_) – _description_

Returns:

_description_

Return type:

_type_

EcflowSubmitTask.write_trailer(file_handler)[source]

Write trailer in job file.

Parameters:

file_handler (_type_) – _description_

EcflowSubmitTask.write_job()[source]

Write job file.

EcflowSubmitTask.submit()[source]

Submit task.

TaskSettings.check_exceptions(submit_exceptions)[source]

Possibility to create submission exceptions.

Parameters:

submit_exceptions (_type_) – _description_

TaskSettings.process_settings()[source]

Process the settings.

TaskSettings.parse_submission_defs()[source]

Parse the submission definitions.

SubmitException.__init__(msg, task, task_settings)[source]

Construct SubmitException.

KillException.__init__(msg, task, task_settings)[source]

Construct KillException.

StatusException.__init__(msg, task, task_settings)[source]

Construct status exception.

BackgroundSubmission.__init__(task, task_settings, server, db_file=None)[source]

Construct BackgroundSubmission.

BackgroundSubmission.set_submit_cmd()[source]

Set submit command.

BackgroundSubmission.set_jobid()[source]

Set job id.

BackgroundSubmission.get_logfile()[source]

Get the logfile.

BackgroundSubmission.set_kill_cmd()[source]

Set the kill command.

BackgroundSubmission.set_job_status()[source]

Set the job status.

BackgroundSubmission.set_output()[source]

Set output.

BackgroundSubmission.set_job_name()[source]

Set job name.

BatchSubmission.__init__(task, task_settings, server, db_file=None, sub=None, stat=None, kill=None, prefix='#')[source]

Construct the BatchSubmission object.

Parameters:
  • task (scheduler.EcflowTask) – Task.

  • task_settings (TaskSettings) – Task settings.

  • server (scheduler.Server) – Server.

  • db_file (str, optional) – Database for monitoring. Defaults to None.

  • sub (str, optional) – Submission command. Defaults to None.

  • stat (str, optional) – Status command. Defaults to None.

  • kill (str, optional) – Kill command. Defaults to None.

  • prefix (str, optional) – Batch prefix. Defaults to “#”.

BatchSubmission.set_submit_cmd()[source]

Set submit command.

Parameters:

remote_cmd (_type_, optional) – _description_. Defaults to None.

BatchSubmission.set_jobid()[source]

Set job id.

BatchSubmission.get_logfile()[source]

Get the logfile.

BatchSubmission.set_kill_cmd()[source]

Set kill command.

BatchSubmission.set_job_status()[source]

Set job status.

BatchSubmission.set_output()[source]

Set output.

BatchSubmission.set_job_name()[source]

Set job name.

PBSSubmission.__init__(task, task_settings, server, sub='qsub', stat='qstat -j', kill='qdel', prefix='#PBS', db_file=None)[source]

Construct the PBS job submission object.

Parameters:
  • task (scheduler.EcflowTask) – Task.

  • task_settings (TaskSettings) – Task settings.

  • server (scheduler.Server) – Server.

  • sub (str, optional) – Submission command. Defaults to “qsub”.

  • stat (str, optional) – Status command. Defaults to “qstat -j”.

  • kill (str, optional) – Kill command. Defaults to “qdel”.

  • prefix (str, optional) – PBS prefix. Defaults to “#PBS”.

  • db_file (str, optional) – Database for monitoring. Defaults to None.

PBSSubmission.set_jobid()[source]

Set job id.

PBSSubmission.set_job_name()[source]

Set job name.

SlurmSubmission.__init__(task, task_settings, server, sub='sbatch', stat='squeue -j', kill='scancel', prefix='#SBATCH', db_file=None)[source]

Construct SlurmSubmission.

Parameters:
  • task (scheduler.EcflowTask) – Task.

  • task_settings (TaskSettings) – Task settings.

  • server (scheduler.Server) – Server.

  • sub (str, optional) – Submission command. Defaults to “sbatch”.

  • stat (str, optional) – Status command. Defaults to “squeue -j”.

  • kill (str, optional) – Kill command. Defaults to “scancel”.

  • prefix (str, optional) – Slurm prefix. Defaults to “#SBATCH”.

  • db_file (str, optional) – Database for monitoring. Defaults to None.

SlurmSubmission.set_output()[source]

Set output.

SlurmSubmission.set_jobid()[source]

Set job id.

SlurmSubmission.set_job_name()[source]

Set job name.

GridEngineSubmission.__init__(task, task_settings, server, db_file=None, sub='qsub', stat='qstat -j', kill='qdel', prefix='#$')[source]

Construct the GridEngineSubmission object.

Parameters:
  • task (scheduler.EcflowTask) – Task

  • task_settings (TaskSettings) – Task settings

  • server (scheduler.Server) – Server.

  • db_file (str, optional) – Database for monitoring. Defaults to None.

  • sub (str, optional) – Submission command. Defaults to “qsub”.

  • stat (str, optional) – Status command. Defaults to “qstat -j”.

  • kill (str, optional) – Kill command. Defaults to “qdel”.

  • prefix (str, optional) – SGE prefix. Defaults to “#$”.

GridEngineSubmission.set_output()[source]

Set output.

GridEngineSubmission.set_jobid()[source]

Set job id.

GridEngineSubmission.set_job_name()[source]

Set job name.

Server.__init__()[source]

Construct the server.

abstract Server.start_server()[source]

Start the server.

Raises:

NotImplementedError – Must be implemented by the child server object.

abstract Server.replace(suite_name, def_file)[source]

Create or change the suite definition.

Parameters:
  • suite_name (str) – Name of the suite.

  • def_file (str) – Name of the definition file.

Raises:

NotImplementedError – Must be implemented by the child server object.

Server.start_suite(suite_name, def_file, begin=True)[source]

Start the suite.

All servers implement these methods, so each can start the suite in a server-specific way.

Parameters:
  • suite_name (str) – Name of the suite

  • def_file (str) – Name of the definition file.

  • begin (bool, optional) – If the suite should begin. Defaults to True.
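The split between the abstract methods start_server() and replace() and the concrete start_suite() resembles a template-method design. The sketch below shows that pattern under stated assumptions: `ServerSketch`, `RecordingServer` and the `begin_suite` hook are hypothetical, and the order of calls inside start_suite is a guess rather than the package's documented behaviour.

```python
from abc import ABC, abstractmethod


class ServerSketch(ABC):
    """Hypothetical base class: subclasses supply the server-specific steps."""

    @abstractmethod
    def start_server(self):
        """Start the server."""

    @abstractmethod
    def replace(self, suite_name, def_file):
        """Create or change the suite definition."""

    def begin_suite(self, suite_name):
        """Optional hook, not part of the documented API."""

    def start_suite(self, suite_name, def_file, begin=True):
        # Shared sequence; each step is implemented by the child class.
        self.start_server()
        self.replace(suite_name, def_file)
        if begin:
            self.begin_suite(suite_name)


class RecordingServer(ServerSketch):
    """Toy child class that logs each call instead of contacting a server."""

    def __init__(self):
        self.log = []

    def start_server(self):
        self.log.append("start_server")

    def replace(self, suite_name, def_file):
        self.log.append(f"replace {suite_name} {def_file}")

    def begin_suite(self, suite_name):
        self.log.append(f"begin {suite_name}")


server = RecordingServer()
server.start_suite("demo", "demo.def")
```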

EcflowServer.__init__(ecf_host, ecf_port, logfile)[source]

Construct the EcflowServer.

Parameters:
  • ecf_host (str) – Host name of the ecflow server.

  • ecf_port (int) – Port to listen to.

  • logfile (str) – Logfile for the scheduler.

Raises:

Exception – _description_

EcflowServer.start_server()[source]

Start the server.

EcflowServer.force_complete(task)[source]

Force the task complete.

Parameters:

task (scheduler.EcflowTask) – Task to force complete.

EcflowServer.force_aborted(task)[source]

Force the task aborted.

Parameters:

task (scheduler.EcflowTask) – Task to force aborted.

EcflowServer.update_submission_id(task)[source]

Update the submission id.

Parameters:

task (scheduler.EcflowTask) – Ecflow task.

EcflowServer.replace(suite_name, def_file)[source]

Replace the suite with the definition in def_file.

Parameters:
  • suite_name (str) – Suite name.

  • def_file (str) – Definition file.

Raises:

Exception – _description_

EcflowServer.update_log(text)[source]

Update the log.

Parameters:

text (str) – Text to log

EcflowServerFromFile.__init__(ecflow_server_file, logfile)[source]

Construct EcflowServer from a file.

Parameters:
  • ecflow_server_file (str) – File with server definition

  • logfile (str) – Logfile

Raises:

FileNotFoundError – if server file was not found.

EcflowServerFromFile.get_var(var, default=None)[source]

Get variable setting.

Parameters:
  • var (str) – Key in settings.

  • default (_type_, optional) – _description_. Defaults to None.

Raises:

Exception – _description_

Returns:

_description_

Return type:

_type_

EcflowServerFromFile.save_as_file(fname)[source]

Save the server settings to a file.

Parameters:

fname (str) – File name
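The three documented behaviours of EcflowServerFromFile (fail if the file is missing, look up a key with an optional default, write the settings back) can be sketched as follows. This is an illustration only: it assumes a JSON settings file and raises KeyError for a missing key, while the real file format and exception type are not stated in this document. `ServerFileSketch` is a hypothetical name.

```python
import json
import os


class ServerFileSketch:
    """Hypothetical settings holder backed by a JSON file (an assumption)."""

    def __init__(self, server_file):
        if not os.path.exists(server_file):
            raise FileNotFoundError(server_file)
        with open(server_file, encoding="utf-8") as handle:
            self.settings = json.load(handle)

    def get_var(self, var, default=None):
        """Return the setting, the default if given, or raise if neither exists."""
        if var in self.settings:
            return self.settings[var]
        if default is not None:
            return default
        raise KeyError(f"Variable {var} is not set and no default was given")

    def save_as_file(self, fname):
        """Write the current settings to a new file."""
        with open(fname, "w", encoding="utf-8") as handle:
            json.dump(self.settings, handle, indent=2)
```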

EcflowLogServer.__init__(ecf_loghost, ecf_logport)[source]

Construct the ecflow log server.

Parameters:
  • ecf_loghost (str) – Name of the loghost.

  • ecf_logport (int) – Port to listen to.

EcflowTask.__init__(ecf_name, ecf_tryno, ecf_pass, ecf_rid, submission_id=None, ecf_timeout=20)[source]

Construct a task running and communicating with ecflow server.

Parameters:
  • ecf_name (str) – Full name of ecflow task.

  • ecf_tryno (int) – Ecflow task try number

  • ecf_pass (str) – Ecflow task password

  • ecf_rid (int) – Ecflow runtime ID

  • submission_id (str, optional) – Submission ID from submission. Defaults to None.

  • ecf_timeout (int, optional) – _description_. Defaults to 20.

EcflowTask.create_submission_log(joboutdir)[source]

Create the submission log file name.

Parameters:

joboutdir (str) – Location of ecflow created job files.

Returns:

Name of the submission output file.

Return type:

str

EcflowTask.create_kill_log(joboutdir)[source]

Create the kill log file name.

Parameters:

joboutdir (str) – Location of ecflow created job files.

Returns:

Name of the kill output file.

Return type:

str

EcflowTask.create_status_log(joboutdir)[source]

Create the status log file name.

Parameters:

joboutdir (str) – Location of ecflow created job files.

Returns:

Name of the status output file.

Return type:

str

EcflowTask.create_ecf_job(joboutdir)[source]

Create the ecflow job file name.

Parameters:

joboutdir (str) – Location of ecflow created job files.

Returns:

Name of the ecflow job file.

Return type:

str

EcflowTask.create_ecf_jobout(joboutdir)[source]

Create the ecflow output file name.

Parameters:

joboutdir (str) – Location of ecflow created job files.

Returns:

Name of the ecflow output file.

Return type:

str
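The create_* methods above each derive a file name from joboutdir and the task. As a rough illustration, the sketch below follows the usual ecFlow naming convention, where the job file ends in `.job<tryno>` and the output file in `.<tryno>`. Whether the package uses exactly these names is an assumption, and `job_file_names` is a hypothetical helper; the submission, kill and status log names are left out because their suffixes are not given here.

```python
def job_file_names(joboutdir, ecf_name, ecf_tryno):
    """Return job and output file names for a task (hypothetical helper).

    Assumes the ecFlow convention of appending ".job<tryno>" and
    ".<tryno>" to the task path placed under joboutdir.
    """
    base = f"{joboutdir.rstrip('/')}/{ecf_name.lstrip('/')}"
    return {
        "job": f"{base}.job{ecf_tryno}",
        "jobout": f"{base}.{ecf_tryno}",
    }


names = job_file_names("/tmp/job", "/suite/family/task", 1)
```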

EcflowClient.__init__(server, task)[source]

Construct the ecflow client.

static EcflowClient.at_time()[source]

Generate time stamp.

EcflowClient.signal_handler(signum, extra=None)[source]

Signal handler.

Parameters:
  • signum (_type_) – _description_

  • extra (_type_, optional) – _description_. Defaults to None.

EcflowClient.__enter__()[source]

Enter the object.

Returns:

_description_

Return type:

_type_

EcflowClient.__exit__(ex_type, value, tback)[source]

Exit method.

Parameters:
  • ex_type (_type_) – _description_

  • value (_type_) – _description_

  • tback (_type_) – _description_

Returns:

_description_

Return type:

_type_

Methods

scheduler.parse_submit_cmd(argv)[source]

Parse the command line input arguments.

scheduler.submit_cmd(**kwargs)[source]

Submit command.

scheduler.parse_kill_cmd(argv)[source]

Parse the command line input arguments.

scheduler.kill_cmd(**kwargs)[source]

Kill command.

scheduler.parse_status_cmd(argv)[source]

Parse the command line input arguments.

scheduler.status_cmd(**kwargs)[source]

Status command.

scheduler.get_submission_object(task, task_settings, server, db_file=None)[source]

Get the submission object constructed from a submit type.

Raises:

NotImplementedError – _description_

Returns:

Return a submission object.

Return type:

SubmissionBaseClass
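Selecting a submission class from a submit type is a small factory. The sketch below shows one way to do it, raising NotImplementedError for an unknown type as the documented function does. The submit-type strings, the placeholder classes and the standalone function signature are all hypothetical; the real function takes task, task_settings, server and db_file and reads the type from the task settings.

```python
class BackgroundSketch:
    """Placeholder for a background (subprocess) submission."""


class PBSSketch:
    """Placeholder for a PBS submission."""


class SlurmSketch:
    """Placeholder for a Slurm submission."""


class GridEngineSketch:
    """Placeholder for a Grid Engine submission."""


# The keys are illustrative; the package may use different type names.
SUBMISSION_TYPES = {
    "background": BackgroundSketch,
    "pbs": PBSSketch,
    "slurm": SlurmSketch,
    "grid_engine": GridEngineSketch,
}


def get_submission_object(submit_type):
    """Return an instance of the class registered for submit_type."""
    try:
        submission_class = SUBMISSION_TYPES[submit_type]
    except KeyError:
        raise NotImplementedError(f"Unknown submit type: {submit_type}") from None
    return submission_class()


submission = get_submission_object("slurm")
```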
