Source code for pysurfex.run

"""Run time methods."""
import logging
import os
import shutil
import sys
from subprocess import PIPE, STDOUT, CalledProcessError, Popen

from .util import remove_existing_file


[docs]class BatchJob(object): """Batch job."""
[docs] def __init__(self, rte, wrapper=""): """Construct batch job. Args: rte (dict): Run time environment. wrapper (str, optional): Wrapper around command. Defaults to "". """ self.rte = rte self.wrapper = wrapper logging.debug("Constructed BatchJob")
[docs] def run(self, cmd): """Run command. Args: cmd (str): Command to run. Raises: CalledProcessError: Command failed. RuntimeError: No command provided! """ if cmd is None: raise RuntimeError("No command provided!") cmd = self.wrapper + " " + cmd if "OMP_NUM_THREADS" in self.rte: logging.info("BATCH: %s", self.rte["OMP_NUM_THREADS"]) logging.info("Batch running %s", cmd) process = Popen( # noqaS602 cmd, shell=True, # noqaS602 env=self.rte, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1, ) # Poll process for new output until finished while True: nextline = process.stdout.readline() if nextline == "" and process.poll() is not None: break sys.stdout.write(nextline) sys.stdout.flush() return_code = process.wait() if return_code != 0: raise CalledProcessError(return_code, cmd)
[docs]class SURFEXBinary(object): """SURFEX binary class."""
[docs] def __init__(self, binary, batch, iofile, settings, input_data, **kwargs): """Surfex binary task. Args: binary (str): Binary to run batch (surfex.Batch): Batch object to run command in iofile (surfex.SurfexIO): Input file to command. settings (f90nml.Namelist): Fortran namelist namelist input_data (surfex.InputDataToSurfexBinaries): Input to binary kwargs (dict): Key word arguments. Raises: FileNotFoundError: Input file not found RuntimeError: Execution failed """ self.binary = binary self.batch = batch self.iofile = iofile self.settings = settings self.surfout = kwargs.get("surfout") self.archive_data = kwargs.get("archive_data") self.print_namelist = False if "print_namelist" in kwargs: self.print_namelist = kwargs["print_namelist"] self.pgdfile = kwargs.get("pgdfile") # Set input self.input_data = input_data self.input_data.prepare_input() if os.path.exists("OPTIONS.nam"): os.remove("OPTIONS.nam") self.settings.write("OPTIONS.nam") with open("OPTIONS.nam", mode="r", encoding="utf-8") as file_handler: content = file_handler.read() if self.print_namelist: logging.info(content) if self.iofile.need_pgd and self.pgdfile is not None: logging.debug(self.pgdfile.filename) try: logging.info("PGD is %s", self.pgdfile.filename) if self.pgdfile.input_file is not None and os.path.abspath( self.pgdfile.filename ) != os.path.abspath(self.pgdfile.input_file): remove_existing_file(self.pgdfile.input_file, self.pgdfile.filename) os.symlink(self.pgdfile.input_file, self.pgdfile.filename) if not os.path.exists(self.pgdfile.filename): raise FileNotFoundError(f"PGD {self.pgdfile.filename} not found!") except FileNotFoundError: raise FileNotFoundError("Could not set PGD") from FileNotFoundError if self.surfout is not None: try: logging.info("PREP is %s", self.iofile.filename) if self.iofile.input_file is not None and os.path.abspath( self.iofile.filename ) != os.path.abspath(self.iofile.input_file): remove_existing_file(self.iofile.input_file, self.iofile.filename) os.symlink(self.iofile.input_file, self.iofile.filename) if not os.path.exists(self.iofile.filename): raise FileNotFoundError(f"PREP {self.iofile.filename} not found!") except FileNotFoundError: raise FileNotFoundError("Could not set PREP") from FileNotFoundError cmd = self.binary logging.info("Running %s with settings OPTIONS.nam", cmd) try: self.batch.run(cmd) except Exception as exc: raise RuntimeError(repr(exc)) from Exception listings = [ "LISTING_PGD0.txt", "LISTING_PREP0.txt", "LISTING_OFFLINE0.txt", "LISTING_SODA0.txt", ] for listing in listings: if os.path.exists(listing): with open(listing, mode="r", encoding="utf-8") as file_handler: content = file_handler.read() logging.info("Content of %s:", listing) logging.debug(content) # Archive output self.iofile.archive_output_file() if self.surfout is not None: self.surfout.archive_output_file() if self.archive_data is not None: self.archive_data.archive_files()
[docs]class PerturbedOffline(SURFEXBinary): """Pertubed offline.""" def __init__( self, binary, batch, io, pert_number, settings, input_data, surfout=None, archive_data=None, pgdfile=None, print_namelist=False, negpert=False, ): """Perturbed offline. Args: binary (_type_): _description_ batch (_type_): _description_ io (_type_): _description_ pert_number (_type_): _description_ settings (_type_): _description_ input_data (_type_): _description_ surfout (_type_, optional): _description_. Defaults to None. archive_data (_type_, optional): _description_. Defaults to None. pgdfile (str, optional): _description_. Defaults to None. print_namelist (bool, optional): _description_. Defaults to False. negpert (bool, optional): _description_. Defaults to False. """ pert_number = int(pert_number) settings["nam_io_varassim"]["LPRT"] = True settings["nam_var"]["nivar"] = pert_number # Handle negative pertubations if negpert: nncv = settings["nam_var"]["nncv"] for nvi, _nval in enumerate(nncv): val = settings["nam_var"]["xtprt_m"][nvi] settings["nam_var"]["xtprt_m"][nvi] = -val SURFEXBinary.__init__( self, binary, batch, io, settings, input_data, surfout=surfout, archive_data=archive_data, pgdfile=pgdfile, print_namelist=print_namelist, )
[docs]class Masterodb(object): """Masterodb.""" def __init__( self, pgdfile, prepfile, surffile, settings, input_data, binary=None, archive_data=None, print_namelist=True, batch=None, ): """Masterodb. Args: pgdfile (_type_): _description_ prepfile (_type_): _description_ surffile (_type_): _description_ settings (_type_): _description_ input_data (_type_): _description_ binary (_type_, optional): _description_. Defaults to None. archive_data (_type_, optional): _description_. Defaults to None. print_namelist (bool, optional): _description_. Defaults to True. batch (_type_, optional): _description_. Defaults to None. Raises: FileNotFoundError: _description_ FileNotFoundError: _description_ """ self.settings = settings self.binary = binary self.prepfile = prepfile self.surfout = surffile self.batch = batch self.pgdfile = pgdfile self.input = input_data self.archive = archive_data self.print_namelist = print_namelist # Set input self.input.prepare_input() # Prepare namelist if os.path.exists("EXSEG1.nam"): os.remove("EXSEG1.nam") self.settings.write("EXSEG1.nam") with open("EXSEG1.nam", mode="r", encoding="utf-8") as file_handler: content = file_handler.read() if self.print_namelist: logging.info(content) logging.info("PGD file for MASTERODB %s", self.pgdfile.filename) if self.pgdfile.input_file is not None and os.path.abspath( self.pgdfile.filename ) != os.path.abspath(self.pgdfile.input_file): logging.info("Input PGD file is: %s", self.pgdfile.input_file) remove_existing_file(self.pgdfile.input_file, self.pgdfile.filename) os.symlink(self.pgdfile.input_file, self.pgdfile.filename) if not os.path.exists(self.pgdfile.filename): logging.error("PGD not found! %s", self.pgdfile.filename) raise FileNotFoundError(self.pgdfile.filename) logging.info("PREP file for MASTERODB %s", self.prepfile.filename) if self.prepfile.input_file is not None and os.path.abspath( self.prepfile.filename ) != os.path.abspath(self.prepfile.input_file): logging.info("Input PREP file is: %s", self.prepfile.input_file) remove_existing_file(self.prepfile.input_file, self.prepfile.filename) os.symlink(self.prepfile.input_file, self.prepfile.filename) if not os.path.exists(self.prepfile.filename): logging.error("PREP not found! %s", self.prepfile.filename) raise FileNotFoundError(self.prepfile.filename) # Archive if we have run the binary if self.binary is not None: logging.info("Run binary with namelist EXSEG1.nam: %s", self.binary) self.batch.run(self.binary)
[docs] def archive_output(self): """Archive output.""" # Archive output self.surfout.archive_output_file() if self.archive is not None: self.archive.archive_files()
# TODO is it used?
[docs]def create_working_dir(workdir, enter=True): """Create working dir.""" # Create work directory if workdir is not None: if os.path.isdir(workdir): shutil.rmtree(workdir) os.makedirs(workdir, exist_ok=True) if enter: os.chdir(workdir)
# TODO is it used?
[docs]def clean_working_dir(workdir): """Clean working dir.""" if exit: os.chdir("..") # Clean up shutil.rmtree(workdir)